implemented pooling for Docker containers
This commit is contained in:
@ -1,3 +1,5 @@
|
||||
require 'concurrent'
|
||||
|
||||
class DockerClient
|
||||
CONTAINER_WORKSPACE_PATH = '/workspace'
|
||||
LOCAL_WORKSPACE_ROOT = Rails.root.join('tmp', 'files', Rails.env)
|
||||
@ -5,23 +7,12 @@ class DockerClient
|
||||
attr_reader :assigned_ports
|
||||
attr_reader :container_id
|
||||
|
||||
def bound_folders
|
||||
@submission ? ["#{remote_workspace_path}:#{CONTAINER_WORKSPACE_PATH}"] : []
|
||||
end
|
||||
private :bound_folders
|
||||
|
||||
def self.check_availability!
|
||||
initialize_environment
|
||||
Timeout::timeout(config[:connection_timeout]) { Docker.version }
|
||||
rescue Excon::Errors::SocketError, Timeout::Error
|
||||
raise Error.new("The Docker host at #{Docker.url} is not reachable!")
|
||||
end
|
||||
|
||||
def clean_workspace
|
||||
FileUtils.rm_rf(local_workspace_path)
|
||||
end
|
||||
private :clean_workspace
|
||||
|
||||
def command_substitutions(filename)
|
||||
{class_name: File.basename(filename, File.extname(filename)).camelize, filename: filename}
|
||||
end
|
||||
@ -32,28 +23,29 @@ class DockerClient
|
||||
end
|
||||
|
||||
def copy_file_to_workspace(options = {})
|
||||
FileUtils.cp(options[:file].native_file.path, File.join(local_workspace_path, options[:file].path || '', options[:file].name_with_extension))
|
||||
FileUtils.cp(options[:file].native_file.path, File.join(self.class.local_workspace_path(options[:container]), options[:file].path || '', options[:file].name_with_extension))
|
||||
end
|
||||
|
||||
def create_container(options = {})
|
||||
Docker::Container.create('Cmd' => options[:command], 'Image' => @image.info['RepoTags'].first)
|
||||
def self.create_container(execution_environment)
|
||||
container = Docker::Container.create('Image' => find_image_by_tag(execution_environment.docker_image).info['RepoTags'].first, 'OpenStdin' => true, 'StdinOnce' => true)
|
||||
container.start('Binds' => mapped_directories, 'PortBindings' => mapped_ports(execution_environment))
|
||||
container
|
||||
end
|
||||
private :create_container
|
||||
|
||||
def create_workspace
|
||||
def create_workspace(container)
|
||||
@submission.collect_files.each do |file|
|
||||
FileUtils.mkdir_p(File.join(local_workspace_path, file.path || ''))
|
||||
FileUtils.mkdir_p(File.join(self.class.local_workspace_path(container), file.path || ''))
|
||||
if file.file_type.binary?
|
||||
copy_file_to_workspace(file: file)
|
||||
copy_file_to_workspace(container: container, file: file)
|
||||
else
|
||||
create_workspace_file(file: file)
|
||||
create_workspace_file(container: container, file: file)
|
||||
end
|
||||
end
|
||||
end
|
||||
private :create_workspace
|
||||
|
||||
def create_workspace_file(options = {})
|
||||
file = File.new(File.join(local_workspace_path, options[:file].path || '', options[:file].name_with_extension), 'w')
|
||||
file = File.new(File.join(self.class.local_workspace_path(options[:container]), options[:file].path || '', options[:file].name_with_extension), 'w')
|
||||
file.write(options[:file].content)
|
||||
file.close
|
||||
end
|
||||
@ -65,51 +57,43 @@ class DockerClient
|
||||
port = configuration.first['HostPort'].to_i
|
||||
PortPool.release(port)
|
||||
end
|
||||
FileUtils.rm_rf(local_workspace_path(container))
|
||||
container.delete(force: true)
|
||||
end
|
||||
|
||||
def execute_command(command, &block)
|
||||
container = create_container(command: ['bash', '-c', command])
|
||||
def execute_arbitrary_command(command, &block)
|
||||
container = DockerContainerPool.get_container(@execution_environment)
|
||||
@container_id = container.id
|
||||
start_container(container, &block)
|
||||
send_command(command, container, &block)
|
||||
end
|
||||
|
||||
def execute_in_workspace(submission, &block)
|
||||
@submission = submission
|
||||
create_workspace
|
||||
block.call
|
||||
ensure
|
||||
clean_workspace if @submission
|
||||
end
|
||||
private :execute_in_workspace
|
||||
|
||||
def execute_run_command(submission, filename, &block)
|
||||
execute_in_workspace(submission) do
|
||||
execute_command(@execution_environment.run_command % command_substitutions(filename), &block)
|
||||
[:run, :test].each do |cause|
|
||||
define_method("execute_#{cause}_command") do |submission, filename, &block|
|
||||
container = DockerContainerPool.get_container(submission.execution_environment)
|
||||
@container_id = container.id
|
||||
@submission = submission
|
||||
create_workspace(container)
|
||||
command = submission.execution_environment.send(:"#{cause}_command") % command_substitutions(filename)
|
||||
send_command(command, container, &block)
|
||||
end
|
||||
end
|
||||
|
||||
def execute_test_command(submission, filename)
|
||||
execute_in_workspace(submission) do
|
||||
execute_command(@execution_environment.test_command % command_substitutions(filename))
|
||||
end
|
||||
end
|
||||
|
||||
def find_image_by_tag(tag)
|
||||
def self.find_image_by_tag(tag)
|
||||
Docker::Image.all.detect { |image| image.info['RepoTags'].flatten.include?(tag) }
|
||||
end
|
||||
private :find_image_by_tag
|
||||
|
||||
def self.generate_remote_workspace_path
|
||||
File.join(config[:workspace_root], SecureRandom.uuid)
|
||||
end
|
||||
|
||||
def self.image_tags
|
||||
check_availability!
|
||||
Docker::Image.all.map { |image| image.info['RepoTags'] }.flatten.reject { |tag| tag.include?('<none>') }
|
||||
end
|
||||
|
||||
def initialize(options = {})
|
||||
self.class.check_availability!
|
||||
@execution_environment = options[:execution_environment]
|
||||
@user = options[:user]
|
||||
@image = find_image_by_tag(@execution_environment.docker_image)
|
||||
@image = self.class.find_image_by_tag(@execution_environment.docker_image)
|
||||
raise Error.new("Cannot find image #{@execution_environment.docker_image}!") unless @image
|
||||
end
|
||||
|
||||
@ -118,38 +102,33 @@ class DockerClient
|
||||
raise Error.new('Docker configuration missing!')
|
||||
end
|
||||
Docker.url = config[:host] if config[:host]
|
||||
check_availability!
|
||||
FileUtils.mkdir_p(LOCAL_WORKSPACE_ROOT)
|
||||
end
|
||||
|
||||
def local_workspace_path
|
||||
File.join(LOCAL_WORKSPACE_ROOT, @submission.id.to_s)
|
||||
def self.local_workspace_path(container)
|
||||
Pathname.new(container.binds.first.split(':').first.sub(config[:workspace_root], LOCAL_WORKSPACE_ROOT.to_s))
|
||||
end
|
||||
private :local_workspace_path
|
||||
|
||||
def mapped_ports
|
||||
@assigned_ports = []
|
||||
(@execution_environment.exposed_ports || '').gsub(/\s/, '').split(',').map do |port|
|
||||
@assigned_ports << PortPool.available_port
|
||||
["#{port}/tcp", [{'HostPort' => @assigned_ports.last.to_s}]]
|
||||
def self.mapped_directories
|
||||
["#{generate_remote_workspace_path}:#{CONTAINER_WORKSPACE_PATH}"]
|
||||
end
|
||||
|
||||
def self.mapped_ports(execution_environment)
|
||||
(execution_environment.exposed_ports || '').gsub(/\s/, '').split(',').map do |port|
|
||||
["#{port}/tcp", [{'HostPort' => PortPool.available_port.to_s}]]
|
||||
end.to_h
|
||||
end
|
||||
private :mapped_ports
|
||||
|
||||
def self.pull(docker_image)
|
||||
`docker pull #{docker_image}` if docker_image
|
||||
end
|
||||
|
||||
def remote_workspace_path
|
||||
File.join(self.class.config[:workspace_root], @submission.id.to_s)
|
||||
end
|
||||
private :remote_workspace_path
|
||||
|
||||
def start_container(container, &block)
|
||||
def send_command(command, container, &block)
|
||||
Timeout::timeout(@execution_environment.permitted_execution_time) do
|
||||
container.start('Binds' => bound_folders, 'PortBindings' => mapped_ports)
|
||||
container.wait(@execution_environment.permitted_execution_time)
|
||||
stderr = []
|
||||
stdout = []
|
||||
container.streaming_logs(stderr: true, stdout: true) do |stream, chunk|
|
||||
container.attach(stdin: StringIO.new(command)) do |stream, chunk|
|
||||
block.call(stream, chunk) if block_given?
|
||||
if stream == :stderr
|
||||
stderr.push(chunk)
|
||||
@ -159,15 +138,13 @@ class DockerClient
|
||||
end
|
||||
{status: :ok, stderr: stderr.join, stdout: stdout.join}
|
||||
end
|
||||
rescue Docker::Error::TimeoutError, Timeout::Error
|
||||
rescue Timeout::Error
|
||||
{status: :timeout}
|
||||
ensure
|
||||
self.class.destroy_container(container)
|
||||
Concurrent::Future.execute { self.class.destroy_container(container) }
|
||||
end
|
||||
private :start_container
|
||||
private :send_command
|
||||
end
|
||||
|
||||
class DockerClient::Error < RuntimeError
|
||||
end
|
||||
|
||||
DockerClient.initialize_environment
|
||||
|
52
lib/docker_container_pool.rb
Normal file
52
lib/docker_container_pool.rb
Normal file
@ -0,0 +1,52 @@
|
||||
require 'concurrent/future'
|
||||
require 'concurrent/timer_task'
|
||||
require 'concurrent/utilities'
|
||||
|
||||
class DockerContainerPool
|
||||
@containers = ThreadSafe::Hash[ExecutionEnvironment.all.map { |execution_environment| [execution_environment.id, ThreadSafe::Array.new] }]
|
||||
|
||||
def self.clean_up
|
||||
@refill_task.try(:shutdown)
|
||||
@containers.each do |key, value|
|
||||
while !value.empty? do
|
||||
DockerClient.destroy_container(value.shift)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.config
|
||||
@config ||= CodeOcean::Config.new(:docker).read(erb: true)[:pool]
|
||||
end
|
||||
|
||||
def self.create_container(execution_environment)
|
||||
DockerClient.create_container(execution_environment)
|
||||
end
|
||||
|
||||
def self.get_container(execution_environment)
|
||||
if config[:active]
|
||||
@containers[execution_environment.id].try(:shift) || create_container(execution_environment)
|
||||
else
|
||||
create_container(execution_environment)
|
||||
end
|
||||
end
|
||||
|
||||
def self.quantities
|
||||
@containers.map { |key, value| [key, value.length] }.to_h
|
||||
end
|
||||
|
||||
def self.refill
|
||||
ExecutionEnvironment.all.each do |execution_environment|
|
||||
refill_count = execution_environment.pool_size - @containers[execution_environment.id].length
|
||||
if refill_count > 0
|
||||
Concurrent::Future.execute do
|
||||
@containers[execution_environment.id] += refill_count.times.map { create_container(execution_environment) }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.start_refill_task
|
||||
@refill_task = Concurrent::TimerTask.new(execution_interval: config[:interval], run_now: true) { refill }
|
||||
@refill_task.execute
|
||||
end
|
||||
end
|
Reference in New Issue
Block a user