Skip to content

Commit 3e4c0f0

Browse files
committed
Track recurring executions to prevent enqueuing the same task more than once
Only when the recurring job being enqueued is using Solid Queue as the adapter. This supports other adapters as well, but in that case we can't guarantee unique runs of the same task at the same time.
1 parent 8726b39 commit 3e4c0f0

File tree

8 files changed

+58
-10
lines changed

8 files changed

+58
-10
lines changed

app/models/solid_queue/job.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module SolidQueue
44
class Job < Record
5-
include Executable
5+
include Executable, Recurrable, Clearable
66

77
serialize :arguments, coder: JSON
88

app/models/solid_queue/job/executable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ module Executable
66
extend ActiveSupport::Concern
77

88
included do
9-
include Clearable, ConcurrencyControls, Schedulable
9+
include ConcurrencyControls, Schedulable
1010

1111
has_one :ready_execution
1212
has_one :claimed_execution
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Job
5+
module Recurrable
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
has_one :recurring_execution
10+
end
11+
12+
private
13+
def execution
14+
super || recurring_execution
15+
end
16+
end
17+
end
18+
end
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class RecurringExecution < Execution
5+
def self.record(task_key, run_at, &block)
6+
transaction do
7+
if job_id = block.call
8+
create!(job_id: job_id, task_key: task_key, run_at: run_at)
9+
end
10+
end
11+
rescue ActiveRecord::RecordNotUnique
12+
SolidQueue.logger.info("[SolidQueue] Skipped recurring task #{task_key} at #{run_at} — already dispatched")
13+
end
14+
end
15+
end

lib/solid_queue/dispatcher/recurring_schedule.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ def inspect
3636

3737
private
3838
def schedule(task)
39-
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task ]) do |thread_schedule, thread_task|
40-
thread_schedule.load_task(task)
39+
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at|
40+
thread_schedule.load_task(thread_task)
4141

4242
wrap_in_app_executor do
43-
thread_task.enqueue
43+
thread_task.enqueue(at: thread_task_run_at)
4444
end
4545
end
4646

lib/solid_queue/dispatcher/recurring_task.rb

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,35 @@ def next_time
2929
schedule.next_time.utc
3030
end
3131

32-
def enqueue
33-
SolidQueue.logger.info("[SolidQueue] Dispatching recurring task #{self}")
34-
job_class.perform_later(*arguments)
32+
def enqueue(at:)
33+
if using_solid_queue_adapter?
34+
perform_later_and_record(run_at: at)
35+
else
36+
perform_later
37+
end
3538
end
3639

3740
def valid?
3841
schedule.instance_of?(Fugit::Cron)
3942
end
4043

4144
def to_s
42-
"#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) with schedule #{schedule.original}"
45+
"#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{schedule.original} ]"
4346
end
4447

4548
private
49+
def using_solid_queue_adapter?
50+
job_class.queue_adapter_name.inquiry.solid_queue?
51+
end
52+
53+
def perform_later_and_record(run_at:)
54+
RecurringExecution.record(key, run_at) { perform_later.provider_job_id }
55+
end
56+
57+
def perform_later
58+
job_class.perform_later(*arguments)
59+
end
60+
4661
def job_class
4762
@job_class ||= class_name.safe_constantize
4863
end

test/fixtures/solid_queue/recurring_executions.yml

Whitespace-only changes.

test/unit/dispatcher_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class DispatcherTest < ActiveSupport::TestCase
4646

4747
process = SolidQueue::Process.first
4848
assert_equal "Dispatcher", process.kind
49-
assert_equal [ "AddToBufferJob.perform_later(42) with schedule 0 * * * *" ], process.metadata["recurring_schedule"]
49+
assert_equal [ "AddToBufferJob.perform_later(42) [ 0 * * * * ]" ], process.metadata["recurring_schedule"]
5050

5151
with_recurring_schedule.stop
5252
end

0 commit comments

Comments
 (0)