Add strategy for DockerContainerPool

In order to provide an alternative to Poseidon, a strategy for the
DockerContainerPool is added that is used by the runner model.

Co-authored-by: Sebastian Serth <Sebastian.Serth@hpi.de>
This commit is contained in:
Felix Auringer
2021-06-10 16:17:02 +02:00
committed by Sebastian Serth
parent 1d3f0d7ad8
commit 704407b9fc
12 changed files with 282 additions and 69 deletions

View File

@ -3,6 +3,11 @@
require 'concurrent/future'
require 'concurrent/timer_task'
# get_container and destroy_container were moved to lib/runner/strategy/docker_container_pool.rb.
# return_container is not used anymore because runners are not shared between users anymore.
# create_container is done by the DockerContainerPool.
# dump_info and quantities are still in use.
class DockerContainerPool
def self.config
# TODO: Why erb?
@ -22,6 +27,7 @@ class DockerContainerPool
nil
end
# not in use because DockerClient::RECYCLE_CONTAINERS == false
def self.return_container(container, execution_environment)
Faraday.get("#{config[:location]}/docker_container_pool/return_container/#{container.id}")
rescue StandardError => e

View File

@ -7,10 +7,13 @@ class Runner::Connection
# These are events for which callbacks can be registered.
EVENTS = %i[start output exit stdout stderr].freeze
BACKEND_OUTPUT_SCHEMA = JSONSchemer.schema(JSON.parse(File.read('lib/runner/backend-output.schema.json')))
TIMEOUT_EXIT_STATUS = -100
def initialize(url)
attr_writer :status
def initialize(url, strategy)
@socket = Faye::WebSocket::Client.new(url, [], ping: 5)
@strategy = strategy
@status = :established
# For every event type of faye websockets, the corresponding
# RunnerConnection method starting with `on_` is called.
@ -37,18 +40,19 @@ class Runner::Connection
private
def decode(event)
JSON.parse(event).deep_symbolize_keys
def decode(_raw_event)
raise NotImplementedError
end
def encode(data)
data
def encode(_data)
raise NotImplementedError
end
def on_message(event)
return unless BACKEND_OUTPUT_SCHEMA.valid?(JSON.parse(event.data))
def on_message(raw_event)
event = decode(raw_event)
return unless BACKEND_OUTPUT_SCHEMA.valid?(event)
event = decode(event.data)
event = event.deep_symbolize_keys
# There is one `handle_` method for every message type defined in the WebSocket schema.
__send__("handle_#{event[:type]}", event)
end
@ -60,10 +64,20 @@ class Runner::Connection
def on_error(_event); end
def on_close(_event)
@exit_callback.call @exit_code
case @status
when :timeout
raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
when :terminated
@exit_callback.call @exit_code
else # :established
# If the runner is killed by the DockerContainerPool after the maximum allowed time per user and
# while the owning user is running an execution, the command execution stops and log output is incomplete.
raise Runner::Error::Unknown.new('Execution terminated with an unknown reason')
end
end
# Handles an `exit` message: marks the connection as terminated and stores the
# exit code taken from the message's :data field. #on_close only invokes the
# exit callback when the status is :terminated.
def handle_exit(event)
@status = :terminated
@exit_code = event[:data]
end
@ -82,7 +96,6 @@ class Runner::Connection
def handle_start(_event); end
def handle_timeout(_event)
@exit_code = TIMEOUT_EXIT_STATUS
raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
@status = :timeout
end
end

View File

@ -1,8 +1,7 @@
# frozen_string_literal: true
class Runner::Strategy
def initialize(runner_id, environment)
@runner_id = runner_id
def initialize(_runner_id, environment)
@execution_environment = environment
end

View File

@ -1,3 +0,0 @@
# frozen_string_literal: true
class Runner::Strategy::Docker < Runner::Strategy; end

View File

@ -0,0 +1,138 @@
# frozen_string_literal: true
class Runner::Strategy::DockerContainerPool < Runner::Strategy
attr_reader :container_id, :command, :execution_environment
# Returns the Docker configuration. It is read on first use and cached in a
# class-level instance variable for all later calls.
def self.config
  return @config if @config

  # The docker configuration file contains code that must be executed,
  # which is why it is read with ERB templating enabled.
  @config = CodeOcean::Config.new(:docker).read(erb: true)
end
# Requests a container for the given execution environment from the
# DockerContainerPool and returns its id.
#
# @param environment the execution environment; its `id` selects the pool to use
# @return [String] the id of the container handed out by the pool
# @raise [Runner::Error::NotAvailable] if the response carries no container id
# @raise [Runner::Error::Unknown] if the request fails or the body is not valid JSON
def self.request_from_management(environment)
  url = "#{config[:pool][:location]}/docker_container_pool/get_container/#{environment.id}"
  response = JSON.parse(Faraday.get(url).body)
  # Valid JSON is not necessarily an object (e.g. `null` or `[]`). Only look up
  # the id on a Hash so that such bodies end up as NotAvailable below instead
  # of leaking a NoMethodError/TypeError to the caller.
  container_id = response['id'] if response.is_a?(Hash)
  container_id.presence || raise(Runner::Error::NotAvailable.new("DockerContainerPool didn't return a container id"))
rescue Faraday::Error => e
  raise Runner::Error::Unknown.new("Faraday request to DockerContainerPool failed: #{e.inspect}")
rescue JSON::ParserError => e
  raise Runner::Error::Unknown.new("DockerContainerPool returned invalid JSON: #{e.inspect}")
end
# Creates the strategy for one container. Both arguments are forwarded to
# Runner::Strategy#initialize via the bare `super`; in addition, the runner id
# is kept as the id of the container this strategy operates on.
def initialize(runner_id, _environment)
super
@container_id = runner_id
end
# Replaces the content of the container's local workspace directory with the
# given files.
#
# The workspace is created if necessary and emptied first. Binary files are
# copied from their native file, all other files are written from their
# content. Finally, read/write (and directory traversal) permissions are
# granted recursively on the workspace.
#
# @param files the files to place in the workspace; each must respond to
#   `path`, `filepath`, `file_type`, `content` and (for binaries) `native_file`
# @raise [Runner::Error::Unknown] if a file cannot be created, a path leaves
#   the workspace or the workspace cannot be cleaned
def copy_files(files)
  FileUtils.mkdir_p(local_workspace_path)
  clean_workspace
  files.each do |file|
    FileUtils.mkdir_p(local_path(file.path)) if file.path.present?

    local_file_path = local_path(file.filepath)
    begin
      if file.file_type.binary?
        FileUtils.cp(file.native_file.path, local_file_path)
      else
        File.write(local_file_path, file.content)
      end
    rescue IOError, SystemCallError => e
      # Failures of the underlying system calls (missing directory, missing
      # permissions, full disk, ...) are raised as Errno::* errors, which are
      # SystemCallErrors and not IOErrors. Both are rescued so that every
      # failed attempt is reported as a runner error naming the file.
      raise Runner::Error::Unknown.new("Could not create workspace file #{file.filepath}: #{e.inspect}")
    end
  end
  FileUtils.chmod_R('+rwX', local_workspace_path)
end
# Asks the DockerContainerPool to destroy this runner's container.
#
# The container id is taken from the Docker container object, so the lookup
# performed by #container (and the errors it raises) happens first.
#
# @raise [Runner::Error::Unknown] if the request to the pool fails
def destroy_at_management
  pool_location = self.class.config[:pool][:location]
  Faraday.get("#{pool_location}/docker_container_pool/destroy_container/#{container.id}")
rescue Faraday::Error => error
  raise Runner::Error::Unknown.new("Faraday request to DockerContainerPool failed: #{error.inspect}")
end
# Runs the given command in the container through its attach WebSocket and
# yields the connection so that the caller can register callbacks.
#
# A timer is armed with the permitted execution time of the environment; when
# it fires, the connection is flagged as timed out and the pool is asked to
# destroy the container.
#
# @param command [String] the command to send to the container
# @yieldparam connection [Connection] the connection attached to the container
# @return the execution duration in seconds
def attach_to_execution(command)
  @command = command
  started_at = Time.zone.now
  attach_options = %w[logs=0 stream=1 stderr=1 stdout=1 stdin=1].join('&')
  attach_url = "#{self.class.config[:ws_host]}/v1.27/containers/#{@container_id}/attach/ws?#{attach_options}"

  EventMachine.run do
    connection = Connection.new(attach_url, self)
    time_limit = @execution_environment.permitted_execution_time
    EventMachine.add_timer(time_limit) do
      connection.status = :timeout
      destroy_at_management
    end
    connection.send(command)
    yield(connection)
  end

  Time.zone.now - started_at
end
private
# Returns the running Docker container belonging to this strategy.
#
# The container is looked up once and cached. It is only cached after the
# running check has passed, so a stopped container is never handed out by a
# later call.
#
# @return [Docker::Container] the running container
# @raise [Runner::Error::RunnerNotFound] if the container does not exist or is
#   not running
def container
  @container ||= begin
    candidate = Docker::Container.get(@container_id)
    raise Runner::Error::RunnerNotFound unless candidate.info['State']['Running']

    candidate
  end
rescue Docker::Error::NotFoundError
  raise Runner::Error::RunnerNotFound
end
# Resolves a path relative to the local workspace and makes sure the result
# does not leave the workspace directory.
#
# @param path [String] a file or directory path relative to the workspace
# @return [Pathname] the normalized absolute path inside the workspace
# @raise [Runner::Error::Unknown] if the normalized path is outside the workspace
def local_path(path)
  workspace = File.expand_path(local_workspace_path)
  clean_path = File.expand_path(local_workspace_path.join(path))
  # A plain prefix check would also accept sibling directories that merely
  # share the prefix (workspace `/ws` vs. `/ws-evil`). The path must therefore
  # be the workspace itself or start with the workspace followed by a separator.
  inside_prefix = "#{workspace.chomp(File::SEPARATOR)}#{File::SEPARATOR}"
  unless clean_path == workspace || clean_path.start_with?(inside_prefix)
    raise Runner::Error::Unknown.new("Local filepath #{clean_path.inspect} not allowed")
  end

  Pathname.new(clean_path)
end
# Removes everything inside the local workspace directory while keeping the
# directory itself.
#
# @raise [Runner::Error::Unknown] if the workspace does not exist or may not
#   be cleaned
def clean_workspace
  workspace_entries = local_workspace_path.children
  FileUtils.rm_r(workspace_entries, secure: true)
rescue Errno::ENOENT => missing
  raise Runner::Error::Unknown.new("The workspace directory does not exist and cannot be deleted: #{missing.inspect}")
rescue Errno::EACCES => denied
  # TODO: Why was this rescued before @Sebastian?
  raise Runner::Error::Unknown.new("Not allowed to clean workspace #{local_workspace_path}: #{denied.inspect}")
end
# Returns the local workspace directory of the container, i.e. the part before
# the first colon of the container's first bind. The result is cached.
#
# @return [Pathname] the local workspace directory
def local_workspace_path
  return @local_workspace_path if @local_workspace_path

  host_directory, = container.binds.first.split(':')
  @local_workspace_path = Pathname.new(host_directory)
end
class Connection < Runner::Connection
# Creates the connection; all arguments are forwarded unchanged to
# Runner::Connection#initialize by the bare `super`.
def initialize(*args)
# Stream name attached to decoded output. It starts as 'stdout' and is
# switched to 'stderr' by #decode once the test command has been seen.
@stream = 'stdout'
super
end
# Returns the given data as a string with a trailing newline appended.
def encode(data)
"#{data}\n"
end
# Translates a raw message from the container's attach WebSocket into an
# output event, filtering the lines that are not program output.
#
# @param raw_event the WebSocket event; its `data` is the received text
# @return [Hash, nil] `{'type' => stream, 'data' => text}` for program output,
#   `nil` for every line that is consumed here (prompt, echoed commands, ...)
def decode(raw_event)
  data = raw_event.data

  # The regular expression for the test command is only built when this check
  # is reached. A blank test command must never match: an empty pattern would
  # match every line and thereby swallow the complete output.
  test_command_echo = lambda do |line|
    template = @strategy.execution_environment.test_command.to_s
    next false if template.strip.empty? || !line.is_a?(String)

    /#{format(template, class_name: '.*', filename: '.*', module_name: '.*')}/.match?(line)
  end

  case data
  when /@#{@strategy.container_id[0..11]}/
    # The line contains `@` followed by the first twelve characters of the
    # container id, which is taken as the sign that the command has finished.
    # Assume correct termination for now and return exit code 0
    # TODO: Can we use the actual exit code here?
    @exit_code = 0
    @status = :terminated
    @socket.close
    nil
  when test_command_echo
    # TODO: Super dirty hack to redirect test output to stderr (remove attr_reader afterwards)
    @stream = 'stderr'
    nil
  when /#{Regexp.escape(@strategy.command)}/, /bash: cmd:canvasevent: command not found/
    # Echo of the sent command or a known shell complaint: no program output.
    # The command is escaped because it is arbitrary shell text, not a pattern;
    # characters such as `(` or `.` must be matched literally.
    nil
  else
    {'type' => @stream, 'data' => data}
  end
end
end
end

View File

@ -62,13 +62,24 @@ class Runner::Strategy::Poseidon < Runner::Strategy
raise Runner::Error::Unknown.new("Error parsing response from Poseidon: #{e.message}")
end
# Creates the strategy for one runner. Both arguments are forwarded to
# Runner::Strategy#initialize via the bare `super`; in addition, the runner id
# is kept as the allocation id used when talking to Poseidon.
def initialize(runner_id, _environment)
super
@allocation_id = runner_id
end
def copy_files(files)
copy = files.map do |file|
{
path: file.filepath,
content: Base64.strict_encode64(file.content),
}
end
url = "#{runner_url}/files"
body = {copy: files.map {|filename, content| {path: filename, content: Base64.strict_encode64(content)} }}
body = {copy: copy}
response = Faraday.patch(url, body.to_json, HEADERS)
return if response.status == 204
Runner.destroy(@runner_id) if response.status == 400
Runner.destroy(@allocation_id) if response.status == 400
self.class.handle_error response
rescue Faraday::Error => e
raise Runner::Error::Unknown.new("Faraday request to runner management failed: #{e.inspect}")
@ -78,7 +89,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy
starting_time = Time.zone.now
websocket_url = execute_command(command)
EventMachine.run do
socket = Runner::Connection.new(websocket_url)
socket = Connection.new(websocket_url, self)
yield(socket)
end
Time.zone.now - starting_time # execution duration
@ -107,7 +118,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy
raise Runner::Error::Unknown.new('Poseidon did not send websocket url')
end
when 400
Runner.destroy(@runner_id)
Runner.destroy(@allocation_id)
end
self.class.handle_error response
@ -116,6 +127,18 @@ class Runner::Strategy::Poseidon < Runner::Strategy
end
def runner_url
"#{Runner::BASE_URL}/runners/#{@runner_id}"
"#{Runner::BASE_URL}/runners/#{@allocation_id}"
end
class Connection < Runner::Connection
# Parses the JSON payload of a raw WebSocket event received from Poseidon.
#
# @param raw_event the WebSocket event; its `data` is the JSON text
# @return the parsed JSON value (with string keys)
# @raise [Runner::Error::Unknown] if the payload is not valid JSON
def decode(raw_event)
  payload = raw_event.data
  JSON.parse(payload)
rescue JSON::ParserError => parse_error
  raise Runner::Error::Unknown.new("The websocket message from Poseidon could not be decoded to JSON: #{parse_error.inspect}")
end
# Returns the given data unchanged.
def encode(data)
data
end
end
end