Ensure that only one EventMachine is running
This commit is contained in:

committed by
Sebastian Serth

parent
5cc180d0e9
commit
c7369366d5
@ -122,76 +122,85 @@ class SubmissionsController < ApplicationController
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_websockets(tubesock, socket)
|
|
||||||
tubesock.send_data JSON.dump({cmd: :status, status: :container_running})
|
|
||||||
|
|
||||||
socket.on :stdout do |data|
|
|
||||||
json_data = JSON.dump({cmd: :write, stream: :stdout, data: data})
|
|
||||||
@output << json_data[0, max_output_buffer_size - @output.size]
|
|
||||||
tubesock.send_data(json_data)
|
|
||||||
end
|
|
||||||
|
|
||||||
socket.on :stderr do |data|
|
|
||||||
json_data = JSON.dump({cmd: :write, stream: :stderr, data: data})
|
|
||||||
@output << json_data[0, max_output_buffer_size - @output.size]
|
|
||||||
tubesock.send_data(json_data)
|
|
||||||
end
|
|
||||||
|
|
||||||
socket.on :exit do |exit_code|
|
|
||||||
EventMachine.stop_event_loop
|
|
||||||
if @output.empty?
|
|
||||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"})
|
|
||||||
end
|
|
||||||
tubesock.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"})
|
|
||||||
kill_socket(tubesock)
|
|
||||||
end
|
|
||||||
|
|
||||||
tubesock.onmessage do |event|
|
|
||||||
event = JSON.parse(event).deep_symbolize_keys
|
|
||||||
case event[:cmd].to_sym
|
|
||||||
when :client_kill
|
|
||||||
EventMachine.stop_event_loop
|
|
||||||
kill_socket(tubesock)
|
|
||||||
Rails.logger.debug('Client exited container.')
|
|
||||||
when :result
|
|
||||||
socket.send event[:data]
|
|
||||||
else
|
|
||||||
Rails.logger.info("Unknown command from client: #{event[:cmd]}")
|
|
||||||
end
|
|
||||||
|
|
||||||
rescue JSON::ParserError
|
|
||||||
Rails.logger.debug { "Data received from client is not valid json: #{data}" }
|
|
||||||
Sentry.set_extras(data: data)
|
|
||||||
rescue TypeError
|
|
||||||
Rails.logger.debug { "JSON data received from client cannot be parsed to hash: #{data}" }
|
|
||||||
Sentry.set_extras(data: data)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def run
|
def run
|
||||||
@output = +''
|
# These method-local socket variables are required in order to use one socket
|
||||||
hijack do |tubesock|
|
# in the callbacks of the other socket. As the callbacks for the client socket
|
||||||
return kill_socket(tubesock) if @embed_options[:disable_run]
|
# are registered first, the runner socket may still be nil.
|
||||||
|
client_socket, runner_socket = nil
|
||||||
|
|
||||||
durations = @submission.run(sanitize_filename) do |socket|
|
hijack do |tubesock|
|
||||||
handle_websockets(tubesock, socket)
|
client_socket = tubesock
|
||||||
|
return kill_client_socket(client_socket) if @embed_options[:disable_run]
|
||||||
|
|
||||||
|
client_socket.onclose do |_event|
|
||||||
|
runner_socket&.close(:terminated_by_client)
|
||||||
|
end
|
||||||
|
|
||||||
|
client_socket.onmessage do |event|
|
||||||
|
event = JSON.parse(event).deep_symbolize_keys
|
||||||
|
case event[:cmd].to_sym
|
||||||
|
when :client_kill
|
||||||
|
close_client_connection(client_socket)
|
||||||
|
Rails.logger.debug('Client exited container.')
|
||||||
|
when :result
|
||||||
|
# The client cannot send something before the runner connection is established.
|
||||||
|
if runner_socket.present?
|
||||||
|
runner_socket.send event[:data]
|
||||||
|
else
|
||||||
|
Rails.logger.info("Could not forward data from client because runner connection was not established yet: #{event[:data].inspect}")
|
||||||
|
end
|
||||||
|
else
|
||||||
|
Rails.logger.info("Unknown command from client: #{event[:cmd]}")
|
||||||
|
end
|
||||||
|
rescue JSON::ParserError
|
||||||
|
Rails.logger.info("Data received from client is not valid json: #{data.inspect}")
|
||||||
|
Sentry.set_extras(data: data)
|
||||||
|
rescue TypeError
|
||||||
|
Rails.logger.info("JSON data received from client cannot be parsed as hash: #{data.inspect}")
|
||||||
|
Sentry.set_extras(data: data)
|
||||||
end
|
end
|
||||||
@container_execution_time = durations[:execution_duration]
|
|
||||||
@waiting_for_container_time = durations[:waiting_duration]
|
|
||||||
rescue Runner::Error::ExecutionTimeout => e
|
|
||||||
tubesock.send_data JSON.dump({cmd: :status, status: :timeout})
|
|
||||||
kill_socket(tubesock)
|
|
||||||
Rails.logger.debug { "Running a submission timed out: #{e.message}" }
|
|
||||||
@output = "timeout: #{@output}"
|
|
||||||
extract_durations(e)
|
|
||||||
rescue Runner::Error => e
|
|
||||||
tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
|
||||||
kill_socket(tubesock)
|
|
||||||
Rails.logger.debug { "Runner error while running a submission: #{e.message}" }
|
|
||||||
extract_durations(e)
|
|
||||||
ensure
|
|
||||||
save_run_output
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@output = +''
|
||||||
|
durations = @submission.run(sanitize_filename) do |socket|
|
||||||
|
runner_socket = socket
|
||||||
|
client_socket.send_data JSON.dump({cmd: :status, status: :container_running})
|
||||||
|
|
||||||
|
runner_socket.on :stdout do |data|
|
||||||
|
json_data = JSON.dump({cmd: :write, stream: :stdout, data: data})
|
||||||
|
@output << json_data[0, max_output_buffer_size - @output.size]
|
||||||
|
client_socket.send_data(json_data)
|
||||||
|
end
|
||||||
|
|
||||||
|
runner_socket.on :stderr do |data|
|
||||||
|
json_data = JSON.dump({cmd: :write, stream: :stderr, data: data})
|
||||||
|
@output << json_data[0, max_output_buffer_size - @output.size]
|
||||||
|
client_socket.send_data(json_data)
|
||||||
|
end
|
||||||
|
|
||||||
|
runner_socket.on :exit do |exit_code|
|
||||||
|
if @output.empty?
|
||||||
|
client_socket.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.no_output', timestamp: l(Time.zone.now, format: :short))}\n"})
|
||||||
|
end
|
||||||
|
client_socket.send_data JSON.dump({cmd: :write, stream: :stdout, data: "#{t('exercises.implement.exit', exit_code: exit_code)}\n"})
|
||||||
|
close_client_connection(client_socket)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
@container_execution_time = durations[:execution_duration]
|
||||||
|
@waiting_for_container_time = durations[:waiting_duration]
|
||||||
|
rescue Runner::Error::ExecutionTimeout => e
|
||||||
|
client_socket.send_data JSON.dump({cmd: :status, status: :timeout})
|
||||||
|
close_client_connection(client_socket)
|
||||||
|
Rails.logger.debug { "Running a submission timed out: #{e.message}" }
|
||||||
|
@output = "timeout: #{@output}"
|
||||||
|
extract_durations(e)
|
||||||
|
rescue Runner::Error => e
|
||||||
|
client_socket.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
||||||
|
close_client_connection(client_socket)
|
||||||
|
Rails.logger.debug { "Runner error while running a submission: #{e.message}" }
|
||||||
|
extract_durations(e)
|
||||||
|
ensure
|
||||||
|
save_run_output
|
||||||
end
|
end
|
||||||
|
|
||||||
def extract_durations(error)
|
def extract_durations(error)
|
||||||
@ -200,14 +209,16 @@ class SubmissionsController < ApplicationController
|
|||||||
end
|
end
|
||||||
private :extract_durations
|
private :extract_durations
|
||||||
|
|
||||||
def kill_socket(tubesock)
|
def close_client_connection(client_socket)
|
||||||
# search for errors and save them as StructuredError (for scoring runs see submission.rb)
|
# search for errors and save them as StructuredError (for scoring runs see submission.rb)
|
||||||
errors = extract_errors
|
errors = extract_errors
|
||||||
send_hints(tubesock, errors)
|
send_hints(client_socket, errors)
|
||||||
|
kill_client_socket(client_socket)
|
||||||
|
end
|
||||||
|
|
||||||
# Hijacked connection needs to be notified correctly
|
def kill_client_socket(client_socket)
|
||||||
tubesock.send_data JSON.dump({cmd: :exit})
|
client_socket.send_data JSON.dump({cmd: :exit})
|
||||||
tubesock.close
|
client_socket.close
|
||||||
end
|
end
|
||||||
|
|
||||||
# save the output of this "run" as a "testrun" (scoring runs are saved in submission.rb)
|
# save the output of this "run" as a "testrun" (scoring runs are saved in submission.rb)
|
||||||
@ -235,7 +246,7 @@ class SubmissionsController < ApplicationController
|
|||||||
|
|
||||||
def score
|
def score
|
||||||
hijack do |tubesock|
|
hijack do |tubesock|
|
||||||
return kill_socket(tubesock) if @embed_options[:disable_run]
|
return if @embed_options[:disable_run]
|
||||||
|
|
||||||
tubesock.send_data(JSON.dump(@submission.calculate_score))
|
tubesock.send_data(JSON.dump(@submission.calculate_score))
|
||||||
# To enable hints when scoring a submission, uncomment the next line:
|
# To enable hints when scoring a submission, uncomment the next line:
|
||||||
@ -244,8 +255,7 @@ class SubmissionsController < ApplicationController
|
|||||||
tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
tubesock.send_data JSON.dump({cmd: :status, status: :container_depleted})
|
||||||
Rails.logger.debug { "Runner error while scoring submission #{@submission.id}: #{e.message}" }
|
Rails.logger.debug { "Runner error while scoring submission #{@submission.id}: #{e.message}" }
|
||||||
ensure
|
ensure
|
||||||
tubesock.send_data JSON.dump({cmd: :exit})
|
kill_client_socket(tubesock)
|
||||||
tubesock.close
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -41,9 +41,17 @@ class Runner < ApplicationRecord
|
|||||||
end
|
end
|
||||||
|
|
||||||
def attach_to_execution(command, &block)
|
def attach_to_execution(command, &block)
|
||||||
|
ensure_event_machine
|
||||||
starting_time = Time.zone.now
|
starting_time = Time.zone.now
|
||||||
begin
|
begin
|
||||||
@strategy.attach_to_execution(command, &block)
|
# As the EventMachine reactor is probably shared with other threads, we cannot use EventMachine.run with
|
||||||
|
# stop_event_loop to wait for the WebSocket connection to terminate. Instead, we use a self-built event
|
||||||
|
# loop for that: Runner::EventLoop. The attach_to_execution method of the strategy is responsible for
|
||||||
|
# initializing its Runner::Connection with the given event loop. The Runner::Connection class ensures that
|
||||||
|
# this event loop is stopped after the socket was closed.
|
||||||
|
event_loop = Runner::EventLoop.new
|
||||||
|
@strategy.attach_to_execution(command, event_loop, &block)
|
||||||
|
event_loop.wait
|
||||||
rescue Runner::Error => e
|
rescue Runner::Error => e
|
||||||
e.execution_duration = Time.zone.now - starting_time
|
e.execution_duration = Time.zone.now - starting_time
|
||||||
raise
|
raise
|
||||||
@ -57,6 +65,22 @@ class Runner < ApplicationRecord
|
|||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
|
# If there are multiple threads trying to connect to the WebSocket of their execution at the same time,
|
||||||
|
# the Faye WebSocket connections will use the same reactor. We therefore only need to start an EventMachine
|
||||||
|
# if there isn't a running reactor yet.
|
||||||
|
# See this StackOverflow answer: https://stackoverflow.com/a/8247947
|
||||||
|
def ensure_event_machine
|
||||||
|
unless EventMachine.reactor_running? && EventMachine.reactor_thread.alive?
|
||||||
|
event_loop = Runner::EventLoop.new
|
||||||
|
Thread.new do
|
||||||
|
EventMachine.run { event_loop.stop }
|
||||||
|
ensure
|
||||||
|
ActiveRecord::Base.connection_pool.release_connection
|
||||||
|
end
|
||||||
|
event_loop.wait
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def request_id
|
def request_id
|
||||||
request_new_id if runner_id.blank?
|
request_new_id if runner_id.blank?
|
||||||
end
|
end
|
||||||
|
@ -158,7 +158,6 @@ class Submission < ApplicationRecord
|
|||||||
end
|
end
|
||||||
socket.on :exit do |received_exit_code|
|
socket.on :exit do |received_exit_code|
|
||||||
exit_code = received_exit_code
|
exit_code = received_exit_code
|
||||||
EventMachine.stop_event_loop
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
output.merge!(container_execution_time: execution_time, status: exit_code.zero? ? :ok : :failed)
|
output.merge!(container_execution_time: execution_time, status: exit_code.zero? ? :ok : :failed)
|
||||||
|
@ -11,12 +11,13 @@ class Runner::Connection
|
|||||||
|
|
||||||
attr_writer :status
|
attr_writer :status
|
||||||
|
|
||||||
def initialize(url, strategy)
|
def initialize(url, strategy, event_loop)
|
||||||
@socket = Faye::WebSocket::Client.new(url, [], ping: 5)
|
@socket = Faye::WebSocket::Client.new(url, [], ping: 5)
|
||||||
@strategy = strategy
|
@strategy = strategy
|
||||||
@status = :established
|
@status = :established
|
||||||
|
@event_loop = event_loop
|
||||||
|
|
||||||
# For every event type of faye websockets, the corresponding
|
# For every event type of Faye WebSockets, the corresponding
|
||||||
# RunnerConnection method starting with `on_` is called.
|
# RunnerConnection method starting with `on_` is called.
|
||||||
%i[open message error close].each do |event_type|
|
%i[open message error close].each do |event_type|
|
||||||
@socket.on(event_type) {|event| __send__(:"on_#{event_type}", event) }
|
@socket.on(event_type) {|event| __send__(:"on_#{event_type}", event) }
|
||||||
@ -41,6 +42,17 @@ class Runner::Connection
|
|||||||
@socket.send(encoded_message)
|
@socket.send(encoded_message)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Records the given status and closes the underlying WebSocket.
# Does nothing unless the connection is still active (see active?).
# @param status [Symbol] status to store before closing; callers in this change
#   pass :terminated_by_client, :timeout and :terminated_by_codeocean
def close(status)
|
||||||
|
# Guard: a connection that already left the :established state is not closed again.
return unless active?
|
||||||
|
|
||||||
|
@status = status
|
||||||
|
@socket.close
|
||||||
|
end
|
||||||
|
|
||||||
|
# Tells whether the connection still has the :established status, which is the
# value assigned in initialize.
# @return [Boolean]
def active?
|
||||||
|
@status == :established
|
||||||
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def decode(_raw_event)
|
def decode(_raw_event)
|
||||||
@ -61,7 +73,7 @@ class Runner::Connection
|
|||||||
if WEBSOCKET_MESSAGE_TYPES.include?(message_type)
|
if WEBSOCKET_MESSAGE_TYPES.include?(message_type)
|
||||||
__send__("handle_#{message_type}", event)
|
__send__("handle_#{message_type}", event)
|
||||||
else
|
else
|
||||||
raise Runner::Error::UnexpectedResponse.new("Unknown websocket message type: #{message_type}")
|
raise Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -78,6 +90,9 @@ class Runner::Connection
|
|||||||
raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
|
raise Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
|
||||||
when :terminated_by_codeocean, :terminated_by_management
|
when :terminated_by_codeocean, :terminated_by_management
|
||||||
@exit_callback.call @exit_code
|
@exit_callback.call @exit_code
|
||||||
|
@event_loop.stop
|
||||||
|
when :terminated_by_client
|
||||||
|
@event_loop.stop
|
||||||
else # :established
|
else # :established
|
||||||
# If the runner is killed by the DockerContainerPool after the maximum allowed time per user and
|
# If the runner is killed by the DockerContainerPool after the maximum allowed time per user and
|
||||||
# while the owning user is running an execution, the command execution stops and log output is incomplete.
|
# while the owning user is running an execution, the command execution stops and log output is incomplete.
|
||||||
|
22
lib/runner/event_loop.rb
Normal file
22
lib/runner/event_loop.rb
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
# EventLoop is an abstraction around Ruby's Queue that makes its usage easier to
|
||||||
|
# understand in our context.
|
||||||
|
class Runner::EventLoop
|
||||||
|
def initialize
|
||||||
|
@queue = Queue.new
|
||||||
|
end
|
||||||
|
|
||||||
|
# wait waits until another thread calls stop on this EventLoop.
|
||||||
|
# There may only be one active wait call per loop at a time, otherwise it is not
|
||||||
|
# deterministic which one will be unblocked if stop is called.
|
||||||
|
def wait
|
||||||
|
@queue.pop
|
||||||
|
end
|
||||||
|
|
||||||
|
# stop unblocks the currently active wait call. If there is none, the
|
||||||
|
# next call to wait will not be blocking.
|
||||||
|
def stop
|
||||||
|
@queue.push nil if @queue.empty?
|
||||||
|
end
|
||||||
|
end
|
@ -51,19 +51,22 @@ class Runner::Strategy::DockerContainerPool < Runner::Strategy
|
|||||||
raise Runner::Error::FaradayError.new("Request to DockerContainerPool failed: #{e.inspect}")
|
raise Runner::Error::FaradayError.new("Request to DockerContainerPool failed: #{e.inspect}")
|
||||||
end
|
end
|
||||||
|
|
||||||
def attach_to_execution(command)
|
def attach_to_execution(command, event_loop)
|
||||||
@command = command
|
@command = command
|
||||||
query_params = 'logs=0&stream=1&stderr=1&stdout=1&stdin=1'
|
query_params = 'logs=0&stream=1&stderr=1&stdout=1&stdin=1'
|
||||||
websocket_url = "#{self.class.config[:ws_host]}/v1.27/containers/#{@container_id}/attach/ws?#{query_params}"
|
websocket_url = "#{self.class.config[:ws_host]}/v1.27/containers/#{@container_id}/attach/ws?#{query_params}"
|
||||||
|
|
||||||
EventMachine.run do
|
socket = Connection.new(websocket_url, self, event_loop)
|
||||||
socket = Connection.new(websocket_url, self)
|
begin
|
||||||
EventMachine.add_timer(@execution_environment.permitted_execution_time) do
|
Timeout.timeout(@execution_environment.permitted_execution_time) do
|
||||||
socket.status = :timeout
|
socket.send(command)
|
||||||
destroy_at_management
|
yield(socket)
|
||||||
|
event_loop.wait
|
||||||
|
event_loop.stop
|
||||||
end
|
end
|
||||||
socket.send(command)
|
rescue Timeout::Error
|
||||||
yield(socket)
|
socket.close(:timeout)
|
||||||
|
destroy_at_management
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -118,8 +121,7 @@ class Runner::Strategy::DockerContainerPool < Runner::Strategy
|
|||||||
# Assume correct termination for now and return exit code 0
|
# Assume correct termination for now and return exit code 0
|
||||||
# TODO: Can we use the actual exit code here?
|
# TODO: Can we use the actual exit code here?
|
||||||
@exit_code = 0
|
@exit_code = 0
|
||||||
@status = :terminated_by_codeocean
|
close(:terminated_by_codeocean)
|
||||||
@socket.close
|
|
||||||
when /#{format(@strategy.execution_environment.test_command, class_name: '.*', filename: '.*', module_name: '.*')}/
|
when /#{format(@strategy.execution_environment.test_command, class_name: '.*', filename: '.*', module_name: '.*')}/
|
||||||
# TODO: Super dirty hack to redirect test output to stderr (remove attr_reader afterwards)
|
# TODO: Super dirty hack to redirect test output to stderr (remove attr_reader afterwards)
|
||||||
@stream = 'stderr'
|
@stream = 'stderr'
|
||||||
|
@ -85,12 +85,10 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
|||||||
raise Runner::Error::FaradayError.new("Request to Poseidon failed: #{e.inspect}")
|
raise Runner::Error::FaradayError.new("Request to Poseidon failed: #{e.inspect}")
|
||||||
end
|
end
|
||||||
|
|
||||||
def attach_to_execution(command)
|
def attach_to_execution(command, event_loop)
|
||||||
websocket_url = execute_command(command)
|
websocket_url = execute_command(command)
|
||||||
EventMachine.run do
|
socket = Connection.new(websocket_url, self, event_loop)
|
||||||
socket = Connection.new(websocket_url, self)
|
yield(socket)
|
||||||
yield(socket)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def destroy_at_management
|
def destroy_at_management
|
||||||
@ -113,7 +111,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
|||||||
if websocket_url.present?
|
if websocket_url.present?
|
||||||
return websocket_url
|
return websocket_url
|
||||||
else
|
else
|
||||||
raise Runner::Error::UnexpectedResponse.new('Poseidon did not send websocket url')
|
raise Runner::Error::UnexpectedResponse.new('Poseidon did not send a WebSocket URL')
|
||||||
end
|
end
|
||||||
when 400
|
when 400
|
||||||
Runner.destroy(@allocation_id)
|
Runner.destroy(@allocation_id)
|
||||||
@ -132,7 +130,7 @@ class Runner::Strategy::Poseidon < Runner::Strategy
|
|||||||
def decode(raw_event)
|
def decode(raw_event)
|
||||||
JSON.parse(raw_event.data)
|
JSON.parse(raw_event.data)
|
||||||
rescue JSON::ParserError => e
|
rescue JSON::ParserError => e
|
||||||
raise Runner::Error::UnexpectedResponse.new("The websocket message from Poseidon could not be decoded to JSON: #{e.inspect}")
|
raise Runner::Error::UnexpectedResponse.new("The WebSocket message from Poseidon could not be decoded to JSON: #{e.inspect}")
|
||||||
end
|
end
|
||||||
|
|
||||||
def encode(data)
|
def encode(data)
|
||||||
|
Reference in New Issue
Block a user