Skip to content

Commit a4467db

Browse files
committed
Split recurring scheduling responsibilities into another process
A Scheduler, instead of Dispatcher, that will be created only when there are recurring tasks to run.
1 parent 03334b1 commit a4467db

16 files changed

+148
-122
lines changed

lib/solid_queue/cli.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ class Cli < Thor
1414
desc: "Path to recurring schedule definition",
1515
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"
1616

17-
class_option :dispatch_only, type: :boolean, default: false
1817
class_option :work_only, type: :boolean, default: false
1918
class_option :skip_recurring, type: :boolean, default: false
2019

lib/solid_queue/configuration.rb

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@ def initialize(**options)
3030
end
3131

3232
def configured_processes
33-
case
34-
when only_work? then workers
35-
when only_dispatch? then dispatchers
33+
if only_work? then workers
3634
else
37-
dispatchers + workers
35+
dispatchers + workers + schedulers
3836
end
3937
end
4038

@@ -81,6 +79,14 @@ def dispatchers
8179
end
8280
end
8381

82+
def schedulers
83+
if !skip_recurring_tasks? && recurring_tasks.any?
84+
[ Process.new(:scheduler, recurring_tasks: recurring_tasks) ]
85+
else
86+
[]
87+
end
88+
end
89+
8490
def workers_options
8591
@workers_options ||= processes_config.fetch(:workers, [])
8692
.map { |options| options.dup.symbolize_keys }
@@ -89,22 +95,6 @@ def workers_options
8995
def dispatchers_options
9096
@dispatchers_options ||= processes_config.fetch(:dispatchers, [])
9197
.map { |options| options.dup.symbolize_keys }
92-
.then { |options| with_recurring_tasks(options) }
93-
end
94-
95-
def with_recurring_tasks(options)
96-
if !skip_recurring_tasks? && recurring_tasks.any?
97-
options.sort_by! { |attrs| attrs[:polling_interval] }
98-
99-
if least_busy_dispatcher = options.pop
100-
least_busy_dispatcher[:recurring_tasks] = recurring_tasks
101-
options.push(least_busy_dispatcher)
102-
else
103-
[ DISPATCHER_DEFAULTS.merge(recurring_tasks: recurring_tasks) ]
104-
end
105-
else
106-
options
107-
end
10898
end
10999

110100
def recurring_tasks
@@ -128,7 +118,7 @@ def recurring_tasks_config
128118
def config_from(file_or_hash, keys: [], fallback: {}, env: Rails.env)
129119
load_config_from(file_or_hash).then do |config|
130120
config = config[env.to_sym] ? config[env.to_sym] : config
131-
config = config.slice(*keys) if keys.any?
121+
config = config.slice(*keys) if keys.any? && config.present?
132122

133123
if config.empty? then fallback
134124
else

lib/solid_queue/dispatcher.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,23 @@
22

33
module SolidQueue
44
class Dispatcher < Processes::Poller
5-
attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule
5+
attr_accessor :batch_size, :concurrency_maintenance
66

7-
after_boot :start_concurrency_maintenance, :schedule_recurring_tasks
8-
before_shutdown :stop_concurrency_maintenance, :unschedule_recurring_tasks
7+
after_boot :start_concurrency_maintenance
8+
before_shutdown :stop_concurrency_maintenance
99

1010
def initialize(**options)
1111
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
1212

1313
@batch_size = options[:batch_size]
1414

1515
@concurrency_maintenance = ConcurrencyMaintenance.new(options[:concurrency_maintenance_interval], options[:batch_size]) if options[:concurrency_maintenance]
16-
@recurring_schedule = RecurringSchedule.new(options[:recurring_tasks])
1716

1817
super(**options)
1918
end
2019

2120
def metadata
22-
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.task_keys.presence)
21+
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval)
2322
end
2423

2524
private

lib/solid_queue/log_subscriber.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def register_process(event)
9494
if error = event.payload[:error]
9595
warn formatted_event(event, action: "Error registering #{process_kind}", **attributes.merge(error: formatted_error(error)))
9696
else
97-
info formatted_event(event, action: "Register #{process_kind}", **attributes)
97+
debug formatted_event(event, action: "Register #{process_kind}", **attributes)
9898
end
9999
end
100100

@@ -114,7 +114,7 @@ def deregister_process(event)
114114
if error = event.payload[:error]
115115
warn formatted_event(event, action: "Error deregistering #{process.kind}", **attributes.merge(error: formatted_error(error)))
116116
else
117-
info formatted_event(event, action: "Deregister #{process.kind}", **attributes)
117+
debug formatted_event(event, action: "Deregister #{process.kind}", **attributes)
118118
end
119119
end
120120

lib/solid_queue/processes/poller.rb

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ def poll
4141
raise NotImplementedError
4242
end
4343

44-
def shutdown
45-
end
46-
4744
def with_polling_volume
4845
SolidQueue.instrument(:polling) do
4946
if SolidQueue.silence_polling? && ActiveRecord::Base.logger

lib/solid_queue/processes/runnable.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ def start
1818

1919
def stop
2020
@stopped = true
21+
wake_up if running_async?
22+
2123
@thread&.join
2224
end
2325

@@ -61,6 +63,9 @@ def all_work_completed?
6163
false
6264
end
6365

66+
def shutdown
67+
end
68+
6469
def set_procline
6570
end
6671

lib/solid_queue/scheduler.rb

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Scheduler < Processes::Base
5+
include Processes::Runnable
6+
7+
attr_accessor :recurring_schedule
8+
9+
after_boot :schedule_recurring_tasks
10+
before_shutdown :unschedule_recurring_tasks
11+
12+
def initialize(recurring_tasks:, **options)
13+
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
14+
15+
super(**options)
16+
end
17+
18+
def metadata
19+
super.merge(recurring_schedule: recurring_schedule.task_keys.presence)
20+
end
21+
22+
private
23+
SLEEP_INTERVAL = 300 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks
24+
25+
def run
26+
loop do
27+
break if shutting_down?
28+
29+
interruptible_sleep(SLEEP_INTERVAL)
30+
end
31+
ensure
32+
SolidQueue.instrument(:shutdown_process, process: self) do
33+
run_callbacks(:shutdown) { shutdown }
34+
end
35+
end
36+
37+
def schedule_recurring_tasks
38+
recurring_schedule.schedule_tasks
39+
end
40+
41+
def unschedule_recurring_tasks
42+
recurring_schedule.unschedule_tasks
43+
end
44+
45+
def all_work_completed?
46+
recurring_schedule.empty?
47+
end
48+
49+
def set_procline
50+
procline "scheduling #{recurring_schedule.task_keys.join(",")}"
51+
end
52+
end
53+
end

lib/solid_queue/dispatcher/recurring_schedule.rb renamed to lib/solid_queue/scheduler/recurring_schedule.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# frozen_string_literal: true
22

33
module SolidQueue
4-
class Dispatcher::RecurringSchedule
4+
class Scheduler::RecurringSchedule
55
include AppExecutor
66

77
attr_reader :configured_tasks, :scheduled_tasks
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
default: &default
1+
development: []
22

3-
development:
4-
<<: *default
5-
6-
test:
7-
<<: *default
3+
test: []

test/integration/recurring_tasks_test.rb

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ class RecurringTasksTest < ActiveSupport::TestCase
66
self.use_transactional_tests = false
77

88
setup do
9-
@pid = run_supervisor_as_fork
10-
# 1 supervisor + 2 workers + 1 dispatcher
11-
wait_for_registered_processes(4, timeout: 3.second)
9+
@pid = run_supervisor_as_fork(skip_recurring: false)
10+
# 1 supervisor + 2 workers + 1 dispatcher + 1 scheduler
11+
wait_for_registered_processes(5, timeout: 3.second)
1212
end
1313

1414
teardown do
@@ -52,29 +52,29 @@ class RecurringTasksTest < ActiveSupport::TestCase
5252
task = SolidQueue::RecurringTask.find_by(key: "periodic_store_result")
5353
task.update!(class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ])
5454

55-
@pid = run_supervisor_as_fork
56-
wait_for_registered_processes(4, timeout: 3.second)
55+
@pid = run_supervisor_as_fork(skip_recurring: false)
56+
wait_for_registered_processes(5, timeout: 3.second)
5757

5858
# Wait for concurrency schedule loading after process registration
5959
sleep(0.5)
6060

6161
assert_recurring_tasks configured_task
6262

6363
another_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: [ 42 ] } }
64-
dispatcher1 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: another_task).tap(&:start)
65-
wait_for_registered_processes(5, timeout: 1.second)
64+
scheduler1 = SolidQueue::Scheduler.new(recurring_tasks: another_task).tap(&:start)
65+
wait_for_registered_processes(6, timeout: 1.second)
6666

6767
assert_recurring_tasks configured_task.merge(another_task)
6868

6969
updated_task = { example_task: { class: "AddToBufferJob", schedule: "every minute" } }
70-
dispatcher2 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: updated_task).tap(&:start)
71-
wait_for_registered_processes(6, timeout: 1.second)
70+
scheduler2 = SolidQueue::Scheduler.new(recurring_tasks: updated_task).tap(&:start)
71+
wait_for_registered_processes(7, timeout: 1.second)
7272

7373
assert_recurring_tasks configured_task.merge(updated_task)
7474

7575
terminate_process(@pid)
76-
dispatcher1.stop
77-
dispatcher2.stop
76+
scheduler1.stop
77+
scheduler2.stop
7878
end
7979

8080
private

0 commit comments

Comments
 (0)