Reuse runners per user and execution environment
Co-authored-by: Jan-Eric Hellenberg <jan-eric.hellenberg@student.hpi.uni-potsdam.de> Co-authored-by: Maximilian Pass <maximilian.pass@student.hpi.uni-potsdam.de>
This commit is contained in:

committed by
Sebastian Serth

parent
3017e46006
commit
17bd2d8726
@ -152,10 +152,7 @@ class SubmissionsController < ApplicationController
|
|||||||
socket.on :exit do |exit_code|
|
socket.on :exit do |exit_code|
|
||||||
EventMachine.stop_event_loop
|
EventMachine.stop_event_loop
|
||||||
status = runner.status
|
status = runner.status
|
||||||
if status == :timeouted
|
if @output.empty?
|
||||||
tubesock.send_data JSON.dump({cmd: :timeout})
|
|
||||||
@output = "timeout: #{@output}"
|
|
||||||
elsif @output.empty?
|
|
||||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"})
|
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"})
|
||||||
end
|
end
|
||||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"}) unless status == :timeouted
|
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"}) unless status == :timeouted
|
||||||
@ -168,7 +165,6 @@ class SubmissionsController < ApplicationController
|
|||||||
when :client_kill
|
when :client_kill
|
||||||
EventMachine.stop_event_loop
|
EventMachine.stop_event_loop
|
||||||
kill_socket(tubesock)
|
kill_socket(tubesock)
|
||||||
runner.destroy
|
|
||||||
Rails.logger.debug('Client exited container.')
|
Rails.logger.debug('Client exited container.')
|
||||||
when :result
|
when :result
|
||||||
socket.send event[:data]
|
socket.send event[:data]
|
||||||
@ -189,11 +185,17 @@ class SubmissionsController < ApplicationController
|
|||||||
if @embed_options[:disable_run]
|
if @embed_options[:disable_run]
|
||||||
kill_socket(tubesock)
|
kill_socket(tubesock)
|
||||||
else
|
else
|
||||||
@container_execution_time = @submission.run(sanitize_filename) do |runner, socket|
|
begin
|
||||||
@waiting_for_container_time = runner.waiting_time
|
@container_execution_time = @submission.run(sanitize_filename) do |runner, socket|
|
||||||
handle_websockets(tubesock, runner, socket)
|
@waiting_for_container_time = runner.waiting_time
|
||||||
|
handle_websockets(tubesock, runner, socket)
|
||||||
|
end
|
||||||
|
save_run_output
|
||||||
|
rescue RunnerNotAvailableError
|
||||||
|
tubesock.send_data JSON.dump({cmd: :timeout})
|
||||||
|
kill_socket(tubesock)
|
||||||
|
Rails.logger.debug('Runner not available')
|
||||||
end
|
end
|
||||||
save_run_output
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
2
app/errors/application_error.rb
Normal file
2
app/errors/application_error.rb
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
class ApplicationError < StandardError
|
||||||
|
end
|
2
app/errors/runner_not_available_error.rb
Normal file
2
app/errors/runner_not_available_error.rb
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
class RunnerNotAvailableError < ApplicationError
|
||||||
|
end
|
3
app/jobs/application_job.rb
Normal file
3
app/jobs/application_job.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
class ApplicationJob < ActiveJob::Base
|
||||||
|
queue_as :default
|
||||||
|
end
|
16
app/jobs/runner_cleanup_job.rb
Normal file
16
app/jobs/runner_cleanup_job.rb
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
class RunnerCleanupJob < ApplicationJob
|
||||||
|
CLEANUP_INTERVAL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:cleanup_interval].seconds
|
||||||
|
|
||||||
|
after_perform do |job|
|
||||||
|
# re-schedule job
|
||||||
|
self.class.set(wait: CLEANUP_INTERVAL).perform_later
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform
|
||||||
|
Rails.logger.debug(Time.now)
|
||||||
|
Runner.inactive_runners.each do |runner|
|
||||||
|
Rails.logger.debug("Destroying runner #{runner.runner_id}, unused since #{runner.last_used}")
|
||||||
|
runner.destroy
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
96
app/models/runner.rb
Normal file
96
app/models/runner.rb
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require 'runner/runner_connection'
|
||||||
|
|
||||||
|
class Runner < ApplicationRecord
|
||||||
|
BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url]
|
||||||
|
HEADERS = {'Content-Type' => 'application/json'}.freeze
|
||||||
|
UNUSED_EXPIRATION_TIME = CodeOcean::Config.new(:code_ocean).read[:runner_management][:unused_runner_expiration_time].seconds
|
||||||
|
|
||||||
|
belongs_to :execution_environment
|
||||||
|
belongs_to :user, polymorphic: true
|
||||||
|
|
||||||
|
before_create :get_runner
|
||||||
|
before_destroy :destroy_runner
|
||||||
|
|
||||||
|
validates :execution_environment_id, presence: true
|
||||||
|
validates :user, presence: true
|
||||||
|
validates :time_limit, presence: true
|
||||||
|
|
||||||
|
scope :inactive_runners, -> { where('last_used < ?', Time.now - UNUSED_EXPIRATION_TIME) }
|
||||||
|
|
||||||
|
def self.for(user, exercise, time_limit = 0)
|
||||||
|
execution_environment = ExecutionEnvironment.find(exercise.execution_environment_id)
|
||||||
|
runner = Runner.find_or_create_by(user: user, execution_environment: execution_environment, time_limit: time_limit)
|
||||||
|
|
||||||
|
return runner if runner.save
|
||||||
|
raise(RunnerNotAvailableError, 'No runner available')
|
||||||
|
end
|
||||||
|
|
||||||
|
def copy_files(files)
|
||||||
|
url = "#{runner_url}/files"
|
||||||
|
body = {files: files.map { |filename, content| {filepath: filename, content: content} }}
|
||||||
|
response = Faraday.patch(url, body.to_json, HEADERS)
|
||||||
|
if response.status == 404
|
||||||
|
# runner has disappeared for some reason
|
||||||
|
self.destroy
|
||||||
|
raise(RunnerNotAvailableError, "Runner unavailable")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def execute_command(command)
|
||||||
|
url = "#{runner_url}/execute"
|
||||||
|
response = Faraday.post(url, {command: command}.to_json, HEADERS)
|
||||||
|
if response.status == 404
|
||||||
|
# runner has disappeared for some reason
|
||||||
|
self.destroy
|
||||||
|
raise(RunnerNotAvailableError, "Runner unavailable")
|
||||||
|
end
|
||||||
|
used_now
|
||||||
|
parse response
|
||||||
|
end
|
||||||
|
|
||||||
|
def execute_interactively(command)
|
||||||
|
starting_time = Time.zone.now
|
||||||
|
websocket_url = execute_command(command)[:websocketUrl]
|
||||||
|
EventMachine.run do
|
||||||
|
socket = RunnerConnection.new(websocket_url)
|
||||||
|
yield(self, socket) if block_given?
|
||||||
|
end
|
||||||
|
Time.zone.now - starting_time # execution time
|
||||||
|
end
|
||||||
|
|
||||||
|
def destroy_runner
|
||||||
|
Faraday.delete runner_url
|
||||||
|
end
|
||||||
|
|
||||||
|
def status
|
||||||
|
# TODO: return actual state retrieved via websocket
|
||||||
|
:timeouted
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def get_runner
|
||||||
|
url = "#{BASE_URL}/runners"
|
||||||
|
body = {executionEnvironmentId: execution_environment.id}
|
||||||
|
body[:timeLimit] = time_limit
|
||||||
|
response = Faraday.post(url, body.to_json, HEADERS)
|
||||||
|
response_body = parse response
|
||||||
|
self.runner_id = response_body[:runnerId]
|
||||||
|
throw :abort unless response.status == 200
|
||||||
|
end
|
||||||
|
|
||||||
|
def runner_url
|
||||||
|
"#{BASE_URL}/runners/#{runner_id}"
|
||||||
|
end
|
||||||
|
|
||||||
|
def parse(response)
|
||||||
|
JSON.parse(response.body).deep_symbolize_keys
|
||||||
|
end
|
||||||
|
|
||||||
|
def used_now
|
||||||
|
self.last_used = Time.now
|
||||||
|
save
|
||||||
|
end
|
||||||
|
end
|
@ -195,11 +195,10 @@ class Submission < ApplicationRecord
|
|||||||
|
|
||||||
def prepared_runner
|
def prepared_runner
|
||||||
request_time = Time.now
|
request_time = Time.now
|
||||||
runner = Runner.new(execution_environment, execution_environment.permitted_execution_time)
|
runner = Runner.for(user, exercise, execution_environment.permitted_execution_time)
|
||||||
copy_files_to runner
|
copy_files_to runner
|
||||||
runner.waiting_time = Time.now - request_time
|
runner.waiting_time = Time.now - request_time
|
||||||
yield(runner) if block_given?
|
yield(runner) if block_given?
|
||||||
runner.destroy
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def command_for(template, file)
|
def command_for(template, file)
|
||||||
|
@ -8,6 +8,10 @@ default: &default
|
|||||||
enabled: false
|
enabled: false
|
||||||
codeocean_events:
|
codeocean_events:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
runner_management:
|
||||||
|
url: https://runners.example.org
|
||||||
|
cleanup_interval: 60
|
||||||
|
unused_runner_expiration_time: 180
|
||||||
|
|
||||||
development:
|
development:
|
||||||
flowr:
|
flowr:
|
||||||
@ -21,8 +25,6 @@ development:
|
|||||||
url: https://codeharbor.openhpi.de
|
url: https://codeharbor.openhpi.de
|
||||||
prometheus_exporter:
|
prometheus_exporter:
|
||||||
enabled: false
|
enabled: false
|
||||||
runner_management:
|
|
||||||
url: https://runners.example.org
|
|
||||||
|
|
||||||
production:
|
production:
|
||||||
<<: *default
|
<<: *default
|
||||||
|
1
config/initializers/jobs.rb
Normal file
1
config/initializers/jobs.rb
Normal file
@ -0,0 +1 @@
|
|||||||
|
RunnerCleanupJob.perform_now unless Rake.application.top_level_tasks.to_s.include?('db:')
|
14
db/migrate/20210415064948_create_runners.rb
Normal file
14
db/migrate/20210415064948_create_runners.rb
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
class CreateRunners < ActiveRecord::Migration[5.2]
|
||||||
|
def change
|
||||||
|
create_table :runners do |t|
|
||||||
|
t.string :runner_id
|
||||||
|
t.references :execution_environment
|
||||||
|
t.references :user, polymorphic: true
|
||||||
|
t.integer :time_limit
|
||||||
|
t.float :waiting_time
|
||||||
|
t.datetime :last_used
|
||||||
|
|
||||||
|
t.timestamps
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
14
db/schema.rb
14
db/schema.rb
@ -339,6 +339,20 @@ ActiveRecord::Schema.define(version: 2021_05_12_133612) do
|
|||||||
t.index ["user_id", "user_type", "created_at"], name: "index_rfc_on_user_and_created_at", order: { created_at: :desc }
|
t.index ["user_id", "user_type", "created_at"], name: "index_rfc_on_user_and_created_at", order: { created_at: :desc }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
create_table "runners", force: :cascade do |t|
|
||||||
|
t.string "runner_id"
|
||||||
|
t.bigint "execution_environment_id"
|
||||||
|
t.string "user_type"
|
||||||
|
t.bigint "user_id"
|
||||||
|
t.integer "time_limit"
|
||||||
|
t.float "waiting_time"
|
||||||
|
t.datetime "last_used"
|
||||||
|
t.datetime "created_at", null: false
|
||||||
|
t.datetime "updated_at", null: false
|
||||||
|
t.index ["execution_environment_id"], name: "index_runners_on_execution_environment_id"
|
||||||
|
t.index ["user_type", "user_id"], name: "index_runners_on_user_type_and_user_id"
|
||||||
|
end
|
||||||
|
|
||||||
create_table "searches", id: :serial, force: :cascade do |t|
|
create_table "searches", id: :serial, force: :cascade do |t|
|
||||||
t.integer "exercise_id", null: false
|
t.integer "exercise_id", null: false
|
||||||
t.integer "user_id", null: false
|
t.integer "user_id", null: false
|
||||||
|
@ -1,58 +0,0 @@
|
|||||||
# frozen_string_literal: true
|
|
||||||
|
|
||||||
class Runner
|
|
||||||
BASE_URL = CodeOcean::Config.new(:code_ocean).read[:runner_management][:url]
|
|
||||||
HEADERS = {'Content-Type' => 'application/json'}.freeze
|
|
||||||
|
|
||||||
attr_accessor :waiting_time
|
|
||||||
|
|
||||||
def initialize(execution_environment, time_limit = nil)
|
|
||||||
url = "#{BASE_URL}/runners"
|
|
||||||
body = {executionEnvironmentId: execution_environment.id}
|
|
||||||
body[:timeLimit] = time_limit if time_limit
|
|
||||||
response = Faraday.post(url, body.to_json, HEADERS)
|
|
||||||
response = parse response
|
|
||||||
@id = response[:runnerId]
|
|
||||||
end
|
|
||||||
|
|
||||||
def copy_files(files)
|
|
||||||
url = "#{runner_url}/files"
|
|
||||||
body = {files: files.map { |filename, content| {filepath: filename, content: content} }}
|
|
||||||
Faraday.patch(url, body.to_json, HEADERS)
|
|
||||||
end
|
|
||||||
|
|
||||||
def execute_command(command)
|
|
||||||
url = "#{runner_url}/execute"
|
|
||||||
response = Faraday.post(url, {command: command}.to_json, HEADERS)
|
|
||||||
parse response
|
|
||||||
end
|
|
||||||
|
|
||||||
def execute_interactively(command)
|
|
||||||
starting_time = Time.zone.now
|
|
||||||
websocket_url = execute_command(command)[:websocketUrl]
|
|
||||||
EventMachine.run do
|
|
||||||
socket = RunnerConnection.new(websocket_url)
|
|
||||||
yield(self, socket) if block_given?
|
|
||||||
end
|
|
||||||
Time.zone.now - starting_time # execution time
|
|
||||||
end
|
|
||||||
|
|
||||||
def destroy
|
|
||||||
Faraday.delete runner_url
|
|
||||||
end
|
|
||||||
|
|
||||||
def status
|
|
||||||
# TODO: return actual state retrieved via websocket
|
|
||||||
:timeouted
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
|
||||||
|
|
||||||
def runner_url
|
|
||||||
"#{BASE_URL}/runners/#{@id}"
|
|
||||||
end
|
|
||||||
|
|
||||||
def parse(response)
|
|
||||||
JSON.parse(response.body).deep_symbolize_keys
|
|
||||||
end
|
|
||||||
end
|
|
Reference in New Issue
Block a user