Skip to content

Commit ebd9b02

Browse files
committed
Configure multiple dispatchers, just like workers
This will be useful later when we support cron jobs and need a specific dispatcher to handle these.
1 parent 8c42242 commit ebd9b02

File tree

4 files changed

+19
-15
lines changed

4 files changed

+19
-15
lines changed

lib/solid_queue/configuration.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ def initialize(mode: :work, load_from: nil)
2222

2323
def processes
2424
case mode
25-
when :dispatch then dispatcher
25+
when :dispatch then dispatchers
2626
when :work then workers
27-
when :all then [ dispatcher ] + workers
27+
when :all then dispatchers + workers
2828
else raise "Invalid mode #{mode}"
2929
end
3030
end
@@ -40,9 +40,11 @@ def workers
4040
end
4141
end
4242

43-
def dispatcher
43+
def dispatchers
4444
if mode.in? %i[ dispatch all]
45-
SolidQueue::Dispatcher.new(**dispatcher_options)
45+
dispatchers_options.flat_map do |dispatcher_options|
46+
SolidQueue::Dispatcher.new(**dispatcher_options)
47+
end
4648
end
4749
end
4850

@@ -62,11 +64,13 @@ def config_from(file_or_hash, env: Rails.env)
6264
end
6365

6466
def workers_options
65-
@workers_options ||= (raw_config[:workers] || [ WORKER_DEFAULTS ]).map { |options| options.dup.symbolize_keys }
67+
@workers_options ||= (raw_config[:workers] || [ WORKER_DEFAULTS ])
68+
.map { |options| options.dup.symbolize_keys }
6669
end
6770

68-
def dispatcher_options
69-
(raw_config[:dispatcher] || {}).with_defaults(DISPATCHER_DEFAULTS)
71+
def dispatchers_options
72+
@dispatchers_options ||= (raw_config[:dispatchers] || [ DISPATCHER_DEFAULTS ])
73+
.map { |options| options.dup.symbolize_keys }
7074
end
7175

7276
def load_config_from(file_or_hash)

test/dummy/config/solid_queue.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ default: &default
44
threads: 3
55
- queues: default
66
threads: 5
7-
dispatcher:
8-
polling_interval: 1
9-
batch_size: 500
7+
dispatchers:
8+
- polling_interval: 1
9+
batch_size: 500
1010

1111
development:
1212
<<: *default

test/integration/concurrency_controls_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
1010
default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
1111
dispatcher = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 }
1212

13-
@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], dispatcher: dispatcher })
13+
@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], dispatchers: [ dispatcher ] })
1414

1515
wait_for_registered_processes(5, timeout: 0.5.second) # 3 workers working the default queue + dispatcher + supervisor
1616
end

test/unit/configuration_test.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,25 @@ class ConfigurationTest < ActiveSupport::TestCase
66
assert_equal 2, configuration.processes.count
77

88
assert_equal 1, configuration.workers.count
9-
assert configuration.dispatcher.present?
9+
assert_equal 1, configuration.dispatchers.count
1010

1111
assert_equal [ "*" ], configuration.workers.first.queues
12-
assert_equal SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size], configuration.dispatcher.batch_size
12+
assert_equal SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size], configuration.dispatchers.first.batch_size
1313
end
1414

1515
test "read configuration from default file" do
1616
configuration = SolidQueue::Configuration.new(mode: :all)
1717
assert 3, configuration.processes.count
1818
assert_equal 2, configuration.workers.count
19-
assert configuration.dispatcher.present?
19+
assert_equal 1, configuration.dispatchers.count
2020
end
2121

2222
test "provide configuration as a hash and fill defaults" do
2323
background_worker = { queues: "background", polling_interval: 10 }
2424
config_as_hash = { workers: [ background_worker, background_worker ] }
2525
configuration = SolidQueue::Configuration.new(mode: :all, load_from: config_as_hash)
2626

27-
assert_equal SolidQueue::Configuration::DISPATCHER_DEFAULTS[:polling_interval], configuration.dispatcher.polling_interval
27+
assert_equal SolidQueue::Configuration::DISPATCHER_DEFAULTS[:polling_interval], configuration.dispatchers.first.polling_interval
2828
assert_equal 2, configuration.workers.count
2929
assert_equal [ "background" ], configuration.workers.flat_map(&:queues).uniq
3030
assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq

0 commit comments

Comments
 (0)