Move EventMachine initialization to Runner::EventLoop
This commit is contained in:

committed by
Sebastian Serth

parent
9e2cff7558
commit
e752df1b3c
@ -41,7 +41,6 @@ class Runner < ApplicationRecord
|
|||||||
end
|
end
|
||||||
|
|
||||||
def attach_to_execution(command, &block)
|
def attach_to_execution(command, &block)
|
||||||
ensure_event_machine
|
|
||||||
starting_time = Time.zone.now
|
starting_time = Time.zone.now
|
||||||
begin
|
begin
|
||||||
# As the EventMachine reactor is probably shared with other threads, we cannot use EventMachine.run with
|
# As the EventMachine reactor is probably shared with other threads, we cannot use EventMachine.run with
|
||||||
@ -66,22 +65,6 @@ class Runner < ApplicationRecord
|
|||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
# If there are multiple threads trying to connect to the WebSocket of their execution at the same time,
|
|
||||||
# the Faye WebSocket connections will use the same reactor. We therefore only need to start an EventMachine
|
|
||||||
# if there isn't a running reactor yet.
|
|
||||||
# See this StackOverflow answer: https://stackoverflow.com/a/8247947
|
|
||||||
def ensure_event_machine
|
|
||||||
unless EventMachine.reactor_running? && EventMachine.reactor_thread.alive?
|
|
||||||
event_loop = Runner::EventLoop.new
|
|
||||||
Thread.new do
|
|
||||||
EventMachine.run { event_loop.stop }
|
|
||||||
ensure
|
|
||||||
ActiveRecord::Base.connection_pool.release_connection
|
|
||||||
end
|
|
||||||
event_loop.wait
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def request_id
|
def request_id
|
||||||
request_new_id if runner_id.blank?
|
request_new_id if runner_id.blank?
|
||||||
end
|
end
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
class Runner::EventLoop
|
class Runner::EventLoop
|
||||||
def initialize
|
def initialize
|
||||||
@queue = Queue.new
|
@queue = Queue.new
|
||||||
|
ensure_event_machine
|
||||||
end
|
end
|
||||||
|
|
||||||
# wait waits until another thread calls stop on this EventLoop.
|
# wait waits until another thread calls stop on this EventLoop.
|
||||||
@ -19,4 +20,22 @@ class Runner::EventLoop
|
|||||||
def stop
|
def stop
|
||||||
@queue.push nil if @queue.empty?
|
@queue.push nil if @queue.empty?
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
# If there are multiple threads trying to connect to the WebSocket of their execution at the same time,
|
||||||
|
# the Faye WebSocket connections will use the same reactor. We therefore only need to start an EventMachine
|
||||||
|
# if there isn't a running reactor yet.
|
||||||
|
# See this StackOverflow answer: https://stackoverflow.com/a/8247947
|
||||||
|
def ensure_event_machine
|
||||||
|
unless EventMachine.reactor_running? && EventMachine.reactor_thread.alive?
|
||||||
|
queue = Queue.new
|
||||||
|
Thread.new do
|
||||||
|
EventMachine.run { queue.push nil }
|
||||||
|
ensure
|
||||||
|
ActiveRecord::Base.connection_pool.release_connection
|
||||||
|
end
|
||||||
|
queue.pop
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user