Skip to content

Commit 6236739

Browse files
committed
Persist and reload recurring tasks before scheduling them
And delete them when the schedule is unloaded.
1 parent 546aaa6 commit 6236739

File tree

7 files changed

+97
-31
lines changed

7 files changed

+97
-31
lines changed

app/models/solid_queue/recurring_task.rb

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# frozen_string_literal: true
2+
13
require "fugit"
24

35
module SolidQueue
@@ -7,6 +9,8 @@ class RecurringTask < Record
79
validate :supported_schedule
810
validate :existing_job_class
911

12+
scope :static, -> { where(static: true) }
13+
1014
class << self
1115
def wrap(args)
1216
args.is_a?(self) ? args : from_configuration(args.first, **args.second)
@@ -51,12 +55,8 @@ def to_s
5155
"#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{parsed_schedule.original} ]"
5256
end
5357

54-
def to_h
55-
{
56-
schedule: schedule,
57-
class_name: class_name,
58-
arguments: arguments
59-
}
58+
def attributes_for_upsert
59+
attributes.without("id", "created_at", "updated_at")
6060
end
6161

6262
private
@@ -72,7 +72,6 @@ def existing_job_class
7272
end
7373
end
7474

75-
7675
def using_solid_queue_adapter?
7776
job_class.queue_adapter_name.inquiry.solid_queue?
7877
end

app/models/solid_queue/recurring_task/arguments.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# frozen_string_literal: true
2+
3+
require "active_job/arguments"
4+
15
module SolidQueue
26
class RecurringTask::Arguments
37
class << self
@@ -6,7 +10,7 @@ def load(data)
610
end
711

812
def dump(data)
9-
ActiveSupport::JSON.dump(ActiveJob::Arguments.serialize(data)) unless data.nil?
13+
ActiveSupport::JSON.dump(ActiveJob::Arguments.serialize(Array(data)))
1014
end
1115
end
1216
end

lib/solid_queue/dispatcher.rb

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ module SolidQueue
44
class Dispatcher < Processes::Poller
55
attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule
66

7-
after_boot :start_concurrency_maintenance, :load_recurring_schedule
8-
before_shutdown :stop_concurrency_maintenance, :unload_recurring_schedule
7+
after_boot :start_concurrency_maintenance, :schedule_recurring_tasks
8+
before_shutdown :stop_concurrency_maintenance, :unschedule_recurring_tasks
99

1010
def initialize(**options)
1111
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
@@ -19,7 +19,7 @@ def initialize(**options)
1919
end
2020

2121
def metadata
22-
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
22+
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.task_keys.presence)
2323
end
2424

2525
private
@@ -38,16 +38,16 @@ def start_concurrency_maintenance
3838
concurrency_maintenance&.start
3939
end
4040

41-
def load_recurring_schedule
42-
recurring_schedule.load_tasks
41+
def schedule_recurring_tasks
42+
recurring_schedule.schedule_tasks
4343
end
4444

4545
def stop_concurrency_maintenance
4646
concurrency_maintenance&.stop
4747
end
4848

49-
def unload_recurring_schedule
50-
recurring_schedule.unload_tasks
49+
def unschedule_recurring_tasks
50+
recurring_schedule.unschedule_tasks
5151
end
5252

5353
def all_work_completed?

lib/solid_queue/dispatcher/recurring_schedule.rb

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,41 +7,56 @@ class Dispatcher::RecurringSchedule
77
attr_reader :configured_tasks, :scheduled_tasks
88

99
def initialize(tasks)
10-
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }
10+
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
1111
@scheduled_tasks = Concurrent::Hash.new
1212
end
1313

1414
def empty?
1515
configured_tasks.empty?
1616
end
1717

18-
def load_tasks
18+
def schedule_tasks
19+
wrap_in_app_executor do
20+
persist_tasks
21+
reload_tasks
22+
end
23+
1924
configured_tasks.each do |task|
20-
load_task(task)
25+
schedule_task(task)
2126
end
2227
end
2328

24-
def load_task(task)
29+
def schedule_task(task)
2530
scheduled_tasks[task.key] = schedule(task)
2631
end
2732

28-
def unload_tasks
33+
def unschedule_tasks
2934
scheduled_tasks.values.each(&:cancel)
3035
scheduled_tasks.clear
31-
end
3236

33-
def tasks
34-
configured_tasks.each_with_object({}) { |task, hsh| hsh[task.key] = task.to_h }
37+
wrap_in_app_executor { delete_tasks }
3538
end
3639

37-
def inspect
38-
configured_tasks.map(&:to_s).join(" | ")
40+
def task_keys
41+
configured_tasks.map(&:key)
3942
end
4043

4144
private
45+
def persist_tasks
46+
SolidQueue::RecurringTask.upsert_all configured_tasks.map(&:attributes_for_upsert), record_timestamps: true
47+
end
48+
49+
def reload_tasks
50+
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys)
51+
end
52+
53+
def delete_tasks
54+
SolidQueue::RecurringTask.static.delete_all
55+
end
56+
4257
def schedule(task)
4358
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at|
44-
thread_schedule.load_task(thread_task)
59+
thread_schedule.schedule_task(thread_task)
4560

4661
wrap_in_app_executor do
4762
thread_task.enqueue(at: thread_task_run_at)

test/integration/instrumentation_test.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ class InstrumentationTest < ActiveSupport::TestCase
306306

307307
assert events.size >= 2
308308
events.each do |event|
309-
assert_event event, "enqueue_recurring_task", task: :example_task
309+
assert_event event, "enqueue_recurring_task", task: "example_task"
310310
end
311311

312312
active_job_ids = SolidQueue::Job.all.map(&:active_job_id)
@@ -333,7 +333,7 @@ class InstrumentationTest < ActiveSupport::TestCase
333333
assert events.size >= 1
334334
event = events.last
335335

336-
assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked"
336+
assert_event event, "enqueue_recurring_task", task: "example_task", enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked"
337337
assert event.last[:at].present?
338338
assert_nil event.last[:other_adapter]
339339
end
@@ -354,7 +354,7 @@ class InstrumentationTest < ActiveSupport::TestCase
354354
assert events.size >= 1
355355
event = events.last
356356

357-
assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "All is broken"
357+
assert_event event, "enqueue_recurring_task", task: "example_task", enqueue_error: "All is broken"
358358
assert event.last[:at].present?
359359
assert event.last[:other_adapter]
360360
ensure

test/integration/recurring_tasks_test.rb

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class RecurringTasksTest < ActiveSupport::TestCase
1616

1717
SolidQueue::Process.destroy_all
1818
SolidQueue::Job.destroy_all
19+
SolidQueue::RecurringTask.delete_all
1920
JobResult.delete_all
2021
end
2122

@@ -40,8 +41,56 @@ class RecurringTasksTest < ActiveSupport::TestCase
4041
end
4142
end
4243

44+
test "persist and delete configured tasks" do
45+
configured_task = { periodic_store_result: { class: "StoreResultJob", schedule: "every second" } }
46+
47+
assert_recurring_tasks configured_task
48+
terminate_process(@pid)
49+
assert_recurring_tasks []
50+
51+
SolidQueue::RecurringTask.create!(key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ])
52+
53+
@pid = run_supervisor_as_fork
54+
wait_for_registered_processes(4, timeout: 3.second)
55+
56+
assert_recurring_tasks configured_task
57+
58+
another_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: [ 42 ] } }
59+
dispatcher1 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: another_task).tap(&:start)
60+
wait_for_registered_processes(5, timeout: 1.second)
61+
62+
assert_recurring_tasks configured_task.merge(another_task)
63+
64+
updated_task = { example_task: { class: "AddToBufferJob", schedule: "every minute" } }
65+
dispatcher2 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: updated_task).tap(&:start)
66+
wait_for_registered_processes(6, timeout: 1.second)
67+
68+
assert_recurring_tasks configured_task.merge(updated_task)
69+
70+
terminate_process(@pid)
71+
dispatcher1.stop
72+
dispatcher2.stop
73+
74+
assert_recurring_tasks []
75+
end
76+
4377
private
4478
def wait_for_jobs_to_be_enqueued(count, timeout: 1.second)
4579
wait_while_with_timeout(timeout) { SolidQueue::Job.count < count }
4680
end
81+
82+
def assert_recurring_tasks(expected_tasks)
83+
skip_active_record_query_cache do
84+
assert_equal expected_tasks.count, SolidQueue::RecurringTask.count
85+
86+
expected_tasks.each do |key, attrs|
87+
task = SolidQueue::RecurringTask.find_by(key: key)
88+
assert task.present?
89+
90+
assert_equal(attrs[:schedule], task.schedule) if attrs[:schedule]
91+
assert_equal(attrs[:class], task.class_name) if attrs[:class]
92+
assert_equal(attrs[:args], task.arguments) if attrs[:args]
93+
end
94+
end
95+
end
4796
end

test/unit/dispatcher_test.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ class DispatcherTest < ActiveSupport::TestCase
5151
assert_equal "Dispatcher", process.kind
5252

5353
schedule_from_metadata = process.metadata["recurring_schedule"]
54-
assert_equal 1, schedule_from_metadata.size
55-
assert_equal({ "class_name" => "AddToBufferJob", "schedule" => "every hour", "arguments" => [ 42 ] }, schedule_from_metadata["example_task"])
54+
assert_equal [ "example_task" ], schedule_from_metadata
5655
ensure
5756
with_recurring_schedule.stop
5857
end

0 commit comments

Comments
 (0)