Add buffering to output received from runner
This commit is contained in:
7
app/errors/runner/connection/buffer/error.rb
Normal file
7
app/errors/runner/connection/buffer/error.rb
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Connection::Buffer
|
||||||
|
class Error < ApplicationError
|
||||||
|
class NotEmpty < Error; end
|
||||||
|
end
|
||||||
|
end
|
@ -18,6 +18,7 @@ class Runner::Connection
|
|||||||
@status = :established
|
@status = :established
|
||||||
@event_loop = event_loop
|
@event_loop = event_loop
|
||||||
@locale = locale
|
@locale = locale
|
||||||
|
@buffer = Buffer.new
|
||||||
|
|
||||||
# For every event type of Faye WebSockets, the corresponding
|
# For every event type of Faye WebSockets, the corresponding
|
||||||
# RunnerConnection method starting with `on_` is called.
|
# RunnerConnection method starting with `on_` is called.
|
||||||
@ -78,25 +79,27 @@ class Runner::Connection
|
|||||||
|
|
||||||
def on_message(raw_event)
|
def on_message(raw_event)
|
||||||
Rails.logger.debug { "#{Time.zone.now.getutc}: Receiving from #{@socket.url}: #{raw_event.data.inspect}" }
|
Rails.logger.debug { "#{Time.zone.now.getutc}: Receiving from #{@socket.url}: #{raw_event.data.inspect}" }
|
||||||
# The WebSocket connection might group multiple lines. For further processing, we require all lines
|
@buffer.store raw_event.data
|
||||||
# to be processed separately. Therefore, we split the lines by each newline character not part of an enclosed
|
@buffer.events.each do |event_data|
|
||||||
# substring either in single or double quotes (e.g., within a JSON)
|
forward_message event_data
|
||||||
# Inspired by https://stackoverflow.com/questions/13040585/split-string-by-spaces-properly-accounting-for-quotes-and-backslashes-ruby
|
|
||||||
raw_event.data.scan(/(?:"(?:\\.|[^"])*"|'(?:\\.|[^'])*'|[^"\n])+/).each do |event_data|
|
|
||||||
event = decode(event_data)
|
|
||||||
next unless BACKEND_OUTPUT_SCHEMA.valid?(event)
|
|
||||||
|
|
||||||
event = event.deep_symbolize_keys
|
|
||||||
message_type = event[:type].to_sym
|
|
||||||
if WEBSOCKET_MESSAGE_TYPES.include?(message_type)
|
|
||||||
__send__("handle_#{message_type}", event)
|
|
||||||
else
|
|
||||||
@error = Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}")
|
|
||||||
close(:error)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def forward_message(event_data)
|
||||||
|
event = decode(event_data)
|
||||||
|
return unless BACKEND_OUTPUT_SCHEMA.valid?(event)
|
||||||
|
|
||||||
|
event = event.deep_symbolize_keys
|
||||||
|
message_type = event[:type].to_sym
|
||||||
|
if WEBSOCKET_MESSAGE_TYPES.include?(message_type)
|
||||||
|
__send__("handle_#{message_type}", event)
|
||||||
|
else
|
||||||
|
@error = Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}")
|
||||||
|
close(:error)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
private :forward_message
|
||||||
|
|
||||||
def on_open(_event)
|
def on_open(_event)
|
||||||
@start_callback.call
|
@start_callback.call
|
||||||
end
|
end
|
||||||
@ -105,6 +108,7 @@ class Runner::Connection
|
|||||||
|
|
||||||
def on_close(_event)
|
def on_close(_event)
|
||||||
Rails.logger.debug { "#{Time.zone.now.getutc}: Closing connection to #{@socket.url} with status: #{@status}" }
|
Rails.logger.debug { "#{Time.zone.now.getutc}: Closing connection to #{@socket.url} with status: #{@status}" }
|
||||||
|
forward_message @buffer.flush
|
||||||
case @status
|
case @status
|
||||||
when :timeout
|
when :timeout
|
||||||
@error = Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
|
@error = Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
|
||||||
|
96
lib/runner/connection/buffer.rb
Normal file
96
lib/runner/connection/buffer.rb
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Connection::Buffer
|
||||||
|
# The WebSocket connection might group multiple lines. For further processing, we require all lines
|
||||||
|
# to be processed separately. Therefore, we split the lines by each newline character not part of an enclosed
|
||||||
|
# substring either in single or double quotes (e.g., within a JSON)
|
||||||
|
# Inspired by https://stackoverflow.com/questions/13040585/split-string-by-spaces-properly-accounting-for-quotes-and-backslashes-ruby
|
||||||
|
SPLIT_INDIVIDUAL_LINES = Regexp.compile(/(?:"(?:\\.|[^"])*"|'(?:\\.|[^'])*'|[^\n])+/)
|
||||||
|
|
||||||
|
def initialize
|
||||||
|
@global_buffer = +''
|
||||||
|
@buffering = false
|
||||||
|
@line_buffer = Queue.new
|
||||||
|
super
|
||||||
|
end
|
||||||
|
|
||||||
|
def store(event_data)
|
||||||
|
# First, we append the new data to the existing `@global_buffer`.
|
||||||
|
# Either, the `@global_buffer` is empty and this is a NO OP.
|
||||||
|
# Or, the `@global_buffer` contains an incomplete string and thus requires the new part.
|
||||||
|
@global_buffer += event_data
|
||||||
|
# We process the full `@global_buffer`. Valid parts are removed from the buffer and
|
||||||
|
# the remaining invalid parts are still stored in `@global_buffer`.
|
||||||
|
@global_buffer = process_and_split @global_buffer
|
||||||
|
end
|
||||||
|
|
||||||
|
def events
|
||||||
|
# Return all items from `@line_buffer` in an array (which is iterable) and clear the queue
|
||||||
|
Array.new(@line_buffer.size) { @line_buffer.pop }
|
||||||
|
end
|
||||||
|
|
||||||
|
def flush
|
||||||
|
raise Error::NotEmpty unless @line_buffer.empty?
|
||||||
|
|
||||||
|
remaining_buffer = @global_buffer
|
||||||
|
@buffering = false
|
||||||
|
@global_buffer = +''
|
||||||
|
remaining_buffer
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def process_and_split(message_parts, stop: false)
|
||||||
|
# We need a temporary buffer to operate on
|
||||||
|
buffer = +''
|
||||||
|
message_parts.scan(SPLIT_INDIVIDUAL_LINES).each do |line|
|
||||||
|
# Same argumentation as above: We can always append (previous empty or invalid)
|
||||||
|
buffer += line
|
||||||
|
|
||||||
|
if buffering_required_for? buffer
|
||||||
|
@buffering = true
|
||||||
|
# Check the existing substring `buffer` if it contains a valid message.
|
||||||
|
# The remaining buffer is stored for further processing.
|
||||||
|
buffer = process_and_split buffer, stop: true unless stop
|
||||||
|
else
|
||||||
|
add_to_line_buffer(buffer)
|
||||||
|
# Clear the current buffer.
|
||||||
|
buffer = +''
|
||||||
|
end
|
||||||
|
end
|
||||||
|
# Return the remaining buffer which might become the `@global_buffer`
|
||||||
|
buffer
|
||||||
|
end
|
||||||
|
|
||||||
|
def add_to_line_buffer(message)
|
||||||
|
@buffering = false
|
||||||
|
@global_buffer = +''
|
||||||
|
@line_buffer.push message
|
||||||
|
end
|
||||||
|
|
||||||
|
def buffering_required_for?(message)
|
||||||
|
# First, check if the message is very short and start with {
|
||||||
|
return true if message.size <= 5 && message.start_with?(/\s*{/)
|
||||||
|
|
||||||
|
invalid_json = !valid_json?(message)
|
||||||
|
# Second, if we have the beginning of a valid command but an invalid JSON
|
||||||
|
return true if invalid_json && message.start_with?(/\s*{"cmd/)
|
||||||
|
# Third, global_buffer the message if it contains long messages (e.g., an image or turtle batch commands)
|
||||||
|
return true if invalid_json && (message.include?('<img') || message.include?('"turtlebatch"'))
|
||||||
|
|
||||||
|
# If nothing applies, we don't want to global_buffer the current message
|
||||||
|
false
|
||||||
|
end
|
||||||
|
|
||||||
|
def currently_buffering?
|
||||||
|
@buffering
|
||||||
|
end
|
||||||
|
|
||||||
|
def valid_json?(data)
|
||||||
|
# Try parsing the JSON. If that is successful, we have a valid JSON (otherwise not)
|
||||||
|
JSON.parse(data)
|
||||||
|
true
|
||||||
|
rescue JSON::ParserError
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
@ -124,6 +124,9 @@ class Runner::Strategy::DockerContainerPool < Runner::Strategy
|
|||||||
def decode(event_data)
|
def decode(event_data)
|
||||||
case event_data
|
case event_data
|
||||||
when /(@#{@strategy.container_id[0..11]}|#exit)/
|
when /(@#{@strategy.container_id[0..11]}|#exit)/
|
||||||
|
# TODO: The whole message line is kept back. If this contains the remaining buffer, this buffer is also lost.
|
||||||
|
# Example: A Java program prints `{` and then exists (with `#exit`). The `event_data` processed here is `{#exit`
|
||||||
|
|
||||||
# Assume correct termination for now and return exit code 0
|
# Assume correct termination for now and return exit code 0
|
||||||
# TODO: Can we use the actual exit code here?
|
# TODO: Can we use the actual exit code here?
|
||||||
@exit_code = 0
|
@exit_code = 0
|
||||||
|
Reference in New Issue
Block a user