From 345860c779e5edf544b3c6052eb244cc78706d71 Mon Sep 17 00:00:00 2001 From: Sebastian Serth Date: Sun, 17 Oct 2021 13:15:52 +0200 Subject: [PATCH] Adapt output buffering to Poseidon and DCP * Refactor flushing of messages * Introduce two separate buffers for stdout and stderr --- app/controllers/submissions_controller.rb | 13 ++++------ lib/runner/connection.rb | 31 +++++++++++++---------- lib/runner/strategy/poseidon.rb | 2 +- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/app/controllers/submissions_controller.rb b/app/controllers/submissions_controller.rb index f0ebd2ec..06456a77 100644 --- a/app/controllers/submissions_controller.rb +++ b/app/controllers/submissions_controller.rb @@ -96,14 +96,11 @@ class SubmissionsController < ApplicationController end client_socket.onmessage do |raw_event| - event = if raw_event == "\n" - # Obviously, this is just flushing the current connection. - # We temporarily wrap it and then forward the original event intentionally. - {cmd: 'result'} - else - # We expect to receive a JSON - JSON.parse(raw_event).deep_symbolize_keys - end + # Obviously, this is just flushing the current connection: Filtering. + next if raw_event == "\n" + + # Otherwise, we expect to receive a JSON: Parsing. + event = JSON.parse(raw_event).deep_symbolize_keys case event[:cmd].to_sym when :client_kill diff --git a/lib/runner/connection.rb b/lib/runner/connection.rb index 03368ab7..ab3f5279 100644 --- a/lib/runner/connection.rb +++ b/lib/runner/connection.rb @@ -22,7 +22,8 @@ class Runner::Connection @status = :established @event_loop = event_loop @locale = locale - @buffer = Buffer.new + @stdout_buffer = Buffer.new + @stderr_buffer = Buffer.new # For every event type of Faye WebSockets, the corresponding # RunnerConnection method starting with `on_` is called. @@ -83,14 +84,7 @@ class Runner::Connection def on_message(raw_event) Rails.logger.debug { "#{Time.zone.now.getutc}: Receiving from #{@socket.url}: #{raw_event.data.inspect}" } - @buffer.store raw_event.data - @buffer.events.each do |event_data| - forward_message event_data - end - end - - def forward_message(event_data) - event = decode(event_data) + event = decode(raw_event.data) return unless BACKEND_OUTPUT_SCHEMA.valid?(event) event = event.deep_symbolize_keys @@ -102,7 +96,6 @@ class Runner::Connection close(:error) end end - private :forward_message def on_open(_event) @start_callback.call @@ -112,7 +105,7 @@ class Runner::Connection def on_close(_event) Rails.logger.debug { "#{Time.zone.now.getutc}: Closing connection to #{@socket.url} with status: #{@status}" } - forward_message @buffer.flush unless @buffer.empty? + flush_buffers # Depending on the status, we might want to destroy the runner at management. # This ensures we get a new runner on the next request. @@ -149,11 +142,17 @@ class Runner::Connection end def handle_stdout(event) - @stdout_callback.call event[:data] + @stdout_buffer.store event[:data] + @stdout_buffer.events.each do |event_data| + @stdout_callback.call event_data + end end def handle_stderr(event) - @stderr_callback.call event[:data] + @stderr_buffer.store event[:data] + @stderr_buffer.events.each do |event_data| + @stderr_callback.call event_data + end end def handle_error(_event); end @@ -163,4 +162,10 @@ class Runner::Connection def handle_timeout(_event) @status = :timeout end + + def flush_buffers + @stdout_callback.call @stdout_buffer.flush unless @stdout_buffer.empty? + @stderr_callback.call @stderr_buffer.flush unless @stderr_buffer.empty? + end + private :flush_buffers end diff --git a/lib/runner/strategy/poseidon.rb b/lib/runner/strategy/poseidon.rb index 2c186e1f..10ce2a60 100644 --- a/lib/runner/strategy/poseidon.rb +++ b/lib/runner/strategy/poseidon.rb @@ -166,7 +166,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy end def encode(data) - data + "#{data}\n" end end end