From 9e2cff7558d3d69517801b8bfe1e9e2d5cfe093e Mon Sep 17 00:00:00 2001 From: Felix Auringer <48409110+felixauringer@users.noreply.github.com> Date: Thu, 22 Jul 2021 10:09:24 +0200 Subject: [PATCH] Attach connection errors to socket Raising the errors would crash the current thread. As this thread contains the Eventmachine, that would influence other connections as well. Attaching the errors to the connection and reading them after the connection was closed ensures that the thread stays alive while handling the errors in the main thread of the current request. --- app/models/runner.rb | 3 ++- lib/runner/connection.rb | 13 ++++----- lib/runner/strategy/docker_container_pool.rb | 1 + lib/runner/strategy/poseidon.rb | 4 ++- spec/models/runner_spec.rb | 28 ++++++++++++++++---- 5 files changed, 36 insertions(+), 13 deletions(-) diff --git a/app/models/runner.rb b/app/models/runner.rb index 37cad548..30fa48a1 100644 --- a/app/models/runner.rb +++ b/app/models/runner.rb @@ -50,8 +50,9 @@ class Runner < ApplicationRecord # initializing its Runner::Connection with the given event loop. The Runner::Connection class ensures that # this event loop is stopped after the socket was closed. event_loop = Runner::EventLoop.new - @strategy.attach_to_execution(command, event_loop, &block) + socket = @strategy.attach_to_execution(command, event_loop, &block) event_loop.wait + raise socket.error if socket.error.present? rescue Runner::Error => e e.execution_duration = Time.zone.now - starting_time raise diff --git a/lib/runner/connection.rb b/lib/runner/connection.rb index d49c6eb8..d2cb4a21 100644 --- a/lib/runner/connection.rb +++ b/lib/runner/connection.rb @@ -10,6 +10,7 @@ class Runner::Connection BACKEND_OUTPUT_SCHEMA = JSONSchemer.schema(JSON.parse(File.read('lib/runner/backend-output.schema.json'))) attr_writer :status + attr_reader :error def initialize(url, strategy, event_loop) @socket = Faye::WebSocket::Client.new(url, [], ping: 5) @@ -73,7 +74,8 @@ class Runner::Connection if WEBSOCKET_MESSAGE_TYPES.include?(message_type) __send__("handle_#{message_type}", event) else - raise Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}") + @error = Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}") + close(:error) end end @@ -87,17 +89,16 @@ class Runner::Connection Rails.logger.debug { "#{Time.zone.now.getutc}: Closing connection to #{@socket.url} with status: #{@status}" } case @status when :timeout - raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit') + @error = Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit') when :terminated_by_codeocean, :terminated_by_management @exit_callback.call @exit_code - @event_loop.stop - when :terminated_by_client - @event_loop.stop + when :terminated_by_client, :error else # :established # If the runner is killed by the DockerContainerPool after the maximum allowed time per user and # while the owning user is running an execution, the command execution stops and log output is incomplete. - raise Runner::Error::Unknown.new('Execution terminated with an unknown reason') + @error = Runner::Error::Unknown.new('Execution terminated with an unknown reason') end + @event_loop.stop end def handle_exit(event) diff --git a/lib/runner/strategy/docker_container_pool.rb b/lib/runner/strategy/docker_container_pool.rb index 3da0dc75..afa4261d 100644 --- a/lib/runner/strategy/docker_container_pool.rb +++ b/lib/runner/strategy/docker_container_pool.rb @@ -68,6 +68,7 @@ class Runner::Strategy::DockerContainerPool < Runner::Strategy socket.close(:timeout) destroy_at_management end + socket end private diff --git a/lib/runner/strategy/poseidon.rb b/lib/runner/strategy/poseidon.rb index 488d4914..6a87d92b 100644 --- a/lib/runner/strategy/poseidon.rb +++ b/lib/runner/strategy/poseidon.rb @@ -89,6 +89,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy websocket_url = execute_command(command) socket = Connection.new(websocket_url, self, event_loop) yield(socket) + socket end def destroy_at_management @@ -130,7 +131,8 @@ class Runner::Strategy::Poseidon < Runner::Strategy def decode(raw_event) JSON.parse(raw_event.data) rescue JSON::ParserError => e - raise Runner::Error::UnexpectedResponse.new("The WebSocket message from Poseidon could not be decoded to JSON: #{e.inspect}") + @error = Runner::Error::UnexpectedResponse.new("The WebSocket message from Poseidon could not be decoded to JSON: #{e.inspect}") + close(:error) end def encode(data) diff --git a/spec/models/runner_spec.rb b/spec/models/runner_spec.rb index 4bea42e0..6105f819 100644 --- a/spec/models/runner_spec.rb +++ b/spec/models/runner_spec.rb @@ -63,12 +63,15 @@ describe Runner do let(:runner) { described_class.create } let(:command) { 'ls' } let(:event_loop) { instance_double(Runner::EventLoop) } + let(:connection) { instance_double(Runner::Connection) } before do allow(strategy_class).to receive(:request_from_management).and_return(runner_id) allow(strategy_class).to receive(:new).and_return(strategy) allow(event_loop).to receive(:wait) + allow(connection).to receive(:error).and_return(nil) allow(Runner::EventLoop).to receive(:new).and_return(event_loop) + allow(strategy).to receive(:attach_to_execution).and_return(connection) end it 'delegates to its strategy' do @@ -77,21 +80,36 @@ describe Runner do end it 'returns the execution time' do - allow(strategy).to receive(:attach_to_execution) starting_time = Time.zone.now execution_time = runner.attach_to_execution(command) test_time = Time.zone.now - starting_time expect(execution_time).to be_between(0.0, test_time).exclusive end - context 'when a runner error is raised' do - before { allow(strategy).to receive(:attach_to_execution).and_raise(Runner::Error) } + it 'blocks until the event loop is stopped' do + allow(event_loop).to receive(:wait) { sleep(1) } + execution_time = runner.attach_to_execution(command) + expect(execution_time).to be > 1 + end + + context 'when an error is returned' do + let(:error_message) { 'timeout' } + let(:error) { Runner::Error::ExecutionTimeout.new(error_message) } + + before { allow(connection).to receive(:error).and_return(error) } + + it 'raises the error' do + expect { runner.attach_to_execution(command) }.to raise_error do |raised_error| + expect(raised_error).to be_a(Runner::Error::ExecutionTimeout) + expect(raised_error.message).to eq(error_message) + end + end it 'attaches the execution time to the error' do starting_time = Time.zone.now - expect { runner.attach_to_execution(command) }.to raise_error do |error| + expect { runner.attach_to_execution(command) }.to raise_error do |raised_error| test_time = Time.zone.now - starting_time - expect(error.execution_duration).to be_between(0.0, test_time).exclusive + expect(raised_error.execution_duration).to be_between(0.0, test_time).exclusive end end end