Skip to content

Commit b13cb2a

Browse files
committed
Fix bug when enqueuing the job for a recurring task fails
As `perform_later` in this case returns `false`, which crashes when we try to access `job_id` on it. `perform_later` returns the job instance with `successfully_enqueued` set to true when it goes well, but `false` when it fails. You can provide a block and the job instance with `successfully_enqueued` set to `false` and an error set in `enqueue_error` is yielded to that block, but it's not returned. At first I tried to make this a bit more sophisticated, passing a block to get the job and populate the instrumentation payload with it, but the code was getting quite cumbersome and the error is instrumented by Active Job anyway, so in the end I just simplified this and opted for handling this case to avoid crashing.
1 parent 4c75a01 commit b13cb2a

File tree

6 files changed

+101
-9
lines changed

6 files changed

+101
-9
lines changed

app/models/solid_queue/recurring_execution.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,21 @@
22

33
module SolidQueue
44
class RecurringExecution < Execution
5+
class AlreadyRecorded < StandardError; end
6+
57
scope :clearable, -> { where.missing(:job) }
68

79
class << self
810
def record(task_key, run_at, &block)
911
transaction do
1012
block.call.tap do |active_job|
11-
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
13+
if active_job
14+
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
15+
end
1216
end
1317
end
14-
rescue ActiveRecord::RecordNotUnique
15-
# Task already dispatched
18+
rescue ActiveRecord::RecordNotUnique => e
19+
raise AlreadyRecorded
1620
end
1721

1822
def clear_in_batches(batch_size: 500)

lib/solid_queue/dispatcher/recurring_task.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,17 @@ def next_time
3131

3232
def enqueue(at:)
3333
SolidQueue.instrument(:enqueue_recurring_task, task: key, at: at) do |payload|
34-
if using_solid_queue_adapter?
34+
active_job = if using_solid_queue_adapter?
3535
perform_later_and_record(run_at: at)
3636
else
3737
payload[:other_adapter] = true
3838

3939
perform_later
40-
end.tap do |active_job|
41-
payload[:active_job_id] = active_job&.job_id
4240
end
41+
42+
payload[:active_job_id] = active_job.job_id if active_job
43+
rescue RecurringExecution::AlreadyRecorded
44+
payload[:skipped] = true
4345
end
4446
end
4547

lib/solid_queue/log_subscriber.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,18 @@ def release_blocked(event)
4040
end
4141

4242
def enqueue_recurring_task(event)
43-
attributes = event.payload.slice(:task, :at, :active_job_id)
43+
attributes = event.payload.slice(:task, :active_job_id, :at)
4444

4545
if event.payload[:other_adapter]
46-
debug formatted_event(event, action: "Enqueued recurring task outside Solid Queue", **attributes)
46+
action = attributes[:active_job_id].present? ? "Enqueued recurring task outside Solid Queue" : "Error enqueuing recurring task"
47+
info formatted_event(event, action: action, **attributes)
4748
else
48-
action = attributes[:active_job_id].present? ? "Enqueued recurring task" : "Skipped recurring task – already dispatched"
49+
action = case
50+
when event.payload[:skipped].present? then "Skipped recurring task – already dispatched"
51+
when attributes[:active_job_id].nil? then "Error enqueuing recurring task"
52+
else "Enqueued recurring task"
53+
end
54+
4955
info formatted_event(event, action: action, **attributes)
5056
end
5157
end

test/integration/instrumentation_test.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,26 @@ class InstrumentationTest < ActiveSupport::TestCase
280280
end
281281
end
282282

283+
test "an error enqueuing a recurring task is reflected in the enqueue_recurring_task event" do
284+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
285+
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)
286+
287+
dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task)
288+
289+
events = subscribed("enqueue_recurring_task.solid_queue") do
290+
dispatcher.start
291+
sleep(1.01)
292+
dispatcher.stop
293+
end
294+
295+
assert events.size >= 1
296+
event = events.last
297+
298+
assert_event event, "enqueue_recurring_task", task: :example_task
299+
assert event.last[:at].present?
300+
assert_nil event.last[:other_adapter]
301+
end
302+
283303
test "thread errors emit thread_error events" do
284304
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
285305

test/unit/log_subscriber_test.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,27 @@ def set_logger(logger)
2626
assert_match_logged :debug, "Unblock jobs", "limit: 42, size: 10"
2727
end
2828

29+
test "recurring task enqueued succesfully" do
30+
attach_log_subscriber
31+
instrument "enqueue_recurring_task.solid_queue", task: :example_task, active_job_id: "b944ddbc-6a37-43c0-b661-4b56e57195f5", at: Time.now
32+
33+
assert_match_logged :info, "Enqueued recurring task", "task: :example_task, active_job_id: \"b944ddbc-6a37-43c0-b661-4b56e57195f5\""
34+
end
35+
36+
test "recurring task skipped" do
37+
attach_log_subscriber
38+
instrument "enqueue_recurring_task.solid_queue", task: :example_task, skipped: true, at: Time.now
39+
40+
assert_match_logged :info, "Skipped recurring task – already dispatched", "task: :example_task"
41+
end
42+
43+
test "error enqueuing recurring task" do
44+
attach_log_subscriber
45+
instrument "enqueue_recurring_task.solid_queue", task: :example_task, at: Time.now
46+
47+
assert_match_logged :info, "Error enqueuing recurring task", "task: :example_task"
48+
end
49+
2950
private
3051
def attach_log_subscriber
3152
ActiveSupport::LogSubscriber.attach_to :solid_queue, SolidQueue::LogSubscriber.new

test/unit/recurring_task_test.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ def perform(value, options = {}, **kwargs)
2525
end
2626
end
2727

28+
class JobUsingAsyncAdapter < ApplicationJob
29+
self.queue_adapter = :async
30+
31+
def perform
32+
JobBuffer.add "job_using_async_adapter"
33+
end
34+
end
35+
2836
setup do
2937
@worker = SolidQueue::Worker.new(queues: "*")
3038
@worker.mode = :inline
@@ -58,6 +66,37 @@ def perform(value, options = {}, **kwargs)
5866
enqueue_and_assert_performed_with_result task, [ "multiple_types", nil, 42 ]
5967
end
6068

69+
test "job using a different adapter" do
70+
task = recurring_task_with(class_name: "JobUsingAsyncAdapter")
71+
previous_size = JobBuffer.size
72+
73+
task.enqueue(at: Time.now)
74+
wait_while_with_timeout!(0.5.seconds) { JobBuffer.size == previous_size }
75+
76+
assert_equal "job_using_async_adapter", JobBuffer.last_value
77+
end
78+
79+
test "error when enqueuing job before recording task" do
80+
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)
81+
82+
task = recurring_task_with(class_name: "JobWithoutArguments")
83+
assert_no_difference -> { SolidQueue::Job.count } do
84+
task.enqueue(at: Time.now)
85+
end
86+
end
87+
88+
test "error when enqueuing job using another adapter" do
89+
ActiveJob::QueueAdapters::AsyncAdapter.any_instance.stubs(:enqueue).raises(ActiveJob::EnqueueError)
90+
previous_size = JobBuffer.size
91+
92+
task = recurring_task_with(class_name: "JobUsingAsyncAdapter")
93+
task.enqueue(at: Time.now)
94+
95+
wait_while_with_timeout(0.5.seconds) { JobBuffer.size == previous_size }
96+
97+
assert_equal previous_size, JobBuffer.size
98+
end
99+
61100
test "valid and invalid schedules" do
62101
assert_not recurring_task_with(class_name: "JobWithoutArguments", schedule: "once a year").valid?
63102
assert_not recurring_task_with(class_name: "JobWithoutArguments", schedule: "tomorrow").valid?

0 commit comments

Comments
 (0)