refactored refill of Docker container pool
This commit is contained in:
@ -13,8 +13,11 @@ production:
|
|||||||
<<: *default
|
<<: *default
|
||||||
pool:
|
pool:
|
||||||
active: true
|
active: true
|
||||||
interval: 30
|
refill:
|
||||||
maximum_refill_count: 30
|
async: false
|
||||||
|
batch_size: 32
|
||||||
|
interval: 30
|
||||||
|
timeout: 60
|
||||||
workspace_root: <%= Rails.root.join('tmp', 'files', Rails.env) %>
|
workspace_root: <%= Rails.root.join('tmp', 'files', Rails.env) %>
|
||||||
|
|
||||||
test:
|
test:
|
||||||
|
@ -7,10 +7,8 @@ class DockerContainerPool
|
|||||||
|
|
||||||
def self.clean_up
|
def self.clean_up
|
||||||
@refill_task.try(:shutdown)
|
@refill_task.try(:shutdown)
|
||||||
@containers.each do |key, value|
|
@containers.values.each do |containers|
|
||||||
while !value.empty? do
|
DockerClient.destroy_container(containers.shift) until containers.empty?
|
||||||
DockerClient.destroy_container(value.shift)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -35,18 +33,22 @@ class DockerContainerPool
|
|||||||
end
|
end
|
||||||
|
|
||||||
def self.refill
|
def self.refill
|
||||||
ExecutionEnvironment.all.each do |execution_environment|
|
ExecutionEnvironment.where('pool_size > 0').each do |execution_environment|
|
||||||
refill_count = [execution_environment.pool_size - @containers[execution_environment.id].length, config[:maximum_refill_count]].min
|
if config[:refill][:async]
|
||||||
if refill_count > 0
|
Concurrent::Future.execute { refill_for_execution_environment(execution_environment) }
|
||||||
Concurrent::Future.execute do
|
else
|
||||||
@containers[execution_environment.id] += refill_count.times.map { create_container(execution_environment) }
|
refill_for_execution_environment(execution_environment)
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.refill_for_execution_environment(execution_environment)
|
||||||
|
refill_count = [execution_environment.pool_size - @containers[execution_environment.id].length, config[:refill][:batch_size]].min
|
||||||
|
@containers[execution_environment.id] += refill_count.times.map { create_container(execution_environment) }
|
||||||
|
end
|
||||||
|
|
||||||
def self.start_refill_task
|
def self.start_refill_task
|
||||||
@refill_task = Concurrent::TimerTask.new(execution_interval: config[:interval], run_now: true) { refill }
|
@refill_task = Concurrent::TimerTask.new(execution_interval: config[:refill][:interval], run_now: true, timeout_interval: config[:refill][:timeout]) { refill }
|
||||||
@refill_task.execute
|
@refill_task.execute
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -80,34 +80,47 @@ describe DockerContainerPool do
|
|||||||
end
|
end
|
||||||
|
|
||||||
describe '.refill' do
|
describe '.refill' do
|
||||||
let(:config) { double }
|
before(:each) { @execution_environment.update(pool_size: 10) }
|
||||||
let(:maximum_refill_count) { 5 }
|
|
||||||
|
|
||||||
before(:each) do
|
|
||||||
expect(described_class).to receive(:config).and_return(config)
|
|
||||||
expect(config).to receive(:[]).with(:maximum_refill_count).and_return(maximum_refill_count)
|
|
||||||
end
|
|
||||||
|
|
||||||
after(:each) { described_class.refill }
|
after(:each) { described_class.refill }
|
||||||
|
|
||||||
it 'regards all execution environments' do
|
context 'when configured to work synchronously' do
|
||||||
ExecutionEnvironment.all.each do |execution_environment|
|
before(:each) do
|
||||||
expect(described_class.instance_variable_get(:@containers)).to receive(:[]).with(execution_environment.id).and_call_original
|
expect(described_class).to receive(:config).and_return(refill: {async: false})
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'works synchronously' do
|
||||||
|
expect(described_class).to receive(:refill_for_execution_environment)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context 'when configured to work asynchronously' do
|
||||||
|
before(:each) do
|
||||||
|
expect(described_class).to receive(:config).and_return(refill: {async: true})
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'works asynchronously' do
|
||||||
|
expect_any_instance_of(Concurrent::Future).to receive(:execute) do |future|
|
||||||
|
expect(described_class).to receive(:refill_for_execution_environment)
|
||||||
|
future.instance_variable_get(:@task).call
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '.refill_for_execution_environment' do
|
||||||
|
let(:batch_size) { 5 }
|
||||||
|
|
||||||
|
before(:each) do
|
||||||
|
expect(described_class).to receive(:config).and_return(refill: {batch_size: batch_size})
|
||||||
|
end
|
||||||
|
|
||||||
|
after(:each) { described_class.refill_for_execution_environment(@execution_environment) }
|
||||||
|
|
||||||
context 'with something to refill' do
|
context 'with something to refill' do
|
||||||
before(:each) { @execution_environment.update(pool_size: 10) }
|
before(:each) { @execution_environment.update(pool_size: 10) }
|
||||||
|
|
||||||
it 'complies with the maximum batch size' do
|
it 'complies with the maximum batch size' do
|
||||||
expect_any_instance_of(Concurrent::Future).to receive(:execute) do |future|
|
expect(described_class).to receive(:create_container).with(@execution_environment).exactly(batch_size).times
|
||||||
expect(described_class).to receive(:create_container).with(@execution_environment).exactly(maximum_refill_count).times
|
|
||||||
future.instance_variable_get(:@task).call
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'works asynchronously' do
|
|
||||||
expect(Concurrent::Future).to receive(:execute)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -115,16 +128,23 @@ describe DockerContainerPool do
|
|||||||
before(:each) { @execution_environment.update(pool_size: 0) }
|
before(:each) { @execution_environment.update(pool_size: 0) }
|
||||||
|
|
||||||
it 'does nothing' do
|
it 'does nothing' do
|
||||||
expect(Concurrent::Future).not_to receive(:execute)
|
expect(described_class).not_to receive(:create_container)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe '.start_refill_task' do
|
describe '.start_refill_task' do
|
||||||
|
let(:interval) { 30 }
|
||||||
|
let(:timeout) { 60 }
|
||||||
|
|
||||||
|
before(:each) do
|
||||||
|
expect(described_class).to receive(:config).at_least(:once).and_return(refill: {interval: interval, timeout: timeout})
|
||||||
|
end
|
||||||
|
|
||||||
after(:each) { described_class.start_refill_task }
|
after(:each) { described_class.start_refill_task }
|
||||||
|
|
||||||
it 'creates an asynchronous task' do
|
it 'creates an asynchronous task' do
|
||||||
expect(Concurrent::TimerTask).to receive(:new).and_call_original
|
expect(Concurrent::TimerTask).to receive(:new).with(execution_interval: interval, run_now: true, timeout_interval: timeout).and_call_original
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'executes the task' do
|
it 'executes the task' do
|
||||||
|
Reference in New Issue
Block a user