diff --git a/app/controllers/submissions_controller.rb b/app/controllers/submissions_controller.rb index 34ad6f53..fe281b29 100644 --- a/app/controllers/submissions_controller.rb +++ b/app/controllers/submissions_controller.rb @@ -152,10 +152,7 @@ class SubmissionsController < ApplicationController socket.on :exit do |exit_code| EventMachine.stop_event_loop status = runner.status - if status == :timeouted - tubesock.send_data JSON.dump({cmd: :timeout}) - @output = "timeout: #{@output}" - elsif @output.empty? + if @output.empty? tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"}) end tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"}) unless status == :timeouted @@ -168,7 +165,6 @@ class SubmissionsController < ApplicationController when :client_kill EventMachine.stop_event_loop kill_socket(tubesock) - runner.destroy Rails.logger.debug('Client exited container.') when :result socket.send event[:data] @@ -189,11 +185,17 @@ class SubmissionsController < ApplicationController if @embed_options[:disable_run] kill_socket(tubesock) else - @container_execution_time = @submission.run(sanitize_filename) do |runner, socket| - @waiting_for_container_time = runner.waiting_time - handle_websockets(tubesock, runner, socket) + begin + @container_execution_time = @submission.run(sanitize_filename) do |runner, socket| + @waiting_for_container_time = runner.waiting_time + handle_websockets(tubesock, runner, socket) + end + save_run_output + rescue RunnerNotAvailableError + tubesock.send_data JSON.dump({cmd: :timeout}) + kill_socket(tubesock) + Rails.logger.debug('Runner not available') end - save_run_output end end end diff --git a/app/errors/application_error.rb b/app/errors/application_error.rb new file mode 100644 index 00000000..4adca849 --- /dev/null +++ b/app/errors/application_error.rb @@ -0,0 +1,2 @@ +class ApplicationError < StandardError +end \ No newline at end of file diff --git a/app/errors/runner_not_available_error.rb b/app/errors/runner_not_available_error.rb new file mode 100644 index 00000000..ac595539 --- /dev/null +++ b/app/errors/runner_not_available_error.rb @@ -0,0 +1,2 @@ +class RunnerNotAvailableError < ApplicationError +end \ No newline at end of file diff --git a/app/jobs/application_job.rb b/app/jobs/application_job.rb new file mode 100644 index 00000000..06752606 --- /dev/null +++ b/app/jobs/application_job.rb @@ -0,0 +1,3 @@ +class ApplicationJob < ActiveJob::Base + queue_as :default +end \ No newline at end of file diff --git a/app/jobs/runner_cleanup_job.rb b/app/jobs/runner_cleanup_job.rb new file mode 100644 index 00000000..4a5c172f --- /dev/null +++ b/app/jobs/runner_cleanup_job.rb @@ -0,0 +1,16 @@ +class RunnerCleanupJob < ApplicationJob + CLEANUP_INTERVAL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:cleanup_interval].seconds + + after_perform do |job| + # re-schedule job + self.class.set(wait: CLEANUP_INTERVAL).perform_later + end + + def perform + Rails.logger.debug(Time.now) + Runner.inactive_runners.each do |runner| + Rails.logger.debug("Destroying runner #{runner.runner_id}, unused since #{runner.last_used}") + runner.destroy + end + end +end diff --git a/app/models/runner.rb b/app/models/runner.rb new file mode 100644 index 00000000..22e8bd85 --- /dev/null +++ b/app/models/runner.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require 'runner/runner_connection' + +class Runner < ApplicationRecord + BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url] + HEADERS = {'Content-Type' => 'application/json'}.freeze + UNUSED_EXPIRATION_TIME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:unused_runner_expiration_time].seconds + + belongs_to :execution_environment + belongs_to :user, polymorphic: true + + before_create :get_runner + before_destroy :destroy_runner + + validates :execution_environment_id, presence: true + validates :user, presence: true + validates :time_limit, presence: true + + scope :inactive_runners, -> { where('last_used < ?', Time.now - UNUSED_EXPIRATION_TIME) } + + def self.for(user, exercise, time_limit = 0) + execution_environment = ExecutionEnvironment.find(exercise.execution_environment_id) + runner = Runner.find_or_create_by(user: user, execution_environment: execution_environment, time_limit: time_limit) + + return runner if runner.save + raise(RunnerNotAvailableError, 'No runner available') + end + + def copy_files(files) + url = "#{runner_url}/files" + body = {files: files.map { |filename, content| {filepath: filename, content: content} }} + response = Faraday.patch(url, body.to_json, HEADERS) + if response.status == 404 + # runner has disappeared for some reason + self.destroy + raise(RunnerNotAvailableError, "Runner unavailable") + end + end + + def execute_command(command) + url = "#{runner_url}/execute" + response = Faraday.post(url, {command: command}.to_json, HEADERS) + if response.status == 404 + # runner has disappeared for some reason + self.destroy + raise(RunnerNotAvailableError, "Runner unavailable") + end + used_now + parse response + end + + def execute_interactively(command) + starting_time = Time.zone.now + websocket_url = execute_command(command)[:websocketUrl] + EventMachine.run do + socket = RunnerConnection.new(websocket_url) + yield(self, socket) if block_given? + end + Time.zone.now - starting_time # execution time + end + + def destroy_runner + Faraday.delete runner_url + end + + def status + # TODO: return actual state retrieved via websocket + :timeouted + end + + private + + def get_runner + url = "#{BASE_URL}/runners" + body = {executionEnvironmentId: execution_environment.id} + body[:timeLimit] = time_limit + response = Faraday.post(url, body.to_json, HEADERS) + response_body = parse response + self.runner_id = response_body[:runnerId] + throw :abort unless response.status == 200 + end + + def runner_url + "#{BASE_URL}/runners/#{runner_id}" + end + + def parse(response) + JSON.parse(response.body).deep_symbolize_keys + end + + def used_now + self.last_used = Time.now + save + end +end diff --git a/app/models/submission.rb b/app/models/submission.rb index 90addbf8..7d14e87d 100644 --- a/app/models/submission.rb +++ b/app/models/submission.rb @@ -195,11 +195,10 @@ class Submission < ApplicationRecord def prepared_runner request_time = Time.now - runner = Runner.new(execution_environment, execution_environment.permitted_execution_time) + runner = Runner.for(user, exercise, execution_environment.permitted_execution_time) copy_files_to runner runner.waiting_time = Time.now - request_time yield(runner) if block_given? - runner.destroy end def command_for(template, file) diff --git a/config/code_ocean.yml.example b/config/code_ocean.yml.example index eb152f8f..b2413754 100644 --- a/config/code_ocean.yml.example +++ b/config/code_ocean.yml.example @@ -8,6 +8,10 @@ default: &default enabled: false codeocean_events: enabled: false + runner_management: + url: https://runners.example.org + cleanup_interval: 60 + unused_runner_expiration_time: 180 development: flowr: @@ -21,8 +25,6 @@ development: url: https://codeharbor.openhpi.de prometheus_exporter: enabled: false - runner_management: - url: https://runners.example.org production: <<: *default diff --git a/config/initializers/jobs.rb b/config/initializers/jobs.rb new file mode 100644 index 00000000..40fe6ab9 --- /dev/null +++ b/config/initializers/jobs.rb @@ -0,0 +1 @@ +RunnerCleanupJob.perform_now unless Rake.application.top_level_tasks.to_s.include?('db:') diff --git a/db/migrate/20210415064948_create_runners.rb b/db/migrate/20210415064948_create_runners.rb new file mode 100644 index 00000000..3ebebb63 --- /dev/null +++ b/db/migrate/20210415064948_create_runners.rb @@ -0,0 +1,14 @@ +class CreateRunners < ActiveRecord::Migration[5.2] + def change + create_table :runners do |t| + t.string :runner_id + t.references :execution_environment + t.references :user, polymorphic: true + t.integer :time_limit + t.float :waiting_time + t.datetime :last_used + + t.timestamps + end + end +end diff --git a/db/schema.rb b/db/schema.rb index d6e6d60e..b899981c 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -339,6 +339,20 @@ ActiveRecord::Schema.define(version: 2021_05_12_133612) do t.index ["user_id", "user_type", "created_at"], name: "index_rfc_on_user_and_created_at", order: { created_at: :desc } end + create_table "runners", force: :cascade do |t| + t.string "runner_id" + t.bigint "execution_environment_id" + t.string "user_type" + t.bigint "user_id" + t.integer "time_limit" + t.float "waiting_time" + t.datetime "last_used" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["execution_environment_id"], name: "index_runners_on_execution_environment_id" + t.index ["user_type", "user_id"], name: "index_runners_on_user_type_and_user_id" + end + create_table "searches", id: :serial, force: :cascade do |t| t.integer "exercise_id", null: false t.integer "user_id", null: false diff --git a/lib/runner/runner.rb b/lib/runner/runner.rb deleted file mode 100644 index e304a7df..00000000 --- a/lib/runner/runner.rb +++ /dev/null @@ -1,58 +0,0 @@ -# frozen_string_literal: true - -class Runner - BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url] - HEADERS = {'Content-Type' => 'application/json'}.freeze - - attr_accessor :waiting_time - - def initialize(execution_environment, time_limit = nil) - url = "#{BASE_URL}/runners" - body = {executionEnvironmentId: execution_environment.id} - body[:timeLimit] = time_limit if time_limit - response = Faraday.post(url, body.to_json, HEADERS) - response = parse response - @id = response[:runnerId] - end - - def copy_files(files) - url = "#{runner_url}/files" - body = {files: files.map { |filename, content| {filepath: filename, content: content} }} - Faraday.patch(url, body.to_json, HEADERS) - end - - def execute_command(command) - url = "#{runner_url}/execute" - response = Faraday.post(url, {command: command}.to_json, HEADERS) - parse response - end - - def execute_interactively(command) - starting_time = Time.zone.now - websocket_url = execute_command(command)[:websocketUrl] - EventMachine.run do - socket = RunnerConnection.new(websocket_url) - yield(self, socket) if block_given? - end - Time.zone.now - starting_time # execution time - end - - def destroy - Faraday.delete runner_url - end - - def status - # TODO: return actual state retrieved via websocket - :timeouted - end - - private - - def runner_url - "#{BASE_URL}/runners/#{@id}" - end - - def parse(response) - JSON.parse(response.body).deep_symbolize_keys - end -end