Introduce strategy for runner behavior
The Runner model is now only a class responsible for storing information. Based on the configuration, it picks a strategy for runner management. The Poseidon strategy is already implemented and tested; the Docker strategy will follow.
This commit is contained in:

committed by
Sebastian Serth

parent
cf58be97ee
commit
d0d1b1bffd
@@ -131,7 +131,7 @@ class SubmissionsController < ApplicationController
|
||||
end
|
||||
end
|
||||
|
||||
def handle_websockets(tubesock, runner, socket)
|
||||
def handle_websockets(tubesock, socket)
|
||||
tubesock.send_data JSON.dump({cmd: :status, status: :container_running})
|
||||
@output = +''
|
||||
|
||||
@@ -150,11 +150,10 @@ class SubmissionsController < ApplicationController
|
||||
|
||||
socket.on :exit do |exit_code|
|
||||
EventMachine.stop_event_loop
|
||||
status = runner.status
|
||||
if @output.empty?
|
||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"})
|
||||
end
|
||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"}) unless status == :timeouted
|
||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"})
|
||||
kill_socket(tubesock)
|
||||
end
|
||||
|
||||
@@ -170,6 +169,7 @@ class SubmissionsController < ApplicationController
|
||||
else
|
||||
Rails.logger.info("Unknown command from client: #{event[:cmd]}")
|
||||
end
|
||||
|
||||
rescue JSON::ParserError
|
||||
Rails.logger.debug { "Data received from client is not valid json: #{data}" }
|
||||
Sentry.set_extras(data: data)
|
||||
@@ -183,15 +183,16 @@ class SubmissionsController < ApplicationController
|
||||
hijack do |tubesock|
|
||||
return kill_socket(tubesock) if @embed_options[:disable_run]
|
||||
|
||||
@container_execution_time = @submission.run(sanitize_filename) do |runner, socket|
|
||||
@waiting_for_container_time = runner.waiting_time
|
||||
handle_websockets(tubesock, runner, socket)
|
||||
durations = @submission.run(sanitize_filename) do |socket|
|
||||
handle_websockets(tubesock, socket)
|
||||
end
|
||||
@container_execution_time = durations[:execution_duration]
|
||||
@waiting_for_container_time = durations[:waiting_duration]
|
||||
save_run_output
|
||||
rescue Runner::Error::ExecutionTimeout => e
|
||||
tubesock.send_data JSON.dump({cmd: :status, status: :timeout})
|
||||
kill_socket(tubesock)
|
||||
Rails.logger.debug { "Running a submission failed: #{e.message}" }
|
||||
Rails.logger.debug { "Running a submission timed out: #{e.message}" }
|
||||
rescue Runner::Error => e
|
||||
tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
||||
kill_socket(tubesock)
|
||||
@@ -244,7 +245,7 @@ class SubmissionsController < ApplicationController
|
||||
# send_hints(tubesock, StructuredError.where(submission: @submission))
|
||||
rescue Runner::Error::ExecutionTimeout => e
|
||||
tubesock.send_data JSON.dump({cmd: :status, status: :timeout})
|
||||
Rails.logger.debug { "Running a submission failed: #{e.message}" }
|
||||
Rails.logger.debug { "Scoring a submission timed out: #{e.message}" }
|
||||
rescue Runner::Error => e
|
||||
tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
||||
Rails.logger.debug { "Runner error while scoring a submission: #{e.message}" }
|
||||
|
@@ -1,138 +1,57 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require 'forwardable'
|
||||
|
||||
class Runner < ApplicationRecord
|
||||
BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url]
|
||||
HEADERS = {'Content-Type' => 'application/json'}.freeze
|
||||
UNUSED_EXPIRATION_TIME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:unused_runner_expiration_time].seconds
|
||||
ERRORS = %w[NOMAD_UNREACHABLE NOMAD_OVERLOAD NOMAD_INTERNAL_SERVER_ERROR UNKNOWN].freeze
|
||||
|
||||
ERRORS.each do |error|
|
||||
define_singleton_method :"error_#{error.downcase}" do
|
||||
error
|
||||
end
|
||||
end
|
||||
|
||||
belongs_to :execution_environment
|
||||
belongs_to :user, polymorphic: true
|
||||
|
||||
before_validation :request_remotely
|
||||
before_validation :request_id
|
||||
|
||||
validates :execution_environment, :user, :runner_id, presence: true
|
||||
|
||||
STRATEGY_NAME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:strategy]
|
||||
UNUSED_EXPIRATION_TIME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:unused_runner_expiration_time].seconds
|
||||
BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url]
|
||||
DELEGATED_STRATEGY_METHODS = %i[destroy_at_management attach_to_execution copy_files].freeze
|
||||
|
||||
attr_accessor :strategy
|
||||
|
||||
def self.strategy_class
|
||||
"runner/strategy/#{STRATEGY_NAME}".camelize.constantize
|
||||
end
|
||||
|
||||
def self.for(user, exercise)
|
||||
execution_environment = ExecutionEnvironment.find(exercise.execution_environment_id)
|
||||
runner = find_or_create_by(user: user, execution_environment: execution_environment)
|
||||
|
||||
unless runner.persisted?
|
||||
# runner was not saved in the database (was not valid)
|
||||
raise Runner::Error::InternalServerError.new("Provided runner could not be saved: #{runner.errors.inspect}")
|
||||
runner = find_by(user: user, execution_environment: execution_environment)
|
||||
if runner.nil?
|
||||
runner = Runner.create(user: user, execution_environment: execution_environment)
|
||||
raise Runner::Error::Unknown.new("Runner could not be saved: #{runner.errors.inspect}") unless runner.persisted?
|
||||
else
|
||||
runner.strategy = strategy_class.new(runner.runner_id, runner.execution_environment)
|
||||
end
|
||||
|
||||
runner
|
||||
end
|
||||
|
||||
def copy_files(files)
|
||||
url = "#{runner_url}/files"
|
||||
body = {copy: files.map {|filename, content| {path: filename, content: Base64.strict_encode64(content)} }}
|
||||
response = Faraday.patch(url, body.to_json, HEADERS)
|
||||
handle_error response unless response.status == 204
|
||||
end
|
||||
|
||||
def execute_command(command)
|
||||
url = "#{runner_url}/execute"
|
||||
body = {command: command, timeLimit: execution_environment.permitted_execution_time}
|
||||
response = Faraday.post(url, body.to_json, HEADERS)
|
||||
if response.status == 200
|
||||
response_body = parse response
|
||||
websocket_url = response_body[:websocketUrl]
|
||||
if websocket_url.present?
|
||||
return websocket_url
|
||||
else
|
||||
raise Runner::Error::Unknown.new('Runner management sent unexpected response')
|
||||
end
|
||||
end
|
||||
|
||||
handle_error response
|
||||
end
|
||||
|
||||
def execute_interactively(command)
|
||||
starting_time = Time.zone.now
|
||||
websocket_url = execute_command(command)
|
||||
EventMachine.run do
|
||||
socket = Runner::Connection.new(websocket_url)
|
||||
yield(self, socket) if block_given?
|
||||
end
|
||||
Time.zone.now - starting_time # execution time
|
||||
end
|
||||
|
||||
# This method is currently not used.
|
||||
# This does *not* destroy the ActiveRecord model.
|
||||
def destroy_remotely
|
||||
response = Faraday.delete runner_url
|
||||
return if response.status == 204
|
||||
|
||||
if response.status == 404
|
||||
raise Runner::Error::NotFound.new('Runner not found')
|
||||
else
|
||||
handle_error response
|
||||
DELEGATED_STRATEGY_METHODS.each do |method|
|
||||
define_method(method) do |*args, &block|
|
||||
@strategy.send(method, *args, &block)
|
||||
rescue Runner::Error::NotFound
|
||||
update(runner_id: self.class.strategy_class.request_from_management(execution_environment))
|
||||
@strategy = self.class.strategy_class.new(runner_id, execution_environment)
|
||||
@strategy.send(method, *args, &block)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def request_remotely
|
||||
def request_id
|
||||
return if runner_id.present?
|
||||
|
||||
url = "#{BASE_URL}/runners"
|
||||
body = {executionEnvironmentId: execution_environment.id, inactivityTimeout: UNUSED_EXPIRATION_TIME}
|
||||
response = Faraday.post(url, body.to_json, HEADERS)
|
||||
|
||||
case response.status
|
||||
when 200
|
||||
response_body = parse response
|
||||
runner_id = response_body[:runnerId]
|
||||
throw(:abort) if runner_id.blank?
|
||||
self.runner_id = response_body[:runnerId]
|
||||
when 404
|
||||
raise Runner::Error::NotFound.new('Execution environment not found')
|
||||
else
|
||||
handle_error response
|
||||
end
|
||||
end
|
||||
|
||||
def handle_error(response)
|
||||
case response.status
|
||||
when 400
|
||||
response_body = parse response
|
||||
raise Runner::Error::BadRequest.new(response_body[:message])
|
||||
when 401
|
||||
raise Runner::Error::Unauthorized.new('Authentication with runner management failed')
|
||||
when 404
|
||||
# The runner does not exist in the runner management (e.g. due to an inactivity timeout).
|
||||
# Delete the runner model in this case as it can not be used anymore.
|
||||
destroy
|
||||
raise Runner::Error::NotFound.new('Runner not found')
|
||||
when 500
|
||||
response_body = parse response
|
||||
error_code = response_body[:errorCode]
|
||||
if error_code == Runner.error_nomad_overload
|
||||
raise Runner::Error::NotAvailable.new("No runner available (#{error_code}): #{response_body[:message]}")
|
||||
else
|
||||
raise Runner::Error::InternalServerError.new("#{response_body[:errorCode]}: #{response_body[:message]}")
|
||||
end
|
||||
else
|
||||
raise Runner::Error::Unknown.new('Runner management sent unexpected response')
|
||||
end
|
||||
end
|
||||
|
||||
def runner_url
|
||||
"#{BASE_URL}/runners/#{runner_id}"
|
||||
end
|
||||
|
||||
def parse(response)
|
||||
JSON.parse(response.body).deep_symbolize_keys
|
||||
rescue JSON::ParserError => e
|
||||
# the runner management should not send invalid json
|
||||
raise Runner::Error::Unknown.new("Error parsing response from runner management: #{e.message}")
|
||||
strategy_class = self.class.strategy_class
|
||||
self.runner_id = strategy_class.request_from_management(execution_environment)
|
||||
@strategy = strategy_class.new(runner_id, execution_environment)
|
||||
end
|
||||
end
|
||||
|
@@ -141,13 +141,13 @@ class Submission < ApplicationRecord
|
||||
|
||||
def calculate_score
|
||||
score = nil
|
||||
prepared_runner do |runner|
|
||||
prepared_runner do |runner, waiting_duration|
|
||||
scores = collect_files.select(&:teacher_defined_assessment?).map do |file|
|
||||
score_command = command_for execution_environment.test_command, file.name_with_extension
|
||||
stdout = ''
|
||||
stderr = ''
|
||||
stdout = +''
|
||||
stderr = +''
|
||||
exit_code = 1 # default to error
|
||||
execution_time = runner.execute_interactively(score_command) do |_runner, socket|
|
||||
execution_time = runner.attach_to_execution(score_command) do |socket|
|
||||
socket.on :stderr do |data|
|
||||
stderr << data
|
||||
end
|
||||
@@ -161,7 +161,7 @@ class Submission < ApplicationRecord
|
||||
end
|
||||
output = {
|
||||
file_role: file.role,
|
||||
waiting_for_container_time: runner.waiting_time,
|
||||
waiting_for_container_time: waiting_duration,
|
||||
container_execution_time: execution_time,
|
||||
status: exit_code.zero? ? :ok : :failed,
|
||||
stdout: stdout,
|
||||
@@ -176,11 +176,12 @@ class Submission < ApplicationRecord
|
||||
|
||||
def run(file, &block)
|
||||
run_command = command_for execution_environment.run_command, file
|
||||
execution_time = 0
|
||||
prepared_runner do |runner|
|
||||
execution_time = runner.execute_interactively(run_command, &block)
|
||||
durations = {}
|
||||
prepared_runner do |runner, waiting_duration|
|
||||
durations[:execution_duration] = runner.attach_to_execution(run_command, &block)
|
||||
durations[:waiting_duration] = waiting_duration
|
||||
end
|
||||
execution_time
|
||||
durations
|
||||
end
|
||||
|
||||
private
|
||||
@@ -197,8 +198,8 @@ class Submission < ApplicationRecord
|
||||
request_time = Time.zone.now
|
||||
runner = Runner.for(user, exercise)
|
||||
copy_files_to runner
|
||||
runner.waiting_time = Time.zone.now - request_time
|
||||
yield(runner) if block_given?
|
||||
waiting_duration = Time.zone.now - request_time
|
||||
yield(runner, waiting_duration) if block_given?
|
||||
end
|
||||
|
||||
def command_for(template, file)
|
||||
|
Reference in New Issue
Block a user