Introduce new error types for runners
The errors are raised in the runner model and in the runner connection class. In the submission controller the errors are rescued and, depending on the error, the status timeout / container depleted is sent to the client.
This commit is contained in:

committed by
Sebastian Serth

parent
5e913c8a1a
commit
d5b274c9f2
@ -133,7 +133,7 @@ class SubmissionsController < ApplicationController
|
|||||||
end
|
end
|
||||||
|
|
||||||
def handle_websockets(tubesock, runner, socket)
|
def handle_websockets(tubesock, runner, socket)
|
||||||
tubesock.send_data JSON.dump({'cmd' => 'status', 'status' => :container_running})
|
tubesock.send_data JSON.dump({cmd: :status, status: :container_running})
|
||||||
@output = +''
|
@output = +''
|
||||||
|
|
||||||
socket.on :output do |data|
|
socket.on :output do |data|
|
||||||
@ -182,21 +182,21 @@ class SubmissionsController < ApplicationController
|
|||||||
|
|
||||||
def run
|
def run
|
||||||
hijack do |tubesock|
|
hijack do |tubesock|
|
||||||
if @embed_options[:disable_run]
|
return kill_socket(tubesock) if @embed_options[:disable_run]
|
||||||
kill_socket(tubesock)
|
|
||||||
else
|
@container_execution_time = @submission.run(sanitize_filename) do |runner, socket|
|
||||||
begin
|
@waiting_for_container_time = runner.waiting_time
|
||||||
@container_execution_time = @submission.run(sanitize_filename) do |runner, socket|
|
handle_websockets(tubesock, runner, socket)
|
||||||
@waiting_for_container_time = runner.waiting_time
|
|
||||||
handle_websockets(tubesock, runner, socket)
|
|
||||||
end
|
|
||||||
save_run_output
|
|
||||||
rescue RunnerNotAvailableError
|
|
||||||
tubesock.send_data JSON.dump({cmd: :timeout})
|
|
||||||
kill_socket(tubesock)
|
|
||||||
Rails.logger.debug('Runner not available')
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
save_run_output
|
||||||
|
rescue Runner::Error::ExecutionTimeout => e
|
||||||
|
tubesock.send_data JSON.dump({cmd: :status, status: :timeout})
|
||||||
|
kill_socket(tubesock)
|
||||||
|
Rails.logger.debug { "Running a submission failed: #{e.message}" }
|
||||||
|
rescue Runner::Error => e
|
||||||
|
tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
||||||
|
kill_socket(tubesock)
|
||||||
|
Rails.logger.debug { "Runner error while running a submission: #{e.message}" }
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -244,8 +244,15 @@ class SubmissionsController < ApplicationController
|
|||||||
tubesock.send_data(@submission.calculate_score)
|
tubesock.send_data(@submission.calculate_score)
|
||||||
# To enable hints when scoring a submission, uncomment the next line:
|
# To enable hints when scoring a submission, uncomment the next line:
|
||||||
# send_hints(tubesock, StructuredError.where(submission: @submission))
|
# send_hints(tubesock, StructuredError.where(submission: @submission))
|
||||||
|
rescue Runner::Error::ExecutionTimeout => e
|
||||||
tubesock.send_data JSON.dump({'cmd' => 'exit'})
|
tubesock.send_data JSON.dump({cmd: :status, status: :timeout})
|
||||||
|
Rails.logger.debug { "Running a submission failed: #{e.message}" }
|
||||||
|
rescue Runner::Error => e
|
||||||
|
tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
||||||
|
Rails.logger.debug { "Runner error while scoring a submission: #{e.message}" }
|
||||||
|
ensure
|
||||||
|
tubesock.send_data JSON.dump({cmd: :exit})
|
||||||
|
tubesock.close
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
ActiveRecord::Base.connection_pool.release_connection
|
ActiveRecord::Base.connection_pool.release_connection
|
||||||
|
3
app/errors/runner/error.rb
Normal file
3
app/errors/runner/error.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error < ApplicationError; end
|
3
app/errors/runner/error/bad_request.rb
Normal file
3
app/errors/runner/error/bad_request.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error::BadRequest < Runner::Error; end
|
3
app/errors/runner/error/execution_timeout.rb
Normal file
3
app/errors/runner/error/execution_timeout.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error::ExecutionTimeout < Runner::Error; end
|
3
app/errors/runner/error/internal_server_error.rb
Normal file
3
app/errors/runner/error/internal_server_error.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error::InternalServerError < Runner::Error; end
|
3
app/errors/runner/error/not_available.rb
Normal file
3
app/errors/runner/error/not_available.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error::NotAvailable < Runner::Error; end
|
3
app/errors/runner/error/not_found.rb
Normal file
3
app/errors/runner/error/not_found.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error::NotFound < Runner::Error; end
|
3
app/errors/runner/error/unauthorized.rb
Normal file
3
app/errors/runner/error/unauthorized.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error::Unauthorized < Runner::Error; end
|
3
app/errors/runner/error/unknown.rb
Normal file
3
app/errors/runner/error/unknown.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class Runner::Error::Unknown < Runner::Error; end
|
@ -1,4 +0,0 @@
|
|||||||
# frozen_string_literal: true
|
|
||||||
|
|
||||||
class RunnerNotAvailableError < ApplicationError
|
|
||||||
end
|
|
@ -4,51 +4,60 @@ class Runner < ApplicationRecord
|
|||||||
BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url]
|
BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url]
|
||||||
HEADERS = {'Content-Type' => 'application/json'}.freeze
|
HEADERS = {'Content-Type' => 'application/json'}.freeze
|
||||||
UNUSED_EXPIRATION_TIME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:unused_runner_expiration_time].seconds
|
UNUSED_EXPIRATION_TIME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:unused_runner_expiration_time].seconds
|
||||||
|
ERRORS = %w[NOMAD_UNREACHABLE NOMAD_OVERLOAD NOMAD_INTERNAL_SERVER_ERROR UNKNOWN].freeze
|
||||||
|
|
||||||
|
ERRORS.each do |error|
|
||||||
|
define_singleton_method :"error_#{error.downcase}" do
|
||||||
|
error
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
belongs_to :execution_environment
|
belongs_to :execution_environment
|
||||||
belongs_to :user, polymorphic: true
|
belongs_to :user, polymorphic: true
|
||||||
|
|
||||||
before_create :new_runner
|
before_validation :request_remotely
|
||||||
before_destroy :destroy_runner
|
|
||||||
|
|
||||||
validates :execution_environment, presence: true
|
validates :execution_environment, :user, :runner_id, presence: true
|
||||||
validates :user, presence: true
|
|
||||||
|
|
||||||
def self.for(user, exercise)
|
def self.for(user, exercise)
|
||||||
execution_environment = ExecutionEnvironment.find(exercise.execution_environment_id)
|
execution_environment = ExecutionEnvironment.find(exercise.execution_environment_id)
|
||||||
runner = find_or_create_by(user: user, execution_environment: execution_environment)
|
runner = find_or_create_by(user: user, execution_environment: execution_environment)
|
||||||
|
|
||||||
return runner if runner.save
|
unless runner.persisted?
|
||||||
|
# runner was not saved in the database (was not valid)
|
||||||
|
raise Runner::Error::InternalServerError.new("Provided runner could not be saved: #{runner.errors.inspect}")
|
||||||
|
end
|
||||||
|
|
||||||
raise RunnerNotAvailableError.new('No runner available')
|
runner
|
||||||
end
|
end
|
||||||
|
|
||||||
def copy_files(files)
|
def copy_files(files)
|
||||||
url = "#{runner_url}/files"
|
url = "#{runner_url}/files"
|
||||||
body = {copy: files.map {|filename, content| {path: filename, content: Base64.strict_encode64(content)} }}
|
body = {copy: files.map {|filename, content| {path: filename, content: Base64.strict_encode64(content)} }}
|
||||||
response = Faraday.patch(url, body.to_json, HEADERS)
|
response = Faraday.patch(url, body.to_json, HEADERS)
|
||||||
return unless response.status == 404
|
handle_error response unless response.status == 204
|
||||||
|
|
||||||
# runner has disappeared for some reason
|
|
||||||
destroy
|
|
||||||
raise RunnerNotAvailableError.new('Runner unavailable')
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def execute_command(command)
|
def execute_command(command)
|
||||||
url = "#{runner_url}/execute"
|
url = "#{runner_url}/execute"
|
||||||
body = {command: command, timeLimit: execution_environment.permitted_execution_time}
|
body = {command: command, timeLimit: execution_environment.permitted_execution_time}
|
||||||
response = Faraday.post(url, body.to_json, HEADERS)
|
response = Faraday.post(url, body.to_json, HEADERS)
|
||||||
if response.status == 404
|
if response.status == 200
|
||||||
# runner has disappeared for some reason
|
response_body = parse response
|
||||||
destroy
|
websocket_url = response_body[:websocketUrl]
|
||||||
raise RunnerNotAvailableError.new('Runner unavailable')
|
if websocket_url.present?
|
||||||
|
return websocket_url
|
||||||
|
else
|
||||||
|
raise Runner::Error::Unknown.new('Runner management sent unexpected response')
|
||||||
|
end
|
||||||
end
|
end
|
||||||
parse response
|
|
||||||
|
handle_error response
|
||||||
end
|
end
|
||||||
|
|
||||||
def execute_interactively(command)
|
def execute_interactively(command)
|
||||||
starting_time = Time.zone.now
|
starting_time = Time.zone.now
|
||||||
websocket_url = execute_command(command)[:websocketUrl]
|
websocket_url = execute_command(command)
|
||||||
EventMachine.run do
|
EventMachine.run do
|
||||||
socket = Runner::Connection.new(websocket_url)
|
socket = Runner::Connection.new(websocket_url)
|
||||||
yield(self, socket) if block_given?
|
yield(self, socket) if block_given?
|
||||||
@ -56,24 +65,64 @@ class Runner < ApplicationRecord
|
|||||||
Time.zone.now - starting_time # execution time
|
Time.zone.now - starting_time # execution time
|
||||||
end
|
end
|
||||||
|
|
||||||
def destroy_runner
|
# This method is currently not used.
|
||||||
Faraday.delete runner_url
|
# This does *not* destroy the ActiveRecord model.
|
||||||
end
|
def destroy_remotely
|
||||||
|
response = Faraday.delete runner_url
|
||||||
|
return if response.status == 204
|
||||||
|
|
||||||
def status
|
if response.status == 404
|
||||||
# TODO: return actual state retrieved via websocket
|
raise Runner::Error::NotFound.new('Runner not found')
|
||||||
:timeouted
|
else
|
||||||
|
handle_error response
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def new_runner
|
def request_remotely
|
||||||
|
return if runner_id.present?
|
||||||
|
|
||||||
url = "#{BASE_URL}/runners"
|
url = "#{BASE_URL}/runners"
|
||||||
body = {executionEnvironmentId: execution_environment.id, inactivityTimeout: UNUSED_EXPIRATION_TIME}
|
body = {executionEnvironmentId: execution_environment.id, inactivityTimeout: UNUSED_EXPIRATION_TIME}
|
||||||
response = Faraday.post(url, body.to_json, HEADERS)
|
response = Faraday.post(url, body.to_json, HEADERS)
|
||||||
response_body = parse response
|
|
||||||
self.runner_id = response_body[:runnerId]
|
case response.status
|
||||||
throw :abort unless response.status == 200
|
when 200
|
||||||
|
response_body = parse response
|
||||||
|
runner_id = response_body[:runnerId]
|
||||||
|
throw(:abort) if runner_id.blank?
|
||||||
|
self.runner_id = response_body[:runnerId]
|
||||||
|
when 404
|
||||||
|
raise Runner::Error::NotFound.new('Execution environment not found')
|
||||||
|
else
|
||||||
|
handle_error response
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_error(response)
|
||||||
|
case response.status
|
||||||
|
when 400
|
||||||
|
response_body = parse response
|
||||||
|
raise Runner::Error::BadRequest.new(response_body[:message])
|
||||||
|
when 401
|
||||||
|
raise Runner::Error::Unauthorized.new('Authentication with runner management failed')
|
||||||
|
when 404
|
||||||
|
# The runner does not exist in the runner management (e.g. due to an inactivity timeout).
|
||||||
|
# Delete the runner model in this case as it can not be used anymore.
|
||||||
|
destroy
|
||||||
|
raise Runner::Error::NotFound.new('Runner not found')
|
||||||
|
when 500
|
||||||
|
response_body = parse response
|
||||||
|
error_code = response_body[:errorCode]
|
||||||
|
if error_code == Runner.error_nomad_overload
|
||||||
|
raise Runner::Error::NotAvailable.new("No runner available (#{error_code}): #{response_body[:message]}")
|
||||||
|
else
|
||||||
|
raise Runner::Error::InternalServerError.new("#{response_body[:errorCode]}: #{response_body[:message]}")
|
||||||
|
end
|
||||||
|
else
|
||||||
|
raise Runner::Error::Unknown.new('Runner management sent unexpected response')
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def runner_url
|
def runner_url
|
||||||
@ -82,5 +131,8 @@ class Runner < ApplicationRecord
|
|||||||
|
|
||||||
def parse(response)
|
def parse(response)
|
||||||
JSON.parse(response.body).deep_symbolize_keys
|
JSON.parse(response.body).deep_symbolize_keys
|
||||||
|
rescue JSON::ParserError => e
|
||||||
|
# the runner management should not send invalid json
|
||||||
|
raise Runner::Error::Unknown.new("Error parsing response from runner management: #{e.message}")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -4,16 +4,20 @@ require 'faye/websocket/client'
|
|||||||
require 'json_schemer'
|
require 'json_schemer'
|
||||||
|
|
||||||
class Runner::Connection
|
class Runner::Connection
|
||||||
|
# These are events for which callbacks can be registered.
|
||||||
EVENTS = %i[start output exit stdout stderr].freeze
|
EVENTS = %i[start output exit stdout stderr].freeze
|
||||||
BACKEND_OUTPUT_SCHEMA = JSONSchemer.schema(JSON.parse(File.read('lib/runner/backend-output.schema.json')))
|
BACKEND_OUTPUT_SCHEMA = JSONSchemer.schema(JSON.parse(File.read('lib/runner/backend-output.schema.json')))
|
||||||
|
|
||||||
def initialize(url)
|
def initialize(url)
|
||||||
@socket = Faye::WebSocket::Client.new(url, [], ping: 5)
|
@socket = Faye::WebSocket::Client.new(url, [], ping: 5)
|
||||||
|
|
||||||
|
# For every event type of faye websockets, the corresponding
|
||||||
|
# RunnerConnection method starting with `on_` is called.
|
||||||
%i[open message error close].each do |event_type|
|
%i[open message error close].each do |event_type|
|
||||||
@socket.on(event_type) {|event| __send__(:"on_#{event_type}", event) }
|
@socket.on(event_type) {|event| __send__(:"on_#{event_type}", event) }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# This registers empty default callbacks.
|
||||||
EVENTS.each {|event_type| instance_variable_set(:"@#{event_type}_callback", ->(e) {}) }
|
EVENTS.each {|event_type| instance_variable_set(:"@#{event_type}_callback", ->(e) {}) }
|
||||||
@start_callback = -> {}
|
@start_callback = -> {}
|
||||||
@exit_code = 0
|
@exit_code = 0
|
||||||
@ -43,6 +47,7 @@ class Runner::Connection
|
|||||||
return unless BACKEND_OUTPUT_SCHEMA.valid?(JSON.parse(event.data))
|
return unless BACKEND_OUTPUT_SCHEMA.valid?(JSON.parse(event.data))
|
||||||
|
|
||||||
event = decode(event.data)
|
event = decode(event.data)
|
||||||
|
# There is one `handle_` method for every message type defined in the WebSocket schema.
|
||||||
__send__("handle_#{event[:type]}", event)
|
__send__("handle_#{event[:type]}", event)
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -50,7 +55,7 @@ class Runner::Connection
|
|||||||
@start_callback.call
|
@start_callback.call
|
||||||
end
|
end
|
||||||
|
|
||||||
def on_error(event); end
|
def on_error(_event); end
|
||||||
|
|
||||||
def on_close(_event)
|
def on_close(_event)
|
||||||
@exit_callback.call @exit_code
|
@exit_callback.call @exit_code
|
||||||
@ -70,11 +75,11 @@ class Runner::Connection
|
|||||||
@output_callback.call event[:data]
|
@output_callback.call event[:data]
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_error(event) end
|
def handle_error(_event); end
|
||||||
|
|
||||||
def handle_start(event) end
|
def handle_start(_event); end
|
||||||
|
|
||||||
def handle_timeout(event)
|
def handle_timeout(_event)
|
||||||
# TODO: set the runner state
|
raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user