Improve error resilience and handling
Timeouts are now handled correctly and the Runner automatically creates the execution environment if it could not be found in Poseidon. The runner is deleted locally if Poseidon returns a bad request error.
This commit is contained in:

committed by
Sebastian Serth

parent
b6bc578aea
commit
413f9b2705
@ -150,6 +150,10 @@ class SubmissionsController < ApplicationController
|
||||
end
|
||||
|
||||
socket.on :exit do |exit_code|
|
||||
# As this is sometimes called before the timeout is handled, we must not close the
|
||||
# socket to the user here. The socket will be closed after handling the timeout.
|
||||
next if exit_code == Runner::Connection::TIMEOUT_EXIT_STATUS
|
||||
|
||||
EventMachine.stop_event_loop
|
||||
if @output.empty?
|
||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"})
|
||||
|
@ -39,8 +39,8 @@ class Runner < ApplicationRecord
|
||||
define_method(method) do |*args, &block|
|
||||
@strategy.send(method, *args, &block)
|
||||
rescue Runner::Error::NotFound
|
||||
update(runner_id: self.class.strategy_class.request_from_management(execution_environment))
|
||||
@strategy = self.class.strategy_class.new(runner_id, execution_environment)
|
||||
request_new_id
|
||||
save
|
||||
@strategy.send(method, *args, &block)
|
||||
end
|
||||
end
|
||||
@ -48,10 +48,22 @@ class Runner < ApplicationRecord
|
||||
private
|
||||
|
||||
def request_id
|
||||
return if runner_id.present?
|
||||
request_new_id if runner_id.blank?
|
||||
end
|
||||
|
||||
def request_new_id
|
||||
strategy_class = self.class.strategy_class
|
||||
self.runner_id = strategy_class.request_from_management(execution_environment)
|
||||
@strategy = strategy_class.new(runner_id, execution_environment)
|
||||
rescue Runner::Error::NotFound
|
||||
if strategy_class.sync_environment(execution_environment)
|
||||
raise Runner::Error::NotFound.new(
|
||||
"The execution environment with id #{execution_environment.id} was not found and was successfully synced with the runner management"
|
||||
)
|
||||
else
|
||||
raise Runner::Error::NotFound.new(
|
||||
"The execution environment with id #{execution_environment.id} was not found and could not be synced with the runner management"
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -3,7 +3,7 @@
|
||||
class CleanExposedPortsInExecutionEnvironment < ActiveRecord::Migration[6.1]
|
||||
def change
|
||||
ExecutionEnvironment.all.each do |execution_environment|
|
||||
continue if execution_environment.exposed_ports.nil?
|
||||
next if execution_environment.exposed_ports.nil?
|
||||
|
||||
cleaned = execution_environment.exposed_ports.gsub(/[[:space:]]/, '')
|
||||
list = cleaned.split(',').map(&:to_i).uniq
|
||||
|
@ -7,6 +7,7 @@ class Runner::Connection
|
||||
# These are events for which callbacks can be registered.
|
||||
EVENTS = %i[start output exit stdout stderr].freeze
|
||||
BACKEND_OUTPUT_SCHEMA = JSONSchemer.schema(JSON.parse(File.read('lib/runner/backend-output.schema.json')))
|
||||
TIMEOUT_EXIT_STATUS = -100
|
||||
|
||||
def initialize(url)
|
||||
@socket = Faye::WebSocket::Client.new(url, [], ping: 5)
|
||||
@ -20,7 +21,8 @@ class Runner::Connection
|
||||
# This registers empty default callbacks.
|
||||
EVENTS.each {|event_type| instance_variable_set(:"@#{event_type}_callback", ->(e) {}) }
|
||||
@start_callback = -> {}
|
||||
@exit_code = 0
|
||||
# Fail if no exit status was returned.
|
||||
@exit_code = 1
|
||||
end
|
||||
|
||||
def on(event, &block)
|
||||
@ -80,6 +82,7 @@ class Runner::Connection
|
||||
def handle_start(_event); end
|
||||
|
||||
def handle_timeout(_event)
|
||||
@exit_code = TIMEOUT_EXIT_STATUS
|
||||
raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
|
||||
end
|
||||
end
|
||||
|
@ -10,6 +10,10 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
||||
end
|
||||
end
|
||||
|
||||
def self.sync_environment(environment)
|
||||
environment.copy_to_poseidon
|
||||
end
|
||||
|
||||
def self.request_from_management(environment)
|
||||
url = "#{Runner::BASE_URL}/runners"
|
||||
body = {executionEnvironmentId: environment.id, inactivityTimeout: Runner::UNUSED_EXPIRATION_TIME}
|
||||
@ -21,10 +25,12 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
||||
runner_id = response_body[:runnerId]
|
||||
runner_id.presence || raise(Runner::Error::Unknown.new('Poseidon did not send a runner id'))
|
||||
when 404
|
||||
raise Runner::Error::NotFound.new('Execution environment not found')
|
||||
raise Runner::Error::EnvironmentNotFound.new
|
||||
else
|
||||
handle_error response
|
||||
end
|
||||
rescue Faraday::Error => e
|
||||
raise Runner::Error::Unknown.new("Faraday request to runner management failed: #{e.inspect}")
|
||||
end
|
||||
|
||||
def self.handle_error(response)
|
||||
@ -35,7 +41,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
||||
when 401
|
||||
raise Runner::Error::Unauthorized.new('Authentication with Poseidon failed')
|
||||
when 404
|
||||
raise Runner::Error::NotFound.new('Runner not found')
|
||||
raise Runner::Error::RunnerNotFound.new
|
||||
when 500
|
||||
response_body = parse response
|
||||
error_code = response_body[:errorCode]
|
||||
@ -60,7 +66,12 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
||||
url = "#{runner_url}/files"
|
||||
body = {copy: files.map {|filename, content| {path: filename, content: Base64.strict_encode64(content)} }}
|
||||
response = Faraday.patch(url, body.to_json, HEADERS)
|
||||
self.class.handle_error response unless response.status == 204
|
||||
return if response.status == 204
|
||||
|
||||
Runner.destroy(@runner_id) if response.status == 400
|
||||
self.class.handle_error response
|
||||
rescue Faraday::Error => e
|
||||
raise Runner::Error::Unknown.new("Faraday request to runner management failed: #{e.inspect}")
|
||||
end
|
||||
|
||||
def attach_to_execution(command)
|
||||
@ -68,7 +79,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
||||
websocket_url = execute_command(command)
|
||||
EventMachine.run do
|
||||
socket = Runner::Connection.new(websocket_url)
|
||||
yield(socket) if block_given?
|
||||
yield(socket)
|
||||
end
|
||||
Time.zone.now - starting_time # execution duration
|
||||
end
|
||||
@ -76,6 +87,8 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
||||
def destroy_at_management
|
||||
response = Faraday.delete runner_url
|
||||
self.class.handle_error response unless response.status == 204
|
||||
rescue Faraday::Error => e
|
||||
raise Runner::Error::Unknown.new("Faraday request to runner management failed: #{e.inspect}")
|
||||
end
|
||||
|
||||
private
|
||||
@ -84,17 +97,22 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
||||
url = "#{runner_url}/execute"
|
||||
body = {command: command, timeLimit: @execution_environment.permitted_execution_time}
|
||||
response = Faraday.post(url, body.to_json, HEADERS)
|
||||
if response.status == 200
|
||||
response_body = self.class.parse response
|
||||
websocket_url = response_body[:websocketUrl]
|
||||
if websocket_url.present?
|
||||
return websocket_url
|
||||
else
|
||||
raise Runner::Error::Unknown.new('Poseidon did not send websocket url')
|
||||
end
|
||||
case response.status
|
||||
when 200
|
||||
response_body = self.class.parse response
|
||||
websocket_url = response_body[:websocketUrl]
|
||||
if websocket_url.present?
|
||||
return websocket_url
|
||||
else
|
||||
raise Runner::Error::Unknown.new('Poseidon did not send websocket url')
|
||||
end
|
||||
when 400
|
||||
Runner.destroy(@runner_id)
|
||||
end
|
||||
|
||||
self.class.handle_error response
|
||||
rescue Faraday::Error => e
|
||||
raise Runner::Error::Unknown.new("Faraday request to runner management failed: #{e.inspect}")
|
||||
end
|
||||
|
||||
def runner_url
|
||||
|
@ -16,11 +16,26 @@ describe Runner::Strategy::Poseidon do
|
||||
let(:response_status) { 400 }
|
||||
|
||||
it 'raises an error' do
|
||||
allow(Runner).to receive(:destroy).with(runner_id)
|
||||
expect { action.call }.to raise_error(Runner::Error::BadRequest, /#{error_message}/)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Only #copy_files and #execute_command destroy the runner locally in case
|
||||
# of a BadRequest (400) response.
|
||||
shared_examples 'BadRequest (400) destroys local runner' do
|
||||
context 'when Poseidon returns BadRequest (400)' do
|
||||
let(:response_body) { {message: error_message}.to_json }
|
||||
let(:response_status) { 400 }
|
||||
|
||||
it 'destroys the runner locally' do
|
||||
expect(Runner).to receive(:destroy).with(runner_id)
|
||||
expect { action.call }.to raise_error(Runner::Error::BadRequest)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# All requests handle a Unauthorized (401) response the same way.
|
||||
shared_examples 'Unauthorized (401) error handling' do
|
||||
context 'when Poseidon returns Unauthorized (401)' do
|
||||
@ -193,6 +208,7 @@ describe Runner::Strategy::Poseidon do
|
||||
end
|
||||
|
||||
include_examples 'BadRequest (400) error handling'
|
||||
include_examples 'BadRequest (400) destroys local runner'
|
||||
include_examples 'Unauthorized (401) error handling'
|
||||
include_examples 'NotFound (404) error handling'
|
||||
include_examples 'InternalServerError (500) error handling'
|
||||
@ -247,6 +263,7 @@ describe Runner::Strategy::Poseidon do
|
||||
end
|
||||
|
||||
include_examples 'BadRequest (400) error handling'
|
||||
include_examples 'BadRequest (400) destroys local runner'
|
||||
include_examples 'Unauthorized (401) error handling'
|
||||
include_examples 'NotFound (404) error handling'
|
||||
include_examples 'InternalServerError (500) error handling'
|
||||
|
@ -61,28 +61,81 @@ describe Runner do
|
||||
include_examples 'delegates method sends to its strategy', :attach_to_execution, nil
|
||||
end
|
||||
|
||||
describe '#request_id' do
|
||||
it 'requests a runner from the runner management when created' do
|
||||
describe 'creation' do
|
||||
let(:user) { FactoryBot.create :external_user }
|
||||
let(:execution_environment) { FactoryBot.create :ruby }
|
||||
let(:create_action) { -> { described_class.create(user: user, execution_environment: execution_environment) } }
|
||||
|
||||
it 'requests a runner id from the runner management' do
|
||||
expect(strategy_class).to receive(:request_from_management)
|
||||
described_class.create
|
||||
create_action.call
|
||||
end
|
||||
|
||||
it 'sets the runner id when created' do
|
||||
it 'returns a valid runner' do
|
||||
allow(strategy_class).to receive(:request_from_management).and_return(runner_id)
|
||||
runner = described_class.create
|
||||
expect(runner.runner_id).to eq(runner_id)
|
||||
expect(create_action.call).to be_valid
|
||||
end
|
||||
|
||||
it 'sets the strategy when created' do
|
||||
it 'sets the strategy' do
|
||||
allow(strategy_class).to receive(:request_from_management).and_return(runner_id)
|
||||
runner = described_class.create
|
||||
expect(runner.strategy).to be_present
|
||||
strategy = strategy_class.new(runner_id, execution_environment)
|
||||
allow(strategy_class).to receive(:new).with(runner_id, execution_environment).and_return(strategy)
|
||||
runner = create_action.call
|
||||
expect(runner.strategy).to eq(strategy)
|
||||
end
|
||||
|
||||
it 'does not call the runner management again when validating the model' do
|
||||
it 'does not call the runner management again while a runner id is set' do
|
||||
expect(strategy_class).to receive(:request_from_management).and_return(runner_id).once
|
||||
runner = described_class.create
|
||||
runner.valid?
|
||||
runner = create_action.call
|
||||
runner.update(user: FactoryBot.create(:external_user))
|
||||
end
|
||||
end
|
||||
|
||||
describe '#request_new_id' do
|
||||
let(:runner) { FactoryBot.create :runner }
|
||||
|
||||
context 'when the environment is available in the runner management' do
|
||||
it 'requests the runner management' do
|
||||
expect(strategy_class).to receive(:request_from_management)
|
||||
runner.send(:request_new_id)
|
||||
end
|
||||
|
||||
it 'updates the runner id' do
|
||||
allow(strategy_class).to receive(:request_from_management).and_return(runner_id)
|
||||
runner.send(:request_new_id)
|
||||
expect(runner.runner_id).to eq(runner_id)
|
||||
end
|
||||
|
||||
it 'updates the strategy' do
|
||||
allow(strategy_class).to receive(:request_from_management).and_return(runner_id)
|
||||
strategy = strategy_class.new(runner_id, runner.execution_environment)
|
||||
allow(strategy_class).to receive(:new).with(runner_id, runner.execution_environment).and_return(strategy)
|
||||
runner.send(:request_new_id)
|
||||
expect(runner.strategy).to eq(strategy)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the environment could not be found in the runner management' do
|
||||
let(:environment_id) { runner.execution_environment.id }
|
||||
|
||||
before { allow(strategy_class).to receive(:request_from_management).and_raise(Runner::Error::NotFound) }
|
||||
|
||||
it 'syncs the execution environment' do
|
||||
expect(strategy_class).to receive(:sync_environment).with(runner.execution_environment)
|
||||
runner.send(:request_new_id)
|
||||
rescue Runner::Error::NotFound
|
||||
# Ignored because this error is expected (see tests below).
|
||||
end
|
||||
|
||||
it 'raises an error when the environment could be synced' do
|
||||
allow(strategy_class).to receive(:sync_environment).with(runner.execution_environment).and_return(true)
|
||||
expect { runner.send(:request_new_id) }.to raise_error(Runner::Error::NotFound, /#{environment_id}.*successfully synced/)
|
||||
end
|
||||
|
||||
it 'raises an error when the environment could not be synced' do
|
||||
allow(strategy_class).to receive(:sync_environment).with(runner.execution_environment).and_return(false)
|
||||
expect { runner.send(:request_new_id) }.to raise_error(Runner::Error::NotFound, /#{environment_id}.*could not be synced/)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
Reference in New Issue
Block a user