
It seems like we are still not setting the Sentry hub correctly. Therefore, we just use the hub passed with the span to add the breadcrumbs.
297 lines
11 KiB
Ruby
297 lines
11 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'faye/websocket/client'
|
|
require 'json_schemer'
|
|
|
|
class Runner::Connection
|
|
# These are events for which callbacks can be registered (see `on`).
EVENTS = %i[start exit stdout stderr files].freeze
# Message types accepted by `on_message`; each one is dispatched to a `handle_<type>` method.
WEBSOCKET_MESSAGE_TYPES = %i[start stdout stderr error timeout exit].freeze
# Schema every decoded message has to satisfy before `on_message` processes it.
# NOTE(review): the path is relative and therefore resolved against the working directory of the process.
BACKEND_OUTPUT_SCHEMA = JSONSchemer.schema(JSON.parse(File.read('lib/runner/backend-output.schema.json')))
# `op` of the Sentry child span started in `initialize`.
SENTRY_OP_NAME = 'websocket.client'
# Category of the Sentry breadcrumb recorded when the connection is closed.
SENTRY_BREADCRUMB_CATEGORY = 'net.websocket'

# @!attribute start_callback
# @!attribute exit_callback
# @!attribute stdout_callback
# @!attribute stderr_callback
# @!attribute files_callback
attr_writer :status
attr_reader :error
|
|
|
|
# Opens a WebSocket connection to +url+ and forwards every Faye event to the matching `on_*` method.
#
# @param url [String] address passed to Faye::WebSocket::Client
# @param strategy [Object] runner strategy; its class supplies the handshake headers through
#   `websocket_header`, the instance is used later to destroy the runner or to retrieve files
# @param event_loop [Object] receives `stop` once the connection has been closed (see `on_close`)
# @param locale [Symbol] locale applied around every socket callback; defaults to the locale
#   that is active when the connection is created
def initialize(url, strategy, event_loop, locale = I18n.locale)
  Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Opening connection to #{url}" }

  # Both values stay nil when the current Sentry scope does not carry a span.
  sentry_transaction = Sentry.get_current_scope&.get_span
  sentry_span = sentry_transaction&.start_child(op: SENTRY_OP_NAME, start_timestamp: Sentry.utc_now.to_f)
  http_headers = strategy.class.websocket_header.deep_merge sentry_trace_header(sentry_span)

  @socket = Faye::WebSocket::Client.new(url, [], http_headers)
  @strategy = strategy
  @status = :new
  @event_loop = event_loop
  @locale = locale
  @stdout_buffer = Buffer.new
  @stderr_buffer = Buffer.new

  # For every event type of Faye WebSockets, the corresponding
  # RunnerConnection method starting with `on_` is called.
  %i[open message error close].each do |event_type|
    @socket.on(event_type) do |event|
      # The initial locale when establishing the connection is used for all callbacks
      I18n.with_locale(@locale) do
        clone_sentry_hub_from_span(sentry_span)
        __send__(:"on_#{event_type}", event, sentry_span)
      rescue StandardError => e
        # Report the failure, then re-raise the very same exception for the caller.
        Sentry.capture_exception(e)
        raise
      end
    end
  end

  # This registers empty default callbacks. They are defined in this file on purpose:
  # `list_filesystem` compares the callback's source location with this file.
  EVENTS.each {|event_type| instance_variable_set(:"@#{event_type}_callback", ->(_data) {}) }
  # The start callback is invoked without arguments (see `on_open`).
  @start_callback = -> {}
  # Fail if no exit status was returned.
  @exit_code = 1
end
|
|
|
|
# Register a callback based on the event type received from runner management.
#
# Calls for an event that is not listed in EVENTS, or calls without a block, are ignored.
# Ignoring a missing block keeps the previously registered callback intact instead of
# replacing it with nil (which would fail as soon as the callback is invoked).
#
# @param event [Symbol] one of EVENTS
# @return [Proc, nil] the registered block, or nil if nothing was registered
def on(event, &block)
  return unless block && EVENTS.include?(event)

  instance_variable_set(:"@#{event}_callback", block)
end
|
|
|
|
# Send arbitrary data in the WebSocket connection.
#
# @param raw_data [Object] payload that is passed through `encode` before it is sent
def send_data(raw_data)
  encoded_message = encode(raw_data)
  Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Sending to #{@socket.url}: #{encoded_message.inspect}" }
  # Send the message to the WebSocket connection _immediately_
  # by scheduling it for the next execution of the EventMachine reactor run.
  # Otherwise, the message might be queued causing delays for users.
  EventMachine.next_tick do
    @socket.send(encoded_message)
  end
end
|
|
|
|
# Close the WebSocket connection.
# Nothing happens unless the connection is currently established (see `active?`).
#
# @param status [Symbol] status stored before the socket is asked to close
def close(status)
  return unless active?

  @status = status
  @socket.close
end
|
|
|
|
# Check if the WebSocket connection is currently established.
#
# @return [Boolean] true only while the status is :established
def active?
  @status == :established
end
|
|
|
|
private
|
|
|
|
# Converts the raw data of a received WebSocket message into the event processed by `on_message`.
# This base implementation always raises.
# NOTE(review): presumably overridden by strategy-specific subclasses - confirm.
#
# @raise [NotImplementedError] always
def decode(_raw_event)
  raise NotImplementedError, "#{self.class} must implement #decode"
end
|
|
|
|
# Converts outgoing data into the message that `send_data` writes to the socket.
# This base implementation always raises.
# NOTE(review): presumably overridden by strategy-specific subclasses - confirm.
#
# @raise [NotImplementedError] always
def encode(_data)
  raise NotImplementedError, "#{self.class} must implement #encode"
end
|
|
|
|
# Passes whatever is still held in the stdout and stderr buffers to the respective callback.
# A callback is only invoked when its buffer reports that it is not empty.
def flush_buffers
  @stdout_callback.call(@stdout_buffer.flush) unless @stdout_buffer.empty?
  @stderr_callback.call(@stderr_buffer.flush) unless @stderr_buffer.empty?
end
|
|
|
|
# Tells whether a chunk of output contains an exit sequence that must not be forwarded.
#
# @param event_data [String, nil] output chunk taken from a buffer
# @return [Boolean] true if the chunk contains `#exit\r` or `{"cmd": "exit"}\r`
def ignored_sequence?(event_data)
  case event_data
    when /#exit\r/, /\s*\{"cmd": "exit"\}\r/
      # Do not forward. We will wait for the confirmed exit sent by the runner management.
      true
    else
      false
  end
end
|
|
|
|
# === WebSocket Callbacks
|
|
# These callbacks are executed based on events indicated by Faye WebSockets and are
|
|
# independent of the JSON specification that is used within the WebSocket once established.
|
|
|
|
# Handles a message received through the WebSocket.
#
# The raw data is decoded and checked against BACKEND_OUTPUT_SCHEMA; messages that do not
# match the schema are dropped without further notice. Valid messages are dispatched to the
# `handle_<type>` method named after their type. A type outside WEBSOCKET_MESSAGE_TYPES is
# stored as an error and closes the connection with the :error status.
#
# @param raw_event [Object] Faye event; only its `data` is read
# @param _sentry_span [Object, nil] unused
def on_message(raw_event, _sentry_span)
  Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Receiving from #{@socket.url}: #{raw_event.data.inspect}" }
  event = decode(raw_event.data)
  return unless BACKEND_OUTPUT_SCHEMA.valid?(event)

  event = event.deep_symbolize_keys
  message_type = event[:type].to_sym
  if WEBSOCKET_MESSAGE_TYPES.include?(message_type)
    # Only allow-listed types reach this dynamic dispatch.
    __send__(:"handle_#{message_type}", event)
  else
    @error = Runner::Error::UnexpectedResponse.new("Unknown WebSocket message type: #{message_type}")
    close(:error)
  end
end
|
|
|
|
# Marks the connection as established and invokes the start callback (without arguments).
#
# @param _event [Object] unused
# @param _sentry_span [Object, nil] unused
def on_open(_event, _sentry_span)
  Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Established connection to #{@socket.url}" }
  @status = :established
  @start_callback.call
end
|
|
|
|
# Records a WebSocket error by switching to the :error status and storing a matching exception.
#
# @param event [Object] Faye error event; only its `message` is read
# @param _sentry_span [Object, nil] unused
def on_error(event, _sentry_span)
  # In case of a WebSocket error, the connection will be closed by Faye::WebSocket::Client automatically.
  # Thus, no further handling is required here (the user will get notified).
  @status = :error
  @error = Runner::Error::Unknown.new("The WebSocket connection to #{@socket.url} was closed with an error: #{event.message}")
end
|
|
|
|
# Finalizes the connection once the WebSocket has been closed.
#
# The close is reported to Sentry (breadcrumb and span), buffered output is flushed and the
# final status decides whether an error is stored, the runner is destroyed or the exit
# callback is invoked. The event loop is stopped in every case, even when an error is raised.
#
# @param event [Object] Faye close event, passed on to the Sentry helpers
# @param sentry_span [Object, nil] span created in `initialize`, if any
def on_close(event, sentry_span)
  Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Closing connection to #{@socket.url} with status: #{@status}" }
  record_sentry_breadcrumb(sentry_span, event)
  end_sentry_span(sentry_span, event)
  flush_buffers

  # Depending on the status, we might want to destroy the runner at management.
  # This ensures we get a new runner on the next request.
  # All failing runs, those cancelled by the user or those hitting a timeout or error are subject to this mechanism.

  case @status
    when :timeout
      # The runner will be destroyed. For the DockerContainerPool, this mechanism is necessary.
      # However, it might not be required for Poseidon.
      @strategy.destroy_at_management
      @error = Runner::Error::ExecutionTimeout.new('Execution exceeded its time limit')
    when :out_of_memory
      # This status is only used by Poseidon (with gVisor).
      # The runner will be destroyed (and recreated) automatically.
      @error = Runner::Error::OutOfMemory.new('Execution exceeded its memory limit')
    when :terminated_by_codeocean, :terminated_by_management
      # Poseidon (without gVisor) and DockerContainerPool do not handle memory limits explicitly.
      # Instead, they signal that the program was terminated with exit code 137 (128 + 9).
      if @exit_code == 137
        @error = Runner::Error::OutOfMemory.new('Execution exceeded its memory limit')
      else
        @exit_callback.call @exit_code
        list_filesystem
      end
    when :terminated_by_client, :error
      @strategy.destroy_at_management
    else # any other status, e.g. :established
      # If the runner is killed by the DockerContainerPool after the maximum allowed time per user and
      # while the owning user is running an execution, the command execution stops and log output is incomplete.
      @strategy.destroy_at_management
      @error = Runner::Error::Unknown.new('Execution terminated with an unknown reason')
  end
rescue Runner::Error::FaradayError, Runner::Error::UnexpectedResponse => e
  # In some circumstances, the runner might be destroyed which could fail.
  # In these cases, we catch the error to pass it to the caller through the existing error handling.
  @error = e
ensure
  Rails.logger.debug { "#{Time.zone.now.getutc.inspect}: Closed connection to #{@socket.url} with status: #{@status}" }
  @event_loop.stop
end
|
|
|
|
# Invokes the files callback, handing over the files retrieved from the runner management
# or an empty default listing.
#
# The callback is always invoked (`ensure`), even if retrieving the files raised.
def list_filesystem
  files = {'files' => []}
  begin
    # Retrieve files from runner management ONLY IF the callback was defined outside of this class.
    # Otherwise, we would call our default callback and retrieve the files without any further processing.
    # `source_location` is nil for callbacks that are not backed by Ruby source (e.g. procs created
    # from methods implemented in C). Those cannot be our default callback, hence the safe navigation.
    files = @strategy.retrieve_files if @files_callback.source_location&.first != __FILE__
  rescue Runner::Error::RunnerNotFound, Runner::Error::WorkspaceError
    # Ignore errors when retrieving files. This is not critical and a suitable default is already provided.
  ensure
    @files_callback.call files
  end
end
private :list_filesystem
|
|
|
|
# === Message Handlers
|
|
# Each message type indicated by the +type+ attribute in the JSON
|
|
# sent by the runner management has a dedicated method.
|
|
# Context:: All registered handlers are executed in the scope of
|
|
# the bindings they had where they were registered.
|
|
# Information not stored in the binding, such as the
|
|
# locale or call stack are not available during execution!
|
|
|
|
# Handles an `exit` message: stores the reported exit code and marks the execution as
# terminated by the runner management.
#
# @param event [Hash] message with symbolized keys; `:data` holds the exit code
def handle_exit(event)
  @status = :terminated_by_management
  @exit_code = event[:data]
end
|
|
|
|
# Handles a `stdout` message: stores the data in the stdout buffer and forwards every event
# the buffer returns to the stdout callback, except for ignored exit sequences.
#
# @param event [Hash] message with symbolized keys; `:data` holds the output
def handle_stdout(event)
  @stdout_buffer.store event[:data]
  @stdout_buffer.events.each do |event_data|
    @stdout_callback.call event_data unless ignored_sequence? event_data
  end
end
|
|
|
|
# Handles a `stderr` message: stores the data in the stderr buffer and forwards every event
# the buffer returns to the stderr callback, except for ignored exit sequences.
#
# @param event [Hash] message with symbolized keys; `:data` holds the output
def handle_stderr(event)
  @stderr_buffer.store event[:data]
  @stderr_buffer.events.each do |event_data|
    @stderr_callback.call event_data unless ignored_sequence? event_data
  end
end
|
|
|
|
# Handles an `error` message sent by the runner management.
#
# The out-of-memory notification only changes the status; every other error is reported to Sentry.
#
# @param event [Hash] message with symbolized keys; `:data` holds the error text
def handle_error(event)
  # Poseidon (with gVisor enabled!) sends an error message when the execution exceeds its memory limit.
  # This is not an error in the sense of the runner management but rather a message.
  # We handle it here to avoid the error handling in the default case.
  return @status = :out_of_memory if event[:data] == 'the allocation was OOM Killed'

  # In case of a (Nomad) error during execution, the runner management will notify us with an error message here.
  # This shouldn't happen too often and can be considered an internal server error by the runner management.
  # More information is available in the logs of the runner management or the orchestrator (e.g., Nomad).
  Sentry.set_extras(event: event.inspect)
  Sentry.capture_message("An error occurred during code execution while being connected to #{@socket.url}.")
end
|
|
|
|
# Handles a `start` message. Intentionally does nothing.
#
# @param _event [Hash] unused
def handle_start(_event)
  # The execution just started as requested. This is an informal message and no further processing is required.
end
|
|
|
|
# Handles a `timeout` message by switching to the :timeout status.
#
# @param _event [Hash] unused
def handle_timeout(_event)
  @status = :timeout
  # The runner management stopped the execution as the permitted execution time was exceeded.
  # We set the status here and wait for the connection to be closed (by the runner management).
end
|
|
|
|
# The methods below are inspired by the Sentry::Net::HTTP class
|
|
# and adapted to the Websocket protocol running with EventMachine.
|
|
|
|
# Stores a clone of the hub belonging to the span's transaction in the current thread
# (thread variable `Sentry::THREAD_LOCAL`).
#
# Nothing happens without a span, or when the span has no transaction or hub.
#
# @param sentry_span [Object, nil] span created in `initialize`, if any
def clone_sentry_hub_from_span(sentry_span)
  hub = sentry_span&.transaction&.hub
  Thread.current.thread_variable_set(Sentry::THREAD_LOCAL, hub.clone) if hub
end
|
|
|
|
# Builds the header options that are merged into the WebSocket headers in `initialize`.
#
# @param sentry_span [Object, nil] span created in `initialize`, if any
# @return [Hash] empty without a span; otherwise the Sentry trace propagation headers under `:headers`
def sentry_trace_header(sentry_span)
  return {} unless sentry_span

  {
    headers: Sentry.get_trace_propagation_headers,
  }
end
|
|
|
|
# Completes the Sentry span: sets its description, stores the close code as `:status`
# and finishes it with the current time.
#
# @param sentry_span [Object, nil] span created in `initialize`; nothing happens if nil
# @param event [Object] Faye close event; only its `code` is read
def end_sentry_span(sentry_span, event)
  return unless sentry_span

  sentry_span.set_description("WebSocket #{@socket.url}")
  sentry_span.set_data(:status, event.code.to_i)
  sentry_span.finish(end_timestamp: Sentry.utc_now.to_f)
end
|
|
|
|
# Records a Sentry breadcrumb describing the closed connection.
#
# Nothing is recorded unless Sentry is initialized and its breadcrumbs logger includes :http_logger.
# The breadcrumb is added to the hub of the span's transaction. Without a span (or when the span
# has no transaction or hub), it is added through `Sentry.add_breadcrumb` instead of failing on nil,
# so that the remaining close handling in `on_close` is not interrupted.
#
# @param sentry_span [Object, nil] span created in `initialize`, if any
# @param event [Object] Faye close event; only its `code` is read
def record_sentry_breadcrumb(sentry_span, event)
  return unless Sentry.initialized? && Sentry.configuration.breadcrumbs_logger.include?(:http_logger)

  crumb = Sentry::Breadcrumb.new(
    level: :info,
    category: SENTRY_BREADCRUMB_CATEGORY,
    type: :info,
    data: {
      close_status: event.code.to_i,
      connection_status: @status,
      url: @socket.url,
    }
  )

  hub = sentry_span&.transaction&.hub
  if hub
    hub.add_breadcrumb(crumb)
  else
    Sentry.add_breadcrumb(crumb)
  end
end
|
|
end
|