Skip to content

Commit 1ddfcb3

Browse files
committed
Remove mode option from configuration and rely on the config values
That is, no more "work" or "dispatch". The behaviour is the same as "all", and to get work only or dispatch only it is enough with passing an empty array for dispatchers and workers.
1 parent 654317b commit 1ddfcb3

File tree

9 files changed

+49
-66
lines changed

9 files changed

+49
-66
lines changed

lib/puma/plugin/solid_queue.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def start(launcher)
1414
launcher.events.on_booted do
1515
@solid_queue_pid = fork do
1616
Thread.new { monitor_puma }
17-
SolidQueue::Supervisor.start(mode: :all)
17+
SolidQueue::Supervisor.start
1818
end
1919
end
2020

lib/solid_queue/configuration.rb

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,25 @@ class Configuration
1717
recurring_tasks: []
1818
}
1919

20-
def initialize(mode: :work, load_from: nil)
21-
@mode = mode
20+
def initialize(load_from: nil)
2221
@raw_config = config_from(load_from)
2322
end
2423

2524
def processes
26-
case mode
27-
when :dispatch then dispatchers
28-
when :work then workers
29-
when :all then dispatchers + workers
30-
else raise "Invalid mode #{mode}"
31-
end
25+
dispatchers + workers
3226
end
3327

3428
def workers
35-
if mode.in? %i[ work all]
36-
workers_options.flat_map do |worker_options|
37-
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
38-
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
39-
end
40-
else
41-
[]
29+
workers_options.flat_map do |worker_options|
30+
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
31+
processes.times.map { Worker.new(**worker_options.with_defaults(WORKER_DEFAULTS)) }
4232
end
4333
end
4434

4535
def dispatchers
46-
if mode.in? %i[ dispatch all]
47-
dispatchers_options.map do |dispatcher_options|
48-
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
49-
50-
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
51-
end
36+
dispatchers_options.map do |dispatcher_options|
37+
recurring_tasks = parse_recurring_tasks dispatcher_options[:recurring_tasks]
38+
Dispatcher.new **dispatcher_options.merge(recurring_tasks: recurring_tasks).with_defaults(DISPATCHER_DEFAULTS)
5239
end
5340
end
5441

@@ -58,7 +45,7 @@ def max_number_of_threads
5845
end
5946

6047
private
61-
attr_reader :raw_config, :mode
48+
attr_reader :raw_config
6249

6350
DEFAULT_CONFIG_FILE_PATH = "config/solid_queue.yml"
6451

lib/solid_queue/supervisor.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ class Supervisor < Processes::Base
55
include Processes::Signals
66

77
class << self
8-
def start(mode: :work, load_configuration_from: nil)
8+
def start(mode: :fork, load_configuration_from: nil)
99
SolidQueue.supervisor = true
10-
configuration = Configuration.new(mode: mode, load_from: load_configuration_from)
10+
configuration = Configuration.new(load_from: load_configuration_from)
1111

1212
new(*configuration.processes).start
1313
end

lib/solid_queue/tasks.rb

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,6 @@
11
namespace :solid_queue do
22
desc "start solid_queue supervisor to dispatch and process jobs"
33
task start: :environment do
4-
SolidQueue::Supervisor.start(mode: :all)
5-
end
6-
7-
desc "start solid_queue supervisor to process jobs"
8-
task work: :environment do
9-
SolidQueue::Supervisor.start(mode: :work)
10-
end
11-
12-
desc "start solid_queue dispatcher to enqueue scheduled jobs"
13-
task dispatch: :environment do
14-
SolidQueue::Supervisor.start(mode: :dispatch)
4+
SolidQueue::Supervisor.start
155
end
166
end

test/integration/concurrency_controls_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
1111
default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 }
1212
dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 }
1313

14-
@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], dispatchers: [ dispatcher ] })
14+
@pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ], dispatchers: [ dispatcher ] })
1515

1616
wait_for_registered_processes(5, timeout: 0.5.second) # 3 workers working the default queue + dispatcher + supervisor
1717
end

test/integration/processes_lifecycle_test.rb

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
setup do
9-
@pid = run_supervisor_as_fork
9+
config_as_hash = { workers: [ { queues: :background }, { queues: :default, threads: 5 } ], dispatchers: [] }
10+
@pid = run_supervisor_as_fork(load_configuration_from: config_as_hash)
1011

1112
wait_for_registered_processes(3, timeout: 3.second)
1213
assert_registered_workers_for(:background, :default)
@@ -17,6 +18,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
1718

1819
SolidQueue::Process.destroy_all
1920
SolidQueue::Job.destroy_all
21+
JobResult.delete_all
2022
end
2123

2224
test "enqueue jobs in multiple queues" do
@@ -142,13 +144,13 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
142144
end
143145

144146
test "process some jobs that raise errors" do
145-
enqueue_store_result_job("no error", :background, 2)
146-
enqueue_store_result_job("no error", :default, 2)
147-
error1 = enqueue_store_result_job("error", :background, 1, exception: RuntimeError)
148-
enqueue_store_result_job("no error", :background, 1, pause: 0.03)
149-
error2 = enqueue_store_result_job("error", :background, 1, exception: RuntimeError, pause: 0.05)
150-
enqueue_store_result_job("no error", :default, 2, pause: 0.01)
151-
error3 = enqueue_store_result_job("error", :default, 1, exception: RuntimeError)
147+
2.times { enqueue_store_result_job("no error", :background) }
148+
2.times { enqueue_store_result_job("no error", :default) }
149+
error1 = enqueue_store_result_job("error", :background, exception: RuntimeError)
150+
enqueue_store_result_job("no error", :background, pause: 0.03)
151+
error2 = enqueue_store_result_job("error", :background, exception: RuntimeError, pause: 0.05)
152+
2.times { enqueue_store_result_job("no error", :default, pause: 0.01) }
153+
error3 = enqueue_store_result_job("error", :default, exception: RuntimeError)
152154

153155
wait_for_jobs_to_finish_for(0.5.seconds)
154156

@@ -165,12 +167,15 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
165167
end
166168

167169
test "process a job that exits" do
168-
enqueue_store_result_job("no exit", :background, 2)
169-
enqueue_store_result_job("no exit", :default, 2)
170-
enqueue_store_result_job("paused no exit", :default, 1, pause: 0.5)
171-
exit_job = enqueue_store_result_job("exit", :background, 1, exit: true, pause: 0.2)
172-
pause_job = enqueue_store_result_job("exit", :background, 1, pause: 0.3)
173-
enqueue_store_result_job("no exit", :background, 2)
170+
2.times do
171+
enqueue_store_result_job("no exit", :background)
172+
enqueue_store_result_job("no exit", :default)
173+
end
174+
enqueue_store_result_job("paused no exit", :default, pause: 0.5)
175+
exit_job = enqueue_store_result_job("exit", :background, exit: true, pause: 0.2)
176+
pause_job = enqueue_store_result_job("exit", :background, pause: 0.3)
177+
178+
2.times { enqueue_store_result_job("no exit", :background) }
174179

175180
wait_for_jobs_to_finish_for(5.seconds)
176181

@@ -215,10 +220,8 @@ def assert_no_registered_workers
215220
assert_empty find_processes_registered_as("Worker").to_a
216221
end
217222

218-
def enqueue_store_result_job(value, queue_name = :background, count = 1, **options)
219-
count.times.collect { StoreResultJob.set(queue: queue_name).perform_later(value, **options) }.then do |jobs|
220-
jobs.one? ? jobs.first : jobs
221-
end
223+
def enqueue_store_result_job(value, queue_name = :background, **options)
224+
StoreResultJob.set(queue: queue_name).perform_later(value, **options)
222225
end
223226

224227
def assert_completed_job_results(value, queue_name = :background, count = 1)

test/integration/recurring_tasks_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class RecurringTasksTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
setup do
9-
@pid = run_supervisor_as_fork(mode: :all)
9+
@pid = run_supervisor_as_fork
1010
# 1 supervisor + 2 workers + 1 dispatcher
1111
wait_for_registered_processes(4, timeout: 3.second)
1212
end

test/unit/configuration_test.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
class ConfigurationTest < ActiveSupport::TestCase
44
test "default configuration to process all queues and dispatch" do
55
configuration = stub_const(SolidQueue::Configuration, :DEFAULT_CONFIG_FILE_PATH, "non/existent/path") do
6-
SolidQueue::Configuration.new(mode: :all)
6+
SolidQueue::Configuration.new
77
end
88

99
assert_equal 2, configuration.processes.count
@@ -16,7 +16,7 @@ class ConfigurationTest < ActiveSupport::TestCase
1616
end
1717

1818
test "read configuration from default file" do
19-
configuration = SolidQueue::Configuration.new(mode: :all)
19+
configuration = SolidQueue::Configuration.new
2020
assert 3, configuration.processes.count
2121
assert_equal 2, configuration.workers.count
2222
assert_equal 1, configuration.dispatchers.count
@@ -26,7 +26,7 @@ class ConfigurationTest < ActiveSupport::TestCase
2626
background_worker = { queues: "background", polling_interval: 10 }
2727
dispatcher = { batch_size: 100 }
2828
config_as_hash = { workers: [ background_worker, background_worker ], dispatchers: [ dispatcher ] }
29-
configuration = SolidQueue::Configuration.new(mode: :all, load_from: config_as_hash)
29+
configuration = SolidQueue::Configuration.new(load_from: config_as_hash)
3030

3131
assert_equal 1, configuration.dispatchers.count
3232
dispatcher = configuration.dispatchers.first
@@ -39,14 +39,14 @@ class ConfigurationTest < ActiveSupport::TestCase
3939
end
4040

4141
test "max number of threads" do
42-
configuration = SolidQueue::Configuration.new(mode: :all)
42+
configuration = SolidQueue::Configuration.new
4343
assert 7, configuration.max_number_of_threads
4444
end
4545

4646
test "mulitple workers with the same configuration" do
4747
background_worker = { queues: "background", polling_interval: 10, processes: 3 }
4848
config_as_hash = { workers: [ background_worker ] }
49-
configuration = SolidQueue::Configuration.new(mode: :work, load_from: config_as_hash)
49+
configuration = SolidQueue::Configuration.new(load_from: config_as_hash)
5050

5151
assert_equal 3, configuration.workers.count
5252
assert_equal [ "background" ], configuration.workers.flat_map(&:queues).uniq

test/unit/supervisor_test.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,26 @@ class SupervisorTest < ActiveSupport::TestCase
1616
SolidQueue::Process.destroy_all
1717
end
1818

19-
test "start in work mode (default)" do
19+
test "start" do
2020
pid = run_supervisor_as_fork
2121
wait_for_registered_processes(4)
2222

2323
assert_registered_supervisor(pid)
2424
assert_registered_workers(2, supervisor_pid: pid)
25+
assert_registered_dispatcher(supervisor_pid: pid)
2526

2627
terminate_process(pid)
2728

2829
assert_no_registered_processes
2930
end
3031

31-
test "start in dispatch mode" do
32-
pid = run_supervisor_as_fork(mode: :dispatch)
33-
wait_for_registered_processes(4)
32+
test "start with provided configuration" do
33+
config_as_hash = { workers: [], dispatchers: [ { batch_size: 100 } ] }
34+
pid = run_supervisor_as_fork(load_configuration_from: config_as_hash)
35+
wait_for_registered_processes(2) # supervisor + dispatcher
3436

3537
assert_registered_supervisor(pid)
38+
assert_registered_workers(0)
3639
assert_registered_dispatcher(supervisor_pid: pid)
3740

3841
terminate_process(pid)
@@ -114,7 +117,7 @@ class SupervisorTest < ActiveSupport::TestCase
114117
end
115118

116119
private
117-
def assert_registered_workers(count, supervisor_pid:, **metadata)
120+
def assert_registered_workers(count, supervisor_pid: nil, **metadata)
118121
skip_active_record_query_cache do
119122
workers = find_processes_registered_as("Worker")
120123
assert_equal count, workers.count

0 commit comments

Comments
 (0)