Skip to content

Commit fa0a5cb

Browse files
authored
Merge pull request #252 from rails/enqueue-errors
Improve error handling on enqueuing
2 parents 664931d + 441c93a commit fa0a5cb

File tree

11 files changed

+165
-12
lines changed

11 files changed

+165
-12
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ There are several settings that control how Solid Queue works that you can set a
173173
- `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes.
174174
- `enqueue_after_transaction_commit`: whether the job queuing is deferred to after the current Active Record transaction is committed. The default is `false`. [Read more](https://github.com/rails/rails/pull/51426).
175175

176+
## Errors when enqueuing
177+
Solid Queue will raise a `SolidQueue::Job::EnqueueError` for any Active Record errors that happen when enqueuing a job. The reason for not raising `ActiveJob::EnqueueError` is that this one gets handled by Active Job, causing `perform_later` to return `false` and set `job.enqueue_error`, yielding the job to a block that you need to pass to `perform_later`. This works very well for your own jobs, but makes failures very hard to handle for jobs enqueued by Rails or other gems, such as `Turbo::Streams::BroadcastJob` or `ActiveStorage::AnalyzeJob`, because you don't control the call to `perform_later` in those cases.
178+
179+
In the case of recurring tasks, if such an error is raised when enqueuing the job corresponding to the task, it'll be handled and logged, but it won't bubble up.
176180

177181
## Concurrency controls
178182
Solid Queue extends Active Job with concurrency controls, which allow you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs are never discarded or lost, only blocked.

app/models/solid_queue/job.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module SolidQueue
44
class Job < Record
5+
class EnqueueError < StandardError; end
6+
57
include Executable, Clearable, Recurrable
68

79
serialize :arguments, coder: JSON
@@ -37,6 +39,11 @@ def enqueue(active_job, scheduled_at: Time.current)
3739

3840
def create_from_active_job(active_job)
3941
create!(**attributes_from_active_job(active_job))
42+
rescue ActiveRecord::ActiveRecordError => e
43+
enqueue_error = EnqueueError.new("#{e.class.name}: #{e.message}").tap do |error|
44+
error.set_backtrace e.backtrace
45+
end
46+
raise enqueue_error
4047
end
4148

4249
def create_all_from_active_jobs(active_jobs)

app/models/solid_queue/recurring_execution.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,21 @@
22

33
module SolidQueue
44
class RecurringExecution < Execution
5+
class AlreadyRecorded < StandardError; end
6+
57
scope :clearable, -> { where.missing(:job) }
68

79
class << self
810
def record(task_key, run_at, &block)
911
transaction do
1012
block.call.tap do |active_job|
11-
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
13+
if active_job
14+
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
15+
end
1216
end
1317
end
14-
rescue ActiveRecord::RecordNotUnique
15-
# Task already dispatched
18+
rescue ActiveRecord::RecordNotUnique => e
19+
raise AlreadyRecorded
1620
end
1721

1822
def clear_in_batches(batch_size: 500)

lib/solid_queue/dispatcher.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def unload_recurring_schedule
5151
recurring_schedule.unload_tasks
5252
end
5353

54+
def all_work_completed?
55+
SolidQueue::ScheduledExecution.none? && recurring_schedule.empty?
56+
end
57+
5458
def set_procline
5559
procline "waiting"
5660
end

lib/solid_queue/dispatcher/recurring_schedule.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ def initialize(tasks)
1111
@scheduled_tasks = Concurrent::Hash.new
1212
end
1313

14+
def empty?
15+
configured_tasks.empty?
16+
end
17+
1418
def load_tasks
1519
configured_tasks.each do |task|
1620
load_task(task)

lib/solid_queue/dispatcher/recurring_task.rb

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,23 @@ def next_time
3131

3232
def enqueue(at:)
3333
SolidQueue.instrument(:enqueue_recurring_task, task: key, at: at) do |payload|
34-
if using_solid_queue_adapter?
34+
active_job = if using_solid_queue_adapter?
3535
perform_later_and_record(run_at: at)
3636
else
3737
payload[:other_adapter] = true
3838

39-
perform_later
40-
end.tap do |active_job|
41-
payload[:active_job_id] = active_job&.job_id
39+
perform_later do |job|
40+
unless job.successfully_enqueued?
41+
payload[:enqueue_error] = job.enqueue_error&.message
42+
end
43+
end
4244
end
45+
46+
payload[:active_job_id] = active_job.job_id if active_job
47+
rescue RecurringExecution::AlreadyRecorded
48+
payload[:skipped] = true
49+
rescue Job::EnqueueError => error
50+
payload[:enqueue_error] = error.message
4351
end
4452
end
4553

@@ -68,8 +76,8 @@ def perform_later_and_record(run_at:)
6876
RecurringExecution.record(key, run_at) { perform_later }
6977
end
7078

71-
def perform_later
72-
job_class.perform_later(*arguments_with_kwargs)
79+
def perform_later(&block)
80+
job_class.perform_later(*arguments_with_kwargs, &block)
7381
end
7482

7583
def arguments_with_kwargs

lib/solid_queue/log_subscriber.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,18 @@ def release_blocked(event)
4040
end
4141

4242
def enqueue_recurring_task(event)
43-
attributes = event.payload.slice(:task, :at, :active_job_id)
43+
attributes = event.payload.slice(:task, :active_job_id, :enqueue_error, :at)
4444

4545
if event.payload[:other_adapter]
46-
debug formatted_event(event, action: "Enqueued recurring task outside Solid Queue", **attributes)
46+
action = attributes[:active_job_id].present? ? "Enqueued recurring task outside Solid Queue" : "Error enqueuing recurring task"
47+
info formatted_event(event, action: action, **attributes)
4748
else
48-
action = attributes[:active_job_id].present? ? "Enqueued recurring task" : "Skipped recurring task – already dispatched"
49+
action = case
50+
when event.payload[:skipped].present? then "Skipped recurring task – already dispatched"
51+
when attributes[:active_job_id].nil? then "Error enqueuing recurring task"
52+
else "Enqueued recurring task"
53+
end
54+
4955
info formatted_event(event, action: action, **attributes)
5056
end
5157
end

test/integration/instrumentation_test.rb

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,49 @@ class InstrumentationTest < ActiveSupport::TestCase
280280
end
281281
end
282282

283+
test "an error enqueuing a recurring task in Solid Queue is reflected in the enqueue_recurring_task event" do
284+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
285+
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)
286+
287+
dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task)
288+
289+
events = subscribed("enqueue_recurring_task.solid_queue") do
290+
dispatcher.start
291+
sleep(1.01)
292+
dispatcher.stop
293+
end
294+
295+
assert events.size >= 1
296+
event = events.last
297+
298+
assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked"
299+
assert event.last[:at].present?
300+
assert_nil event.last[:other_adapter]
301+
end
302+
303+
test "an error enqueuing a recurring task with another adapter is reflected in the enqueue_recurring_task event" do
304+
AddToBufferJob.queue_adapter = :async
305+
ActiveJob::QueueAdapters::AsyncAdapter.any_instance.stubs(:enqueue).raises(ActiveJob::EnqueueError.new("All is broken"))
306+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
307+
308+
dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task)
309+
310+
events = subscribed("enqueue_recurring_task.solid_queue") do
311+
dispatcher.start
312+
sleep(1.01)
313+
dispatcher.stop
314+
end
315+
316+
assert events.size >= 1
317+
event = events.last
318+
319+
assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "All is broken"
320+
assert event.last[:at].present?
321+
assert event.last[:other_adapter]
322+
ensure
323+
AddToBufferJob.queue_adapter = :solid_queue
324+
end
325+
283326
test "thread errors emit thread_error events" do
284327
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
285328

test/models/solid_queue/job_test.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,19 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
241241
end
242242
end
243243

244+
test "raise EnqueueError when there's an ActiveRecordError" do
245+
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)
246+
247+
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
248+
assert_raises SolidQueue::Job::EnqueueError do
249+
SolidQueue::Job.enqueue(active_job)
250+
end
251+
252+
assert_raises SolidQueue::Job::EnqueueError do
253+
AddToBufferJob.perform_later(1)
254+
end
255+
end
256+
244257
if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite"
245258
test "uses a different connection and transaction than the one in use when connects_to is specified" do
246259
assert_difference -> { SolidQueue::Job.count } do

test/unit/log_subscriber_test.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,27 @@ def set_logger(logger)
2626
assert_match_logged :debug, "Unblock jobs", "limit: 42, size: 10"
2727
end
2828

29+
test "recurring task enqueued succesfully" do
30+
attach_log_subscriber
31+
instrument "enqueue_recurring_task.solid_queue", task: :example_task, active_job_id: "b944ddbc-6a37-43c0-b661-4b56e57195f5", at: Time.now
32+
33+
assert_match_logged :info, "Enqueued recurring task", "task: :example_task, active_job_id: \"b944ddbc-6a37-43c0-b661-4b56e57195f5\""
34+
end
35+
36+
test "recurring task skipped" do
37+
attach_log_subscriber
38+
instrument "enqueue_recurring_task.solid_queue", task: :example_task, skipped: true, at: Time.now
39+
40+
assert_match_logged :info, "Skipped recurring task – already dispatched", "task: :example_task"
41+
end
42+
43+
test "error enqueuing recurring task" do
44+
attach_log_subscriber
45+
instrument "enqueue_recurring_task.solid_queue", task: :example_task, enqueue_error: "Everything is broken", at: Time.now
46+
47+
assert_match_logged :info, "Error enqueuing recurring task", "task: :example_task, enqueue_error: \"Everything is broken\""
48+
end
49+
2950
private
3051
def attach_log_subscriber
3152
ActiveSupport::LogSubscriber.attach_to :solid_queue, SolidQueue::LogSubscriber.new

0 commit comments

Comments
 (0)