
Previously, the same runner could be used multiple times with different submissions simultaneously. This, however, yielded errors, for example when one submission time oud (causing the running to be deleted) while another submission was still executed. Admin actions, such as the shell, can be still executed regardless of any other code execution. Fixes CODEOCEAN-HG Fixes openHPI/poseidon#423
236 lines
8.9 KiB
Ruby
236 lines
8.9 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
class Runner < ApplicationRecord
|
|
include ContributorCreation
|
|
belongs_to :execution_environment
|
|
|
|
before_validation :request_id
|
|
validates :runner_id, presence: true
|
|
|
|
attr_accessor :strategy
|
|
|
|
def self.strategy_class
|
|
@strategy_class ||= if Runner.management_active?
|
|
strategy_name = CodeOcean::Config.new(:code_ocean).read[:runner_management][:strategy]
|
|
"runner/strategy/#{strategy_name}".camelize.constantize
|
|
else
|
|
Runner::Strategy::Null
|
|
end
|
|
end
|
|
|
|
def self.management_active?
|
|
@management_active ||= begin
|
|
runner_management = CodeOcean::Config.new(:code_ocean).read[:runner_management]
|
|
if runner_management
|
|
runner_management[:enabled]
|
|
else
|
|
false
|
|
end
|
|
end
|
|
end
|
|
|
|
def self.for(contributor, execution_environment)
|
|
runner = find_by(contributor:, execution_environment:)
|
|
if runner.nil?
|
|
runner = Runner.create(contributor:, execution_environment:)
|
|
# The `strategy` is added through the before_validation hook `:request_id`.
|
|
raise Runner::Error::Unknown.new("Runner could not be saved: #{runner.errors.inspect}") unless runner.persisted?
|
|
else
|
|
# This information is required but not persisted in the runner model.
|
|
runner.strategy = strategy_class.new(runner.runner_id, runner.execution_environment)
|
|
end
|
|
|
|
runner
|
|
end
|
|
|
|
def copy_files(files)
|
|
reserve!
|
|
@strategy.copy_files(files)
|
|
rescue Runner::Error::RunnerNotFound
|
|
request_new_id
|
|
save
|
|
@strategy.copy_files(files)
|
|
ensure
|
|
release!
|
|
end
|
|
|
|
def download_file(desired_file, privileged_execution:, exclusive: true)
|
|
reserve! if exclusive
|
|
@strategy.download_file(desired_file, privileged_execution:)
|
|
release! if exclusive
|
|
end
|
|
|
|
def retrieve_files(raise_exception: true, exclusive: true, **)
|
|
reserve! if exclusive
|
|
try = 0
|
|
begin
|
|
if try.nonzero?
|
|
request_new_id
|
|
save
|
|
end
|
|
@strategy.retrieve_files(**)
|
|
rescue Runner::Error::RunnerNotFound => e
|
|
Rails.logger.debug { "Retrieving files failed for the first time: #{e.message}" }
|
|
try += 1
|
|
|
|
if try == 1
|
|
# This is only used if no files were copied to the runner. Thus requesting a second runner is performed here
|
|
# Reset the variable. This is required to prevent raising an outdated exception after a successful second try
|
|
e = nil
|
|
retry
|
|
end
|
|
ensure
|
|
# We forward the exception if requested
|
|
raise e if raise_exception && defined?(e) && e.present?
|
|
|
|
# Otherwise, we return an hash with empty files and release the runner
|
|
release! if exclusive
|
|
{'files' => []}
|
|
end
|
|
end
|
|
|
|
def attach_to_execution(command, privileged_execution: false, exclusive: true, &block)
|
|
reserve! if exclusive
|
|
Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Starting execution with Runner #{id} for #{contributor_type} #{contributor_id}." }
|
|
starting_time = Time.zone.now
|
|
begin
|
|
# As the EventMachine reactor is probably shared with other threads, we cannot use EventMachine.run with
|
|
# stop_event_loop to wait for the WebSocket connection to terminate. Instead we use a self built event
|
|
# loop for that: Runner::EventLoop. The attach_to_execution method of the strategy is responsible for
|
|
# initializing its Runner::Connection with the given event loop. The Runner::Connection class ensures that
|
|
# this event loop is stopped after the socket was closed.
|
|
event_loop = Runner::EventLoop.new
|
|
socket = @strategy.attach_to_execution(command, event_loop, starting_time, privileged_execution:, &block)
|
|
event_loop.wait
|
|
raise socket.error if socket.error.present?
|
|
rescue Runner::Error => e
|
|
e.starting_time = starting_time
|
|
e.execution_duration = Time.zone.now - starting_time
|
|
raise
|
|
end
|
|
release! if exclusive
|
|
Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Stopped execution with Runner #{id} for #{contributor_type} #{contributor_id}." }
|
|
Time.zone.now - starting_time # execution duration
|
|
end
|
|
|
|
def execute_command(command, privileged_execution: false, raise_exception: true, exclusive: true)
|
|
output = {
|
|
stdout: +'',
|
|
stderr: +'',
|
|
messages: [],
|
|
exit_code: 1, # default to error
|
|
}
|
|
try = 0
|
|
|
|
begin
|
|
if try.nonzero?
|
|
request_new_id
|
|
save
|
|
end
|
|
|
|
execution_time = attach_to_execution(command, privileged_execution:, exclusive:) do |socket, starting_time|
|
|
socket.on :stderr do |data|
|
|
output[:stderr] << data
|
|
output[:messages].push({cmd: :write, stream: :stderr, log: data, timestamp: Time.zone.now - starting_time})
|
|
end
|
|
socket.on :stdout do |data|
|
|
output[:stdout] << data
|
|
output[:messages].push({cmd: :write, stream: :stdout, log: data, timestamp: Time.zone.now - starting_time})
|
|
end
|
|
socket.on :exit do |received_exit_code|
|
|
output[:exit_code] = received_exit_code
|
|
end
|
|
end
|
|
output.merge!(container_execution_time: execution_time, status: output[:exit_code].zero? ? :ok : :failed)
|
|
rescue Runner::Error::ExecutionTimeout => e
|
|
Rails.logger.debug { "Running command `#{command}` timed out: #{e.message}" }
|
|
output.merge!(status: :timeout, container_execution_time: e.execution_duration)
|
|
rescue Runner::Error::OutOfMemory => e
|
|
Rails.logger.debug { "Running command `#{command}` caused an out of memory error: #{e.message}" }
|
|
output.merge!(status: :out_of_memory, container_execution_time: e.execution_duration)
|
|
rescue Runner::Error::RunnerInUse => e
|
|
Rails.logger.debug { "Running command `#{command}` failed because the runner was already in use: #{e.message}" }
|
|
output.merge!(status: :runner_in_use, container_execution_time: e.execution_duration)
|
|
rescue Runner::Error::RunnerNotFound => e
|
|
Rails.logger.debug { "Running command `#{command}` failed for the first time: #{e.message}" }
|
|
try += 1
|
|
|
|
if try == 1
|
|
# This is only used if no files were copied to the runner. Thus requesting a second runner is performed here
|
|
# Reset the variable. This is required to prevent raising an outdated exception after a successful second try
|
|
e = nil
|
|
retry
|
|
end
|
|
|
|
Rails.logger.debug { "Running command `#{command}` failed for the second time: #{e.message}" }
|
|
output.merge!(status: :failed, container_execution_time: e.execution_duration)
|
|
rescue Runner::Error => e
|
|
Rails.logger.debug { "Running command `#{command}` failed: #{e.message}" }
|
|
output.merge!(status: :container_depleted, container_execution_time: e.execution_duration)
|
|
ensure
|
|
# We forward the exception if requested
|
|
raise e if raise_exception && defined?(e) && e.present?
|
|
|
|
# If the process was killed with SIGKILL, it is most likely that the OOM killer was triggered.
|
|
output[:status] = :out_of_memory if output[:exit_code] == 137
|
|
end
|
|
end
|
|
|
|
def destroy_at_management
|
|
@strategy.destroy_at_management
|
|
update!(runner_id: nil, reserved_until: nil)
|
|
end
|
|
|
|
def reserve!
|
|
with_lock do
|
|
if reserved_until.present? && reserved_until > Time.zone.now
|
|
@error = Runner::Error::RunnerInUse.new("The desired Runner #{id} is already in use until #{reserved_until.iso8601}.")
|
|
raise @error
|
|
else
|
|
update!(reserved_until: Time.zone.now + execution_environment.permitted_execution_time.seconds)
|
|
@error = nil
|
|
end
|
|
end
|
|
end
|
|
|
|
def release!
|
|
return if @error.present?
|
|
|
|
with_lock do
|
|
update!(reserved_until: nil)
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def request_id
|
|
request_new_id if runner_id.blank?
|
|
end
|
|
|
|
def request_new_id
|
|
strategy_class = self.class.strategy_class
|
|
begin
|
|
self.runner_id = strategy_class.request_from_management(execution_environment)
|
|
@strategy = strategy_class.new(runner_id, execution_environment)
|
|
rescue Runner::Error::EnvironmentNotFound
|
|
# Whenever the environment could not be found by the runner management, we
|
|
# try to synchronize it and then forward a more specific error to our callee.
|
|
begin
|
|
strategy_class.sync_environment(execution_environment)
|
|
rescue Runner::Error
|
|
# An additional error was raised during synchronization
|
|
raise Runner::Error::EnvironmentNotFound.new(
|
|
"The execution environment with id #{execution_environment.id} was not found by the runner management. " \
|
|
'In addition, it could not be synced so that this probably indicates a permanent error.'
|
|
)
|
|
else
|
|
# No error was raised during synchronization
|
|
raise Runner::Error::EnvironmentNotFound.new(
|
|
"The execution environment with id #{execution_environment.id} was not found yet by the runner management. " \
|
|
'It has been successfully synced now so that the next request should be successful.'
|
|
)
|
|
end
|
|
end
|
|
end
|
|
end
|