Adapt output buffering to Poseidon and DCP
* Refactor flushing of messages * Introduce two separate buffers for stdout and stderr
This commit is contained in:
@ -96,14 +96,11 @@ class SubmissionsController < ApplicationController
|
|||||||
end
|
end
|
||||||
|
|
||||||
client_socket.onmessage do |raw_event|
|
client_socket.onmessage do |raw_event|
|
||||||
event = if raw_event == "\n"
|
# Obviously, this is just flushing the current connection: Filtering.
|
||||||
# Obviously, this is just flushing the current connection.
|
next if raw_event == "\n"
|
||||||
# We temporarily wrap it and then forward the original event intentionally.
|
|
||||||
{cmd: 'result'}
|
# Otherwise, we expect to receive a JSON: Parsing.
|
||||||
else
|
event = JSON.parse(raw_event).deep_symbolize_keys
|
||||||
# We expect to receive a JSON
|
|
||||||
JSON.parse(raw_event).deep_symbolize_keys
|
|
||||||
end
|
|
||||||
|
|
||||||
case event[:cmd].to_sym
|
case event[:cmd].to_sym
|
||||||
when :client_kill
|
when :client_kill
|
||||||
|
@ -22,7 +22,8 @@ class Runner::Connection
|
|||||||
@status = :established
|
@status = :established
|
||||||
@event_loop = event_loop
|
@event_loop = event_loop
|
||||||
@locale = locale
|
@locale = locale
|
||||||
@buffer = Buffer.new
|
@stdout_buffer = Buffer.new
|
||||||
|
@stderr_buffer = Buffer.new
|
||||||
|
|
||||||
# For every event type of Faye WebSockets, the corresponding
|
# For every event type of Faye WebSockets, the corresponding
|
||||||
# RunnerConnection method starting with `on_` is called.
|
# RunnerConnection method starting with `on_` is called.
|
||||||
@ -83,14 +84,7 @@ class Runner::Connection
|
|||||||
|
|
||||||
def on_message(raw_event)
|
def on_message(raw_event)
|
||||||
Rails.logger.debug { "#{Time.zone.now.getutc}: Receiving from #{@socket.url}: #{raw_event.data.inspect}" }
|
Rails.logger.debug { "#{Time.zone.now.getutc}: Receiving from #{@socket.url}: #{raw_event.data.inspect}" }
|
||||||
@buffer.store raw_event.data
|
event = decode(raw_event.data)
|
||||||
@buffer.events.each do |event_data|
|
|
||||||
forward_message event_data
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def forward_message(event_data)
|
|
||||||
event = decode(event_data)
|
|
||||||
return unless BACKEND_OUTPUT_SCHEMA.valid?(event)
|
return unless BACKEND_OUTPUT_SCHEMA.valid?(event)
|
||||||
|
|
||||||
event = event.deep_symbolize_keys
|
event = event.deep_symbolize_keys
|
||||||
@ -102,7 +96,6 @@ class Runner::Connection
|
|||||||
close(:error)
|
close(:error)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
private :forward_message
|
|
||||||
|
|
||||||
def on_open(_event)
|
def on_open(_event)
|
||||||
@start_callback.call
|
@start_callback.call
|
||||||
@ -112,7 +105,7 @@ class Runner::Connection
|
|||||||
|
|
||||||
def on_close(_event)
|
def on_close(_event)
|
||||||
Rails.logger.debug { "#{Time.zone.now.getutc}: Closing connection to #{@socket.url} with status: #{@status}" }
|
Rails.logger.debug { "#{Time.zone.now.getutc}: Closing connection to #{@socket.url} with status: #{@status}" }
|
||||||
forward_message @buffer.flush unless @buffer.empty?
|
flush_buffers
|
||||||
|
|
||||||
# Depending on the status, we might want to destroy the runner at management.
|
# Depending on the status, we might want to destroy the runner at management.
|
||||||
# This ensures we get a new runner on the next request.
|
# This ensures we get a new runner on the next request.
|
||||||
@ -149,11 +142,17 @@ class Runner::Connection
|
|||||||
end
|
end
|
||||||
|
|
||||||
def handle_stdout(event)
|
def handle_stdout(event)
|
||||||
@stdout_callback.call event[:data]
|
@stdout_buffer.store event[:data]
|
||||||
|
@stdout_buffer.events.each do |event_data|
|
||||||
|
@stdout_callback.call event_data
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_stderr(event)
|
def handle_stderr(event)
|
||||||
@stderr_callback.call event[:data]
|
@stderr_buffer.store event[:data]
|
||||||
|
@stderr_buffer.events.each do |event_data|
|
||||||
|
@stderr_callback.call event_data
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_error(_event); end
|
def handle_error(_event); end
|
||||||
@ -163,4 +162,10 @@ class Runner::Connection
|
|||||||
def handle_timeout(_event)
|
def handle_timeout(_event)
|
||||||
@status = :timeout
|
@status = :timeout
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def flush_buffers
|
||||||
|
@stdout_callback.call @stdout_buffer.flush unless @stdout_buffer.empty?
|
||||||
|
@stderr_callback.call @stderr_buffer.flush unless @stderr_buffer.empty?
|
||||||
|
end
|
||||||
|
private :flush_buffers
|
||||||
end
|
end
|
||||||
|
@ -166,7 +166,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
|||||||
end
|
end
|
||||||
|
|
||||||
def encode(data)
|
def encode(data)
|
||||||
data
|
"#{data}\n"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user