diff --git a/app/controllers/submissions_controller.rb b/app/controllers/submissions_controller.rb index 97f9cd3c..b2e0bbe9 100644 --- a/app/controllers/submissions_controller.rb +++ b/app/controllers/submissions_controller.rb @@ -122,76 +122,85 @@ class SubmissionsController < ApplicationController end end - def handle_websockets(tubesock, socket) - tubesock.send_data JSON.dump({cmd: :status, status: :container_running}) - - socket.on :stdout do |data| - json_data = JSON.dump({cmd: :write, stream: :stdout, data: data}) - @output << json_data[0, max_output_buffer_size - @output.size] - tubesock.send_data(json_data) - end - - socket.on :stderr do |data| - json_data = JSON.dump({cmd: :write, stream: :stderr, data: data}) - @output << json_data[0, max_output_buffer_size - @output.size] - tubesock.send_data(json_data) - end - - socket.on :exit do |exit_code| - EventMachine.stop_event_loop - if @output.empty? - tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"}) - end - tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"}) - kill_socket(tubesock) - end - - tubesock.onmessage do |event| - event = JSON.parse(event).deep_symbolize_keys - case event[:cmd].to_sym - when :client_kill - EventMachine.stop_event_loop - kill_socket(tubesock) - Rails.logger.debug('Client exited container.') - when :result - socket.send event[:data] - else - Rails.logger.info("Unknown command from client: #{event[:cmd]}") - end - - rescue JSON::ParserError - Rails.logger.debug { "Data received from client is not valid json: #{data}" } - Sentry.set_extras(data: data) - rescue TypeError - Rails.logger.debug { "JSON data received from client cannot be parsed to hash: #{data}" } - Sentry.set_extras(data: data) - end - end - def run - @output = +'' - hijack do |tubesock| - return kill_socket(tubesock) if @embed_options[:disable_run] + # These method-local socket variables are required in order to use one socket + # in the callbacks of the other socket. As the callbacks for the client socket + # are registered first, the runner socket may still be nil. + client_socket, runner_socket = nil - durations = @submission.run(sanitize_filename) do |socket| - handle_websockets(tubesock, socket) + hijack do |tubesock| + client_socket = tubesock + return kill_client_socket(client_socket) if @embed_options[:disable_run] + + client_socket.onclose do |_event| + runner_socket&.close(:terminated_by_client) + end + + client_socket.onmessage do |event| + event = JSON.parse(event).deep_symbolize_keys + case event[:cmd].to_sym + when :client_kill + close_client_connection(client_socket) + Rails.logger.debug('Client exited container.') + when :result + # The client cannot send something before the runner connection is established. + if runner_socket.present? + runner_socket.send event[:data] + else + Rails.logger.info("Could not forward data from client because runner connection was not established yet: #{event[:data].inspect}") + end + else + Rails.logger.info("Unknown command from client: #{event[:cmd]}") + end + rescue JSON::ParserError + Rails.logger.info("Data received from client is not valid json: #{data.inspect}") + Sentry.set_extras(data: data) + rescue TypeError + Rails.logger.info("JSON data received from client cannot be parsed as hash: #{data.inspect}") + Sentry.set_extras(data: data) end - @container_execution_time = durations[:execution_duration] - @waiting_for_container_time = durations[:waiting_duration] - rescue Runner::Error::ExecutionTimeout => e - tubesock.send_data JSON.dump({cmd: :status, status: :timeout}) - kill_socket(tubesock) - Rails.logger.debug { "Running a submission timed out: #{e.message}" } - @output = "timeout: #{@output}" - extract_durations(e) - rescue Runner::Error => e - tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted}) - kill_socket(tubesock) - Rails.logger.debug { "Runner error while running a submission: #{e.message}" } - extract_durations(e) - ensure - save_run_output end + + @output = +'' + durations = @submission.run(sanitize_filename) do |socket| + runner_socket = socket + client_socket.send_data JSON.dump({cmd: :status, status: :container_running}) + + runner_socket.on :stdout do |data| + json_data = JSON.dump({cmd: :write, stream: :stdout, data: data}) + @output << json_data[0, max_output_buffer_size - @output.size] + client_socket.send_data(json_data) + end + + runner_socket.on :stderr do |data| + json_data = JSON.dump({cmd: :write, stream: :stderr, data: data}) + @output << json_data[0, max_output_buffer_size - @output.size] + client_socket.send_data(json_data) + end + + runner_socket.on :exit do |exit_code| + if @output.empty? + client_socket.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"}) + end + client_socket.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"}) + close_client_connection(client_socket) + end + end + @container_execution_time = durations[:execution_duration] + @waiting_for_container_time = durations[:waiting_duration] + rescue Runner::Error::ExecutionTimeout => e + client_socket.send_data JSON.dump({cmd: :status, status: :timeout}) + close_client_connection(client_socket) + Rails.logger.debug { "Running a submission timed out: #{e.message}" } + @output = "timeout: #{@output}" + extract_durations(e) + rescue Runner::Error => e + client_socket.send_data JSON.dump({cmd: :status, status: :container_depleted}) + close_client_connection(client_socket) + Rails.logger.debug { "Runner error while running a submission: #{e.message}" } + extract_durations(e) + ensure + save_run_output end def extract_durations(error) @@ -200,14 +209,16 @@ class SubmissionsController < ApplicationController end private :extract_durations - def kill_socket(tubesock) + def close_client_connection(client_socket) # search for errors and save them as StructuredError (for scoring runs see submission.rb) errors = extract_errors - send_hints(tubesock, errors) + send_hints(client_socket, errors) + kill_client_socket(client_socket) + end - # Hijacked connection needs to be notified correctly - tubesock.send_data JSON.dump({cmd: :exit}) - tubesock.close + def kill_client_socket(client_socket) + client_socket.send_data JSON.dump({cmd: :exit}) + client_socket.close end # save the output of this "run" as a "testrun" (scoring runs are saved in submission.rb) @@ -235,7 +246,7 @@ class SubmissionsController < ApplicationController def score hijack do |tubesock| - return kill_socket(tubesock) if @embed_options[:disable_run] + return if @embed_options[:disable_run] tubesock.send_data(JSON.dump(@submission.calculate_score)) # To enable hints when scoring a submission, uncomment the next line: @@ -244,8 +255,7 @@ class SubmissionsController < ApplicationController tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted}) Rails.logger.debug { "Runner error while scoring submission #{@submission.id}: #{e.message}" } ensure - tubesock.send_data JSON.dump({cmd: :exit}) - tubesock.close + kill_client_socket(tubesock) end end diff --git a/app/models/runner.rb b/app/models/runner.rb index 4e93d065..37cad548 100644 --- a/app/models/runner.rb +++ b/app/models/runner.rb @@ -41,9 +41,17 @@ class Runner < ApplicationRecord end def attach_to_execution(command, &block) + ensure_event_machine starting_time = Time.zone.now begin - @strategy.attach_to_execution(command, &block) + # As the EventMachine reactor is probably shared with other threads, we cannot use EventMachine.run with + # stop_event_loop to wait for the WebSocket connection to terminate. Instead we use a self built event + # loop for that: Runner::EventLoop. The attach_to_execution method of the strategy is responsible for + # initializing its Runner::Connection with the given event loop. The Runner::Connection class ensures that + # this event loop is stopped after the socket was closed. + event_loop = Runner::EventLoop.new + @strategy.attach_to_execution(command, event_loop, &block) + event_loop.wait rescue Runner::Error => e e.execution_duration = Time.zone.now - starting_time raise @@ -57,6 +65,22 @@ class Runner < ApplicationRecord private + # If there are multiple threads trying to connect to the WebSocket of their execution at the same time, + # the Faye WebSocket connections will use the same reactor. We therefore only need to start an EventMachine + # if there isn't a running reactor yet. + # See this StackOverflow answer: https://stackoverflow.com/a/8247947 + def ensure_event_machine + unless EventMachine.reactor_running? && EventMachine.reactor_thread.alive? + event_loop = Runner::EventLoop.new + Thread.new do + EventMachine.run { event_loop.stop } + ensure + ActiveRecord::Base.connection_pool.release_connection + end + event_loop.wait + end + end + def request_id request_new_id if runner_id.blank? end diff --git a/app/models/submission.rb b/app/models/submission.rb index 532a0c0c..b9442084 100644 --- a/app/models/submission.rb +++ b/app/models/submission.rb @@ -158,7 +158,6 @@ class Submission < ApplicationRecord end socket.on :exit do |received_exit_code| exit_code = received_exit_code - EventMachine.stop_event_loop end end output.merge!(container_execution_time: execution_time, status: exit_code.zero? ? :ok : :failed) diff --git a/lib/runner/connection.rb b/lib/runner/connection.rb index bc5d55e8..d49c6eb8 100644 --- a/lib/runner/connection.rb +++ b/lib/runner/connection.rb @@ -11,12 +11,13 @@ class Runner::Connection attr_writer :status - def initialize(url, strategy) + def initialize(url, strategy, event_loop) @socket = Faye::WebSocket::Client.new(url, [], ping: 5) @strategy = strategy @status = :established + @event_loop = event_loop - # For every event type of faye websockets, the corresponding + # For every event type of Faye WebSockets, the corresponding # RunnerConnection method starting with `on_` is called. %i[open message error close].each do |event_type| @socket.on(event_type) {|event| __send__(:"on_#{event_type}", event) } @@ -41,6 +42,17 @@ class Runner::Connection @socket.send(encoded_message) end + def close(status) + return unless active? + + @status = status + @socket.close + end + + def active? + @status == :established + end + private def decode(_raw_event) @@ -61,7 +73,7 @@ class Runner::Connection if WEBSOCKET_MESSAGE_TYPES.include?(message_type) __send__("handle_#{message_type}", event) else - raise Runner::Error::UnexpectedResponse.new("Unknown websocket message type: #{message_type}") + raise Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}") end end @@ -78,6 +90,9 @@ class Runner::Connection raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit') when :terminated_by_codeocean, :terminated_by_management @exit_callback.call @exit_code + @event_loop.stop + when :terminated_by_client + @event_loop.stop else # :established # If the runner is killed by the DockerContainerPool after the maximum allowed time per user and # while the owning user is running an execution, the command execution stops and log output is incomplete. diff --git a/lib/runner/event_loop.rb b/lib/runner/event_loop.rb new file mode 100644 index 00000000..ff30cc77 --- /dev/null +++ b/lib/runner/event_loop.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +# EventLoop is an abstraction around Ruby's queue so that its usage is better +# understandable in our context. +class Runner::EventLoop + def initialize + @queue = Queue.new + end + + # wait waits until another thread calls stop on this EventLoop. + # There may only be one active wait call per loop at a time, otherwise it is not + # deterministic which one will be unblocked if stop is called. + def wait + @queue.pop + end + + # stop unblocks the currently active wait call. If there is none, the + # next call to wait will not be blocking. + def stop + @queue.push nil if @queue.empty? + end +end diff --git a/lib/runner/strategy/docker_container_pool.rb b/lib/runner/strategy/docker_container_pool.rb index adc0089f..3da0dc75 100644 --- a/lib/runner/strategy/docker_container_pool.rb +++ b/lib/runner/strategy/docker_container_pool.rb @@ -51,19 +51,22 @@ class Runner::Strategy::DockerContainerPool < Runner::Strategy raise Runner::Error::FaradayError.new("Request to DockerContainerPool failed: #{e.inspect}") end - def attach_to_execution(command) + def attach_to_execution(command, event_loop) @command = command query_params = 'logs=0&stream=1&stderr=1&stdout=1&stdin=1' websocket_url = "#{self.class.config[:ws_host]}/v1.27/containers/#{@container_id}/attach/ws?#{query_params}" - EventMachine.run do - socket = Connection.new(websocket_url, self) - EventMachine.add_timer(@execution_environment.permitted_execution_time) do - socket.status = :timeout - destroy_at_management + socket = Connection.new(websocket_url, self, event_loop) + begin + Timeout.timeout(@execution_environment.permitted_execution_time) do + socket.send(command) + yield(socket) + event_loop.wait + event_loop.stop end - socket.send(command) - yield(socket) + rescue Timeout::Error + socket.close(:timeout) + destroy_at_management end end @@ -118,8 +121,7 @@ class Runner::Strategy::DockerContainerPool < Runner::Strategy # Assume correct termination for now and return exit code 0 # TODO: Can we use the actual exit code here? @exit_code = 0 - @status = :terminated_by_codeocean - @socket.close + close(:terminated_by_codeocean) when /#{format(@strategy.execution_environment.test_command, class_name: '.*', filename: '.*', module_name: '.*')}/ # TODO: Super dirty hack to redirect test output to stderr (remove attr_reader afterwards) @stream = 'stderr' diff --git a/lib/runner/strategy/poseidon.rb b/lib/runner/strategy/poseidon.rb index 54d51aab..488d4914 100644 --- a/lib/runner/strategy/poseidon.rb +++ b/lib/runner/strategy/poseidon.rb @@ -85,12 +85,10 @@ class Runner::Strategy::Poseidon < Runner::Strategy raise Runner::Error::FaradayError.new("Request to Poseidon failed: #{e.inspect}") end - def attach_to_execution(command) + def attach_to_execution(command, event_loop) websocket_url = execute_command(command) - EventMachine.run do - socket = Connection.new(websocket_url, self) - yield(socket) - end + socket = Connection.new(websocket_url, self, event_loop) + yield(socket) end def destroy_at_management @@ -113,7 +111,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy if websocket_url.present? return websocket_url else - raise Runner::Error::UnexpectedResponse.new('Poseidon did not send websocket url') + raise Runner::Error::UnexpectedResponse.new('Poseidon did not send a WebSocket URL') end when 400 Runner.destroy(@allocation_id) @@ -132,7 +130,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy def decode(raw_event) JSON.parse(raw_event.data) rescue JSON::ParserError => e - raise Runner::Error::UnexpectedResponse.new("The websocket message from Poseidon could not be decoded to JSON: #{e.inspect}") + raise Runner::Error::UnexpectedResponse.new("The WebSocket message from Poseidon could not be decoded to JSON: #{e.inspect}") end def encode(data)