Rework protocol inside websocket
Co-authored-by Felix Auringer <felix.auringer@student.hpi.uni-potsdam.de>
This commit is contained in:

committed by
Sebastian Serth

parent
6a4e302f4e
commit
347e4728a0
@ -16,7 +16,7 @@ class SubmissionsController < ApplicationController
|
||||
before_action :set_mime_type, only: %i[download_file render_file]
|
||||
skip_before_action :verify_authenticity_token, only: %i[download_file render_file]
|
||||
|
||||
def max_run_output_buffer_size
|
||||
def max_output_buffer_size
|
||||
if @submission.cause == 'requestComments'
|
||||
5000
|
||||
else
|
||||
@ -139,37 +139,53 @@ class SubmissionsController < ApplicationController
|
||||
|
||||
def handle_websockets(tubesock, container, socket)
|
||||
tubesock.send_data JSON.dump({'cmd' => 'status', 'status' => :container_running})
|
||||
@waiting_for_container_time = Time.zone.now - @container_request_time
|
||||
@execution_request_time = Time.zone.now
|
||||
@output = ''
|
||||
|
||||
socket.on :output do |data|
|
||||
Rails.logger.info("#{Time.zone.now.getutc}: Docker sending: #{data}")
|
||||
handle_message(data, tubesock)
|
||||
@output << data if @output.size + data.size <= max_output_buffer_size
|
||||
end
|
||||
|
||||
socket.on :exit do |_exit_code|
|
||||
socket.on :stdout do |data|
|
||||
tubesock.send_data(JSON.dump({cmd: :write, stream: :stdout, data: data}))
|
||||
end
|
||||
|
||||
socket.on :stderr do |data|
|
||||
tubesock.send_data(JSON.dump({cmd: :write, stream: :stderr, data: data}))
|
||||
end
|
||||
|
||||
socket.on :exit do |exit_code|
|
||||
EventMachine.stop_event_loop
|
||||
tubesock.send_data JSON.dump({'cmd' => 'timeout'}) if container.status == 'timeouted'
|
||||
status = container.status
|
||||
if status == :timeouted
|
||||
tubesock.send_data JSON.dump({cmd: :timeout})
|
||||
@output = "timeout: #{@output}"
|
||||
elsif @output.empty?
|
||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: t('exercises.implement.no_output', timestamp: l(Time.now, format: :short))})
|
||||
end
|
||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: t('exercises.implement.exit', exit_code: exit_code)}) unless status == :timeouted
|
||||
kill_socket(tubesock)
|
||||
end
|
||||
|
||||
tubesock.onmessage do |data|
|
||||
Rails.logger.info("#{Time.now.getutc.to_s}: Client sending: #{data}")
|
||||
# Check whether the client send a JSON command and kill container
|
||||
# if the command is 'client_kill', send it to docker otherwise.
|
||||
tubesock.onmessage do |event|
|
||||
begin
|
||||
|
||||
parsed = JSON.parse(data) unless data == "\n"
|
||||
if parsed.instance_of?(Hash) && parsed['cmd'] == 'client_kill'
|
||||
Rails.logger.debug("Client exited container.")
|
||||
event = JSON.parse(event).deep_symbolize_keys
|
||||
case event[:cmd].to_sym
|
||||
when :client_kill
|
||||
EventMachine.stop_event_loop
|
||||
kill_socket(tubesock)
|
||||
container.destroy
|
||||
Rails.logger.debug('Client exited container.')
|
||||
when :result
|
||||
socket.send event[:data]
|
||||
else
|
||||
socket.send data
|
||||
Rails.logger.debug { "Sent the received client data to docker:#{data}" }
|
||||
Rails.logger.info("Unknown command from client: #{event[:cmd]}")
|
||||
end
|
||||
rescue JSON::ParserError => error
|
||||
socket.send data
|
||||
Rails.logger.debug { "Rescued parsing error, sent the received client data to docker:#{data}" }
|
||||
Rails.logger.debug { "Data received from client is not valid json: #{data}" }
|
||||
Sentry.set_extras(data: data)
|
||||
rescue TypeError => error
|
||||
Rails.logger.debug { "JSON data received from client cannot be parsed to hash: #{data}" }
|
||||
Sentry.set_extras(data: data)
|
||||
end
|
||||
end
|
||||
@ -182,14 +198,17 @@ class SubmissionsController < ApplicationController
|
||||
kill_socket(tubesock)
|
||||
return
|
||||
end
|
||||
@container_request_time = Time.zone.now
|
||||
@submission.run(sanitize_filename) do |container, socket|
|
||||
@container_execution_time = @submission.run(sanitize_filename) do |container, socket|
|
||||
@waiting_for_container_time = container.waiting_time
|
||||
handle_websockets(tubesock, container, socket)
|
||||
end
|
||||
end
|
||||
# save the output of this "run" as a "testrun" (scoring runs are saved in submission_scoring.rb)
|
||||
save_run_output
|
||||
ensure
|
||||
ActiveRecord::Base.connection_pool.release_connection
|
||||
end
|
||||
# TODO determine if this is necessary
|
||||
# unless EventMachine.reactor_running? && EventMachine.reactor_thread.alive?
|
||||
# Thread.new do
|
||||
# EventMachine.run
|
||||
@ -200,128 +219,36 @@ class SubmissionsController < ApplicationController
|
||||
end
|
||||
|
||||
def kill_socket(tubesock)
|
||||
@container_execution_time = Time.zone.now - @execution_request_time if @execution_request_time.present?
|
||||
# search for errors and save them as StructuredError (for scoring runs see submission_scoring.rb)
|
||||
errors = extract_errors
|
||||
send_hints(tubesock, errors)
|
||||
|
||||
# save the output of this "run" as a "testrun" (scoring runs are saved in submission_scoring.rb)
|
||||
save_run_output
|
||||
|
||||
# For Python containers, the @run_output is '{"cmd":"exit"}' as a string.
|
||||
# If this is the case, we should consider it as blank
|
||||
if @run_output.blank? || @run_output&.strip == '{"cmd":"exit"}' || @run_output&.strip == 'timeout:'
|
||||
@raw_output ||= ''
|
||||
@run_output ||= ''
|
||||
parse_message t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short)), 'stdout', tubesock
|
||||
end
|
||||
|
||||
# Hijacked connection needs to be notified correctly
|
||||
tubesock.send_data JSON.dump({'cmd' => 'exit'})
|
||||
tubesock.send_data JSON.dump({cmd: :exit})
|
||||
tubesock.close
|
||||
end
|
||||
|
||||
def handle_message(message, tubesock)
|
||||
@raw_output ||= ''
|
||||
@run_output ||= ''
|
||||
# Handle special commands first
|
||||
case message
|
||||
when /^#exit|{"cmd": "exit"}/
|
||||
# Just call exit_container on the docker_client.
|
||||
# Do not call kill_socket for the websocket to the client here.
|
||||
# @docker_client.exit_container closes the socket to the container,
|
||||
# kill_socket is called in the "on close handler" of the websocket to the container
|
||||
# container.destroy
|
||||
when /^#timeout/
|
||||
@run_output = "timeout: #{@run_output}" # add information that this run timed out to the buffer
|
||||
else
|
||||
# Filter out information about run_command, test_command, user or working directory
|
||||
run_command = @submission.execution_environment.run_command % command_substitutions(sanitize_filename)
|
||||
test_command = @submission.execution_environment.test_command % command_substitutions(sanitize_filename)
|
||||
if test_command.blank?
|
||||
# If no test command is set, use the run_command for the RegEx below. Otherwise, no output will be displayed!
|
||||
test_command = run_command
|
||||
end
|
||||
unless %r{root@|:/workspace|#{run_command}|#{test_command}|bash: cmd:canvasevent: command not found}.match?(message)
|
||||
parse_message(message, 'stdout', tubesock)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def parse_message(message, output_stream, socket, container = nil, recursive: true)
|
||||
parsed = ''
|
||||
begin
|
||||
parsed = JSON.parse(message)
|
||||
if parsed.instance_of?(Hash) && parsed.key?('cmd')
|
||||
socket.send_data message
|
||||
Rails.logger.info("parse_message sent: #{message}")
|
||||
# container.destroy if container && parsed['cmd'] == 'exit'
|
||||
else
|
||||
parsed = {'cmd' => 'write', 'stream' => output_stream, 'data' => message}
|
||||
socket.send_data JSON.dump(parsed)
|
||||
Rails.logger.info("parse_message sent: #{JSON.dump(parsed)}")
|
||||
end
|
||||
rescue JSON::ParserError
|
||||
# Check wether the message contains multiple lines, if true try to parse each line
|
||||
if recursive && message.include?("\n")
|
||||
message.split("\n").each do |part|
|
||||
parse_message(part, output_stream, socket, container, recursive: false)
|
||||
end
|
||||
elsif message.include?('<img') || message.start_with?('{"cmd') || message.include?('"turtlebatch"')
|
||||
# Rails.logger.info('img foung')
|
||||
@buffering = true
|
||||
@buffer = ''
|
||||
@buffer += message
|
||||
# Rails.logger.info('Starting to buffer')
|
||||
elsif @buffering && message.include?('/>')
|
||||
@buffer += message
|
||||
parsed = {'cmd' => 'write', 'stream' => output_stream, 'data' => @buffer}
|
||||
socket.send_data JSON.dump(parsed)
|
||||
# socket.send_data @buffer
|
||||
@buffering = false
|
||||
# Rails.logger.info('Sent complete buffer')
|
||||
elsif @buffering && message.end_with?("}\r")
|
||||
@buffer += message
|
||||
socket.send_data @buffer
|
||||
@buffering = false
|
||||
# Rails.logger.info('Sent complete buffer')
|
||||
elsif @buffering
|
||||
@buffer += message
|
||||
# Rails.logger.info('Appending to buffer')
|
||||
else
|
||||
# Rails.logger.info('else')
|
||||
parsed = {'cmd' => 'write', 'stream' => output_stream, 'data' => message}
|
||||
socket.send_data JSON.dump(parsed)
|
||||
Rails.logger.info("parse_message sent: #{JSON.dump(parsed)}")
|
||||
end
|
||||
ensure
|
||||
@raw_output += parsed['data'].to_s if parsed.instance_of?(Hash) && parsed.key?('data')
|
||||
# save the data that was send to the run_output if there is enough space left. this will be persisted as a testrun with cause "run"
|
||||
@run_output += JSON.dump(parsed).to_s if @run_output.size <= max_run_output_buffer_size
|
||||
end
|
||||
end
|
||||
|
||||
def save_run_output
|
||||
if @run_output.present?
|
||||
@run_output = @run_output[(0..max_run_output_buffer_size - 1)] # trim the string to max_message_buffer_size chars
|
||||
return if @output.blank?
|
||||
|
||||
@output = @output[0, max_output_buffer_size] # trim the string to max_output_buffer_size chars
|
||||
Testrun.create(
|
||||
file: @file,
|
||||
cause: 'run',
|
||||
submission: @submission,
|
||||
output: @run_output,
|
||||
output: @output,
|
||||
container_execution_time: @container_execution_time,
|
||||
waiting_for_container_time: @waiting_for_container_time
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def extract_errors
|
||||
results = []
|
||||
if @raw_output.present?
|
||||
if @output.present?
|
||||
@submission.exercise.execution_environment.error_templates.each do |template|
|
||||
pattern = Regexp.new(template.signature).freeze
|
||||
if pattern.match(@raw_output)
|
||||
results << StructuredError.create_from_template(template, @raw_output, @submission)
|
||||
if pattern.match(@output)
|
||||
results << StructuredError.create_from_template(template, @output, @submission)
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -386,22 +313,24 @@ class SubmissionsController < ApplicationController
|
||||
|
||||
def statistics; end
|
||||
|
||||
def test
|
||||
Thread.new do
|
||||
hijack do |tubesock|
|
||||
if @embed_options[:disable_run]
|
||||
kill_socket(tubesock)
|
||||
return
|
||||
end
|
||||
@container_request_time = Time.now
|
||||
@submission.run_tests(sanitize_filename) do |container|
|
||||
handle_websockets(tubesock, container)
|
||||
end
|
||||
end
|
||||
ensure
|
||||
ActiveRecord::Base.connection_pool.release_connection
|
||||
end
|
||||
end
|
||||
# TODO is this needed?
|
||||
# def test
|
||||
# Thread.new do
|
||||
# hijack do |tubesock|
|
||||
# if @embed_options[:disable_run]
|
||||
# kill_socket(tubesock)
|
||||
# return
|
||||
# end
|
||||
# @container_request_time = Time.now
|
||||
# @submission.run_tests(sanitize_filename) do |container|
|
||||
# handle_websockets(tubesock, container)
|
||||
# end
|
||||
# end
|
||||
# ensure
|
||||
# ActiveRecord::Base.connection_pool.release_connection
|
||||
# end
|
||||
# end
|
||||
|
||||
|
||||
def with_server_sent_events
|
||||
response.headers['Content-Type'] = 'text/event-stream'
|
||||
|
@ -147,7 +147,7 @@ class Submission < ApplicationRecord
|
||||
stdout = ""
|
||||
stderr = ""
|
||||
exit_code = 1 # default to error
|
||||
container.execute_interactively(score_command) do |container, socket|
|
||||
execution_time = container.execute_interactively(score_command) do |container, socket|
|
||||
socket.on :stderr do |data|
|
||||
stderr << data
|
||||
end
|
||||
@ -161,8 +161,8 @@ class Submission < ApplicationRecord
|
||||
end
|
||||
output = {
|
||||
file_role: file.role,
|
||||
waiting_for_container_time: 1, # TODO
|
||||
container_execution_time: 1, # TODO
|
||||
waiting_for_container_time: container.waiting_time,
|
||||
container_execution_time: execution_time,
|
||||
status: (exit_code == 0) ? :ok : :failed,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
@ -176,9 +176,11 @@ class Submission < ApplicationRecord
|
||||
|
||||
def run(file, &block)
|
||||
run_command = command_for execution_environment.run_command, file
|
||||
execution_time = 0
|
||||
prepared_container do |container|
|
||||
container.execute_interactively(run_command, &block)
|
||||
execution_time = container.execute_interactively(run_command, &block)
|
||||
end
|
||||
execution_time
|
||||
end
|
||||
|
||||
private
|
||||
@ -187,10 +189,8 @@ class Submission < ApplicationRecord
|
||||
request_time = Time.now
|
||||
container = Container.new(execution_environment, execution_environment.permitted_execution_time)
|
||||
container.copy_submission_files self
|
||||
container_time = Time.now
|
||||
waiting_for_container_time = Time.now - request_time
|
||||
container.waiting_time = Time.now - request_time
|
||||
yield(container) if block_given?
|
||||
execution_time = Time.now - container_time
|
||||
container.destroy
|
||||
end
|
||||
|
||||
|
@ -397,6 +397,7 @@ de:
|
||||
hint: Hinweis
|
||||
no_files: Die Aufgabe umfasst noch keine sichtbaren Dateien.
|
||||
no_output: Die letzte Code-Ausführung terminierte am %{timestamp} ohne Ausgabe.
|
||||
exit: Der Exit-Status war %{exit_code}.
|
||||
no_output_yet: Bisher existiert noch keine Ausgabe.
|
||||
output: Programm-Ausgabe
|
||||
passed_tests: Erfolgreiche Tests
|
||||
|
@ -397,6 +397,7 @@ en:
|
||||
hint: Hint
|
||||
no_files: The exercise does not comprise visible files yet.
|
||||
no_output: The last code run finished on %{timestamp} without any output.
|
||||
exit: The exit status was %{exit_code}.
|
||||
no_output_yet: There is no output yet.
|
||||
output: Program Output
|
||||
passed_tests: Passed Tests
|
||||
|
@ -4,6 +4,9 @@ require 'container_connection'
|
||||
|
||||
class Container
|
||||
BASE_URL = CodeOcean::Config.new(:code_ocean).read[:container_management][:url]
|
||||
HEADERS = {"Content-Type" => "application/json"}
|
||||
|
||||
attr_accessor :waiting_time
|
||||
|
||||
def initialize(execution_environment, time_limit = nil)
|
||||
url = "#{BASE_URL}/execution-environments/#{execution_environment.id}/containers/create"
|
||||
@ -11,7 +14,7 @@ class Container
|
||||
if time_limit
|
||||
body[:time_limit] = time_limit
|
||||
end
|
||||
response = Faraday.post(url, body.to_json, "Content-Type" => "application/json")
|
||||
response = Faraday.post(url, body.to_json, HEADERS)
|
||||
response = parse response
|
||||
@container_id = response[:id]
|
||||
end
|
||||
@ -19,7 +22,7 @@ class Container
|
||||
def copy_files(files)
|
||||
url = container_url + "/files"
|
||||
body = files.map{ |filename, content| { filename => content } }
|
||||
Faraday.post(url, body.to_json, "Content-Type" => "application/json")
|
||||
Faraday.post(url, body.to_json, HEADERS)
|
||||
end
|
||||
|
||||
def copy_submission_files(submission)
|
||||
@ -32,17 +35,19 @@ class Container
|
||||
|
||||
def execute_command(command)
|
||||
url = container_url + "/execute"
|
||||
response = Faraday.patch(url, {command: command}.to_json, "Content-Type" => "application/json")
|
||||
response = Faraday.patch(url, {command: command}.to_json, HEADERS)
|
||||
response = parse response
|
||||
response
|
||||
end
|
||||
|
||||
def execute_interactively(command)
|
||||
starting_time = Time.now
|
||||
websocket_url = execute_command(command)[:websocket_url]
|
||||
EventMachine.run do
|
||||
socket = ContainerConnection.new(websocket_url)
|
||||
yield(self, socket) if block_given?
|
||||
end
|
||||
Time.now - starting_time # execution time
|
||||
end
|
||||
|
||||
def destroy
|
||||
@ -50,7 +55,7 @@ class Container
|
||||
end
|
||||
|
||||
def status
|
||||
parse(Faraday.get(container_url))[:status]
|
||||
parse(Faraday.get(container_url))[:status].to_sym
|
||||
end
|
||||
|
||||
private
|
||||
|
Reference in New Issue
Block a user