From a074a5cb0dd4d1ba997a107407d338547cd8743d Mon Sep 17 00:00:00 2001 From: Sebastian Serth Date: Mon, 11 Oct 2021 09:47:17 +0200 Subject: [PATCH] Add buffering to output received from runner --- app/errors/runner/connection/buffer/error.rb | 7 ++ lib/runner/connection.rb | 36 ++++---- lib/runner/connection/buffer.rb | 96 ++++++++++++++++++++ lib/runner/strategy/docker_container_pool.rb | 3 + 4 files changed, 126 insertions(+), 16 deletions(-) create mode 100644 app/errors/runner/connection/buffer/error.rb create mode 100644 lib/runner/connection/buffer.rb diff --git a/app/errors/runner/connection/buffer/error.rb b/app/errors/runner/connection/buffer/error.rb new file mode 100644 index 00000000..106d95fb --- /dev/null +++ b/app/errors/runner/connection/buffer/error.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +class Runner::Connection::Buffer + class Error < ApplicationError + class NotEmpty < Error; end + end +end diff --git a/lib/runner/connection.rb b/lib/runner/connection.rb index c1dd99f5..addd22fd 100644 --- a/lib/runner/connection.rb +++ b/lib/runner/connection.rb @@ -18,6 +18,7 @@ class Runner::Connection @status = :established @event_loop = event_loop @locale = locale + @buffer = Buffer.new # For every event type of Faye WebSockets, the corresponding # RunnerConnection method starting with `on_` is called. @@ -78,25 +79,27 @@ class Runner::Connection def on_message(raw_event) Rails.logger.debug { "#{Time.zone.now.getutc}: Receiving from #{@socket.url}: #{raw_event.data.inspect}" } - # The WebSocket connection might group multiple lines. For further processing, we require all lines - # to be processed separately. 
Therefore, we split the lines by each newline character not part of an enclosed - # substring either in single or double quotes (e.g., within a JSON) - # Inspired by https://stackoverflow.com/questions/13040585/split-string-by-spaces-properly-accounting-for-quotes-and-backslashes-ruby - raw_event.data.scan(/(?:"(?:\\.|[^"])*"|'(?:\\.|[^'])*'|[^"\n])+/).each do |event_data| - event = decode(event_data) - next unless BACKEND_OUTPUT_SCHEMA.valid?(event) - - event = event.deep_symbolize_keys - message_type = event[:type].to_sym - if WEBSOCKET_MESSAGE_TYPES.include?(message_type) - __send__("handle_#{message_type}", event) - else - @error = Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}") - close(:error) - end + @buffer.store raw_event.data + @buffer.events.each do |event_data| + forward_message event_data end end + def forward_message(event_data) + event = decode(event_data) + return unless BACKEND_OUTPUT_SCHEMA.valid?(event) + + event = event.deep_symbolize_keys + message_type = event[:type].to_sym + if WEBSOCKET_MESSAGE_TYPES.include?(message_type) + __send__("handle_#{message_type}", event) + else + @error = Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}") + close(:error) + end + end + private :forward_message + def on_open(_event) @start_callback.call end @@ -105,6 +108,7 @@ class Runner::Connection def on_close(_event) Rails.logger.debug { "#{Time.zone.now.getutc}: Closing connection to #{@socket.url} with status: #{@status}" } + forward_message @buffer.flush case @status when :timeout @error = Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit') diff --git a/lib/runner/connection/buffer.rb b/lib/runner/connection/buffer.rb new file mode 100644 index 00000000..82dc1006 --- /dev/null +++ b/lib/runner/connection/buffer.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +class Runner::Connection::Buffer + # The WebSocket connection might group multiple lines. 
For further processing, we require all lines + # to be processed separately. Therefore, we split the lines by each newline character not part of an enclosed + # substring either in single or double quotes (e.g., within a JSON) + # Inspired by https://stackoverflow.com/questions/13040585/split-string-by-spaces-properly-accounting-for-quotes-and-backslashes-ruby + SPLIT_INDIVIDUAL_LINES = Regexp.compile(/(?:"(?:\\.|[^"])*"|'(?:\\.|[^'])*'|[^\n])+/) + + def initialize + @global_buffer = +'' + @buffering = false + @line_buffer = Queue.new + super + end + + def store(event_data) + # First, we append the new data to the existing `@global_buffer`. + # Either, the `@global_buffer` is empty and this is a NO OP. + # Or, the `@global_buffer` contains an incomplete string and thus requires the new part. + @global_buffer += event_data + # We process the full `@global_buffer`. Valid parts are removed from the buffer and + # the remaining invalid parts are still stored in `@global_buffer`. + @global_buffer = process_and_split @global_buffer + end + + def events + # Return all items from `@line_buffer` in an array (which is iterable) and clear the queue + Array.new(@line_buffer.size) { @line_buffer.pop } + end + + def flush + raise Error::NotEmpty unless @line_buffer.empty? + + remaining_buffer = @global_buffer + @buffering = false + @global_buffer = +'' + remaining_buffer + end + + private + + def process_and_split(message_parts, stop: false) + # We need a temporary buffer to operate on + buffer = +'' + message_parts.scan(SPLIT_INDIVIDUAL_LINES).each do |line| + # Same argumentation as above: We can always append (previous empty or invalid) + buffer += line + + if buffering_required_for? buffer + @buffering = true + # Check the existing substring `buffer` if it contains a valid message. + # The remaining buffer is stored for further processing. + buffer = process_and_split buffer, stop: true unless stop + else + add_to_line_buffer(buffer) + # Clear the current buffer. 
+ buffer = +'' + end + end + # Return the remaining buffer which might become the `@global_buffer` + buffer + end + + def add_to_line_buffer(message) + @buffering = false + @global_buffer = +'' + @line_buffer.push message + end + + def buffering_required_for?(message) + # First, check if the message is very short and starts with { + return true if message.size <= 5 && message.start_with?(/\s*{/) + + invalid_json = !valid_json?(message) + # Second, if we have the beginning of a valid command but an invalid JSON + return true if invalid_json && message.start_with?(/\s*{"cmd/) + # Third, buffer the message if it contains long messages (e.g., an image or turtle batch commands) + return true if invalid_json && (message.include?('