diff --git a/app/controllers/submissions_controller.rb b/app/controllers/submissions_controller.rb index 4abe2632..b01cf970 100644 --- a/app/controllers/submissions_controller.rb +++ b/app/controllers/submissions_controller.rb @@ -137,7 +137,7 @@ class SubmissionsController < ApplicationController @output = +'' socket.on :output do |data| - Rails.logger.info("#{Time.zone.now.getutc}: Container sending: #{data}") + Rails.logger.info("#{Time.zone.now.getutc}: Container sending: #{data.inspect}") @output << data if @output.size + data.size <= max_output_buffer_size end @@ -150,10 +150,6 @@ class SubmissionsController < ApplicationController end socket.on :exit do |exit_code| - # As this is sometimes called before the timeout is handled, we must not close the - # socket to the user here. The socket will be closed after handling the timeout. - next if exit_code == Runner::Connection::TIMEOUT_EXIT_STATUS - EventMachine.stop_event_loop if @output.empty? tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"}) diff --git a/app/models/runner.rb b/app/models/runner.rb index 0293f181..5fd0628b 100644 --- a/app/models/runner.rb +++ b/app/models/runner.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require 'forwardable' - class Runner < ApplicationRecord belongs_to :execution_environment belongs_to :user, polymorphic: true @@ -13,7 +11,6 @@ class Runner < ApplicationRecord STRATEGY_NAME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:strategy] UNUSED_EXPIRATION_TIME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:unused_runner_expiration_time].seconds BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url] - DELEGATED_STRATEGY_METHODS = %i[destroy_at_management attach_to_execution copy_files].freeze attr_accessor :strategy @@ -35,14 +32,20 @@ class Runner < ApplicationRecord runner end - DELEGATED_STRATEGY_METHODS.each do |method| - define_method(method) do |*args, &block| - @strategy.send(method, *args, &block) - rescue Runner::Error::RunnerNotFound - request_new_id - save - @strategy.send(method, *args, &block) - end + def copy_files(files) + @strategy.copy_files(files) + rescue Runner::Error::RunnerNotFound + request_new_id + save + @strategy.copy_files(files) + end + + def attach_to_execution(command, &block) + @strategy.attach_to_execution(command, &block) + end + + def destroy_at_management + @strategy.destroy_at_management end private @@ -53,17 +56,19 @@ class Runner < ApplicationRecord def request_new_id strategy_class = self.class.strategy_class - self.runner_id = strategy_class.request_from_management(execution_environment) - @strategy = strategy_class.new(runner_id, execution_environment) - rescue Runner::Error::EnvironmentNotFound - if strategy_class.sync_environment(execution_environment) - raise Runner::Error::EnvironmentNotFound.new( - "The execution environment with id #{execution_environment.id} was not found and was successfully synced with the runner management" - ) - else - raise Runner::Error::EnvironmentNotFound.new( - "The execution environment with id #{execution_environment.id} was not found and could not be synced with the runner management" - ) + begin + self.runner_id = strategy_class.request_from_management(execution_environment) + @strategy = strategy_class.new(runner_id, execution_environment) + rescue Runner::Error::EnvironmentNotFound + if strategy_class.sync_environment(execution_environment) + raise Runner::Error::EnvironmentNotFound.new( + "The execution environment with id #{execution_environment.id} was not found and was successfully synced with the runner management" + ) + else + raise Runner::Error::EnvironmentNotFound.new( + "The execution environment with id #{execution_environment.id} was not found and could not be synced with the runner management" + ) + end end end end diff --git a/app/models/submission.rb b/app/models/submission.rb index 4bd2653b..11857df6 100644 --- a/app/models/submission.rb +++ b/app/models/submission.rb @@ -184,18 +184,10 @@ class Submission < ApplicationRecord private - def copy_files_to(runner) - files = {} - collect_files.each do |file| - files[file.name_with_extension] = file.content - end - runner.copy_files(files) - end - def prepared_runner request_time = Time.zone.now runner = Runner.for(user, exercise) - copy_files_to runner + runner.copy_files(collect_files) waiting_duration = Time.zone.now - request_time yield(runner, waiting_duration) end @@ -270,7 +262,7 @@ class Submission < ApplicationRecord update(score: score) if normalized_score.to_d == 1.0.to_d Thread.new do - RequestForComment.find_each(exercise_id: exercise_id, user_id: user_id, user_type: user_type) do |rfc| + RequestForComment.where(exercise_id: exercise_id, user_id: user_id, user_type: user_type).find_each do |rfc| rfc.full_score_reached = true rfc.save end diff --git a/lib/docker_container_pool.rb b/lib/docker_container_pool.rb index 85ea7381..a290c82c 100644 --- a/lib/docker_container_pool.rb +++ b/lib/docker_container_pool.rb @@ -3,6 +3,11 @@ require 'concurrent/future' require 'concurrent/timer_task' +# get_container, destroy_container was moved to lib/runner/strategy/docker_container_pool.rb. +# return_container is not used anymore because runners are not shared between users anymore. +# create_container is done by the DockerContainerPool. +# dump_info and quantities are still in use. + class DockerContainerPool def self.config # TODO: Why erb? @@ -22,6 +27,7 @@ class DockerContainerPool nil end + # not in use because DockerClient::RECYCLE_CONTAINERS == false def self.return_container(container, execution_environment) Faraday.get("#{config[:location]}/docker_container_pool/return_container/#{container.id}") rescue StandardError => e diff --git a/lib/runner/connection.rb b/lib/runner/connection.rb index 8dd8f348..4bf725af 100644 --- a/lib/runner/connection.rb +++ b/lib/runner/connection.rb @@ -7,10 +7,13 @@ class Runner::Connection # These are events for which callbacks can be registered. EVENTS = %i[start output exit stdout stderr].freeze BACKEND_OUTPUT_SCHEMA = JSONSchemer.schema(JSON.parse(File.read('lib/runner/backend-output.schema.json'))) - TIMEOUT_EXIT_STATUS = -100 - def initialize(url) + attr_writer :status + + def initialize(url, strategy) @socket = Faye::WebSocket::Client.new(url, [], ping: 5) + @strategy = strategy + @status = :established # For every event type of faye websockets, the corresponding # RunnerConnection method starting with `on_` is called. @@ -37,18 +40,19 @@ class Runner::Connection private - def decode(event) - JSON.parse(event).deep_symbolize_keys + def decode(_raw_event) + raise NotImplementedError end - def encode(data) - data + def encode(_data) + raise NotImplementedError end - def on_message(event) - return unless BACKEND_OUTPUT_SCHEMA.valid?(JSON.parse(event.data)) + def on_message(raw_event) + event = decode(raw_event) + return unless BACKEND_OUTPUT_SCHEMA.valid?(event) - event = decode(event.data) + event = event.deep_symbolize_keys # There is one `handle_` method for every message type defined in the WebSocket schema. __send__("handle_#{event[:type]}", event) end @@ -60,10 +64,20 @@ class Runner::Connection def on_error(_event); end def on_close(_event) - @exit_callback.call @exit_code + case @status + when :timeout + raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit') + when :terminated + @exit_callback.call @exit_code + else # :established + # If the runner is killed by the DockerContainerPool after the maximum allowed time per user and + # while the owning user is running an execution, the command execution stops and log output is incomplete. + raise Runner::Error::Unknown.new('Execution terminated with an unknown reason') + end end def handle_exit(event) + @status = :terminated @exit_code = event[:data] end @@ -82,7 +96,6 @@ class Runner::Connection def handle_start(_event); end def handle_timeout(_event) - @exit_code = TIMEOUT_EXIT_STATUS - raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit') + @status = :timeout end end diff --git a/lib/runner/strategy.rb b/lib/runner/strategy.rb index dce76adf..0007caab 100644 --- a/lib/runner/strategy.rb +++ b/lib/runner/strategy.rb @@ -1,8 +1,7 @@ # frozen_string_literal: true class Runner::Strategy - def initialize(runner_id, environment) - @runner_id = runner_id + def initialize(_runner_id, environment) @execution_environment = environment end diff --git a/lib/runner/strategy/docker.rb b/lib/runner/strategy/docker.rb deleted file mode 100644 index fd7cfafc..00000000 --- a/lib/runner/strategy/docker.rb +++ /dev/null @@ -1,3 +0,0 @@ -# frozen_string_literal: true - -class Runner::Strategy::Docker < Runner::Strategy; end diff --git a/lib/runner/strategy/docker_container_pool.rb b/lib/runner/strategy/docker_container_pool.rb new file mode 100644 index 00000000..7003f901 --- /dev/null +++ b/lib/runner/strategy/docker_container_pool.rb @@ -0,0 +1,138 @@ +# frozen_string_literal: true + +class Runner::Strategy::DockerContainerPool < Runner::Strategy + attr_reader :container_id, :command, :execution_environment + + def self.config + # Since the docker configuration file contains code that must be executed, we use ERB templating. + @config ||= CodeOcean::Config.new(:docker).read(erb: true) + end + + def self.request_from_management(environment) + container_id = JSON.parse(Faraday.get("#{config[:pool][:location]}/docker_container_pool/get_container/#{environment.id}").body)['id'] + container_id.presence || raise(Runner::Error::NotAvailable.new("DockerContainerPool didn't return a container id")) + rescue Faraday::Error => e + raise Runner::Error::Unknown.new("Faraday request to DockerContainerPool failed: #{e.inspect}") + rescue JSON::ParserError => e + raise Runner::Error::Unknown.new("DockerContainerPool returned invalid JSON: #{e.inspect}") + end + + def initialize(runner_id, _environment) + super + @container_id = runner_id + end + + def copy_files(files) + FileUtils.mkdir_p(local_workspace_path) + clean_workspace + files.each do |file| + if file.path.present? + local_directory_path = local_path(file.path) + FileUtils.mkdir_p(local_directory_path) + end + + local_file_path = local_path(file.filepath) + if file.file_type.binary? + FileUtils.cp(file.native_file.path, local_file_path) + else + begin + File.open(local_file_path, 'w') {|f| f.write(file.content) } + rescue IOError => e + # TODO: try catch i/o exception and log failed attempts + # Does this fix the issue @Sebastian? What exceptions did you have in mind? + raise Runner::Error::Unknown.new("Could not create workspace file #{file.filepath}: #{e.inspect}") + end + end + end + FileUtils.chmod_R('+rwX', local_workspace_path) + end + + def destroy_at_management + Faraday.get("#{self.class.config[:pool][:location]}/docker_container_pool/destroy_container/#{container.id}") + rescue Faraday::Error => e + raise Runner::Error::Unknown.new("Faraday request to DockerContainerPool failed: #{e.inspect}") + end + + def attach_to_execution(command) + @command = command + starting_time = Time.zone.now + query_params = 'logs=0&stream=1&stderr=1&stdout=1&stdin=1' + websocket_url = "#{self.class.config[:ws_host]}/v1.27/containers/#{@container_id}/attach/ws?#{query_params}" + + EventMachine.run do + socket = Connection.new(websocket_url, self) + EventMachine.add_timer(@execution_environment.permitted_execution_time) do + socket.status = :timeout + destroy_at_management + end + socket.send(command) + yield(socket) + end + Time.zone.now - starting_time # execution duration in seconds + end + + private + + def container + return @container if @container.present? + + @container = Docker::Container.get(@container_id) + raise Runner::Error::RunnerNotFound unless @container.info['State']['Running'] + + @container + rescue Docker::Error::NotFoundError + raise Runner::Error::RunnerNotFound + end + + def local_path(path) + unclean_path = local_workspace_path.join(path) + clean_path = File.expand_path(unclean_path) + unless clean_path.to_s.start_with? local_workspace_path.to_s + raise Runner::Error::Unknown.new("Local filepath #{clean_path.inspect} not allowed") + end + + Pathname.new(clean_path) + end + + def clean_workspace + FileUtils.rm_r(local_workspace_path.children, secure: true) + rescue Errno::ENOENT => e + raise Runner::Error::Unknown.new("The workspace directory does not exist and cannot be deleted: #{e.inspect}") + rescue Errno::EACCES => e + # TODO: Why was this rescued before @Sebastian? + raise Runner::Error::Unknown.new("Not allowed to clean workspace #{local_workspace_path}: #{e.inspect}") + end + + def local_workspace_path + @local_workspace_path ||= Pathname.new(container.binds.first.split(':').first) + end + + class Connection < Runner::Connection + def initialize(*args) + @stream = 'stdout' + super + end + + def encode(data) + "#{data}\n" + end + + def decode(raw_event) + case raw_event.data + when /@#{@strategy.container_id[0..11]}/ + # Assume correct termination for now and return exit code 0 + # TODO: Can we use the actual exit code here? + @exit_code = 0 + @status = :terminated + @socket.close + when /#{format(@strategy.execution_environment.test_command, class_name: '.*', filename: '.*', module_name: '.*')}/ + # TODO: Super dirty hack to redirect test output to stderr (remove attr_reader afterwards) + @stream = 'stderr' + when /#{@strategy.command}/ + when /bash: cmd:canvasevent: command not found/ + else + {'type' => @stream, 'data' => raw_event.data} + end + end + end +end diff --git a/lib/runner/strategy/poseidon.rb b/lib/runner/strategy/poseidon.rb index 15c19da0..1938c229 100644 --- a/lib/runner/strategy/poseidon.rb +++ b/lib/runner/strategy/poseidon.rb @@ -62,13 +62,24 @@ class Runner::Strategy::Poseidon < Runner::Strategy raise Runner::Error::Unknown.new("Error parsing response from Poseidon: #{e.message}") end + def initialize(runner_id, _environment) + super + @allocation_id = runner_id + end + def copy_files(files) + copy = files.map do |file| + { + path: file.filepath, + content: Base64.strict_encode64(file.content), + } + end url = "#{runner_url}/files" - body = {copy: files.map {|filename, content| {path: filename, content: Base64.strict_encode64(content)} }} + body = {copy: copy} response = Faraday.patch(url, body.to_json, HEADERS) return if response.status == 204 - Runner.destroy(@runner_id) if response.status == 400 + Runner.destroy(@allocation_id) if response.status == 400 self.class.handle_error response rescue Faraday::Error => e raise Runner::Error::Unknown.new("Faraday request to runner management failed: #{e.inspect}") @@ -78,7 +89,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy starting_time = Time.zone.now websocket_url = execute_command(command) EventMachine.run do - socket = Runner::Connection.new(websocket_url) + socket = Connection.new(websocket_url, self) yield(socket) end Time.zone.now - starting_time # execution duration @@ -107,7 +118,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy raise Runner::Error::Unknown.new('Poseidon did not send websocket url') end when 400 - Runner.destroy(@runner_id) + Runner.destroy(@allocation_id) end self.class.handle_error response @@ -116,6 +127,18 @@ class Runner::Strategy::Poseidon < Runner::Strategy end def runner_url - "#{Runner::BASE_URL}/runners/#{@runner_id}" + "#{Runner::BASE_URL}/runners/#{@allocation_id}" + end + + class Connection < Runner::Connection + def decode(raw_event) + JSON.parse(raw_event.data) + rescue JSON::ParserError => e + raise Runner::Error::Unknown.new("The websocket message from Poseidon could not be decoded to JSON: #{e.inspect}") + end + + def encode(data) + data + end end end diff --git a/spec/lib/runner/strategy/docker_spec.rb b/spec/lib/runner/strategy/docker_container_pool_spec.rb similarity index 62% rename from spec/lib/runner/strategy/docker_spec.rb rename to spec/lib/runner/strategy/docker_container_pool_spec.rb index 4110ebf9..67c9b20f 100644 --- a/spec/lib/runner/strategy/docker_spec.rb +++ b/spec/lib/runner/strategy/docker_container_pool_spec.rb @@ -2,14 +2,14 @@ require 'rails_helper' -describe Runner::Strategy::Docker do +describe Runner::Strategy::DockerContainerPool do let(:runner_id) { FactoryBot.attributes_for(:runner)[:runner_id] } let(:execution_environment) { FactoryBot.create :ruby } - let(:docker) { described_class.new(runner_id, execution_environment) } + let(:container_pool) { described_class.new(runner_id, execution_environment) } # TODO: add tests for these methods when implemented it 'defines all methods all runner management strategies must define' do - expect(docker.public_methods).to include(*Runner::DELEGATED_STRATEGY_METHODS) + expect(container_pool.public_methods).to include(:destroy_at_management, :copy_files, :attach_to_execution) expect(described_class.public_methods).to include(:request_from_management) end end diff --git a/spec/lib/runner/strategy/poseidon_spec.rb b/spec/lib/runner/strategy/poseidon_spec.rb index d3acf5f9..62bbd638 100644 --- a/spec/lib/runner/strategy/poseidon_spec.rb +++ b/spec/lib/runner/strategy/poseidon_spec.rb @@ -256,15 +256,15 @@ describe Runner::Strategy::Poseidon do end describe '#copy_files' do - let(:filename) { 'main.py' } let(:file_content) { 'print("Hello World!")' } - let(:action) { -> { poseidon.copy_files({filename => file_content}) } } - let(:encoded_file_content) { Base64.strict_encode64(file_content) } + let(:file) { FactoryBot.build(:file, content: file_content) } + let(:action) { -> { poseidon.copy_files([file]) } } + let(:encoded_file_content) { Base64.strict_encode64(file.content) } let!(:copy_files_stub) do WebMock .stub_request(:patch, "#{Runner::BASE_URL}/runners/#{runner_id}/files") .with( - body: {copy: [{path: filename, content: encoded_file_content}]}, + body: {copy: [{path: file.filepath, content: encoded_file_content}]}, headers: {'Content-Type' => 'application/json'} ) .to_return(body: response_body, status: response_status) diff --git a/spec/models/runner_spec.rb b/spec/models/runner_spec.rb index e8f57966..b9395645 100644 --- a/spec/models/runner_spec.rb +++ b/spec/models/runner_spec.rb @@ -35,10 +35,16 @@ describe Runner do end end - {poseidon: Runner::Strategy::Poseidon, docker: Runner::Strategy::Docker}.each do |strategy, strategy_class| + available_strategies = { + poseidon: Runner::Strategy::Poseidon, + docker_container_pool: Runner::Strategy::DockerContainerPool, + } + available_strategies.each do |strategy, strategy_class| include_examples 'uses the strategy defined in the constant', strategy, strategy_class end + end + describe 'method delegation' do shared_examples 'delegates method sends to its strategy' do |method, *args| context "when sending #{method}" do let(:strategy) { instance_double(strategy_class) } @@ -49,7 +55,7 @@ describe Runner do allow(strategy_class).to receive(:new).and_return(strategy) end - it "delegates the method #{method}" do + it 'delegates to its strategy' do expect(strategy).to receive(method) runner.send(method, *args) end @@ -57,10 +63,48 @@ describe Runner do end include_examples 'delegates method sends to its strategy', :destroy_at_management - include_examples 'delegates method sends to its strategy', :copy_files, nil include_examples 'delegates method sends to its strategy', :attach_to_execution, nil end + describe '#copy_files' do + let(:strategy) { instance_double(strategy_class) } + let(:runner) { described_class.create } + + before do + allow(strategy_class).to receive(:request_from_management).and_return(runner_id) + allow(strategy_class).to receive(:new).and_return(strategy) + end + + context 'when no error is raised' do + it 'delegates to its strategy' do + expect(strategy).to receive(:copy_files).once + runner.copy_files(nil) + end + end + + context 'when a RunnerNotFound exception is raised' do + before do + was_called = false + allow(strategy).to receive(:copy_files) do + unless was_called + was_called = true + raise Runner::Error::RunnerNotFound.new + end + end + end + + it 'requests a new id' do + expect(runner).to receive(:request_new_id) + runner.copy_files(nil) + end + + it 'retries to copy the files' do + expect(strategy).to receive(:copy_files).twice + runner.copy_files(nil) + end + end + end + describe 'creation' do let(:user) { FactoryBot.create :external_user } let(:execution_environment) { FactoryBot.create :ruby }