Skip to content

Commit 441c93a

Browse files
committed
Raise a custom exception instead of ActiveJob::EnqueueError on enqueue
The reason is that we can't easily control what happens when something goes wrong on enqueuing if the jobs are enqueued by Rails or other gems (eg. Turbo::Streams::BroadcastJob, ActiveStorage::AnalyzeJob...), because the only way is to check what the return value of the call to `perform_later` is, or to pass a block to that method to fetch the error that gets set in `enqueue_error`. In this way at least the error will bubble up and you can rescue or let it be raised.
1 parent b13cb2a commit 441c93a

File tree

8 files changed

+49
-17
lines changed

8 files changed

+49
-17
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ There are several settings that control how Solid Queue works that you can set a
173173
- `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes.
174174
- `enqueue_after_transaction_commit`: whether the job queuing is deferred to after the current Active Record transaction is committed. The default is `false`. [Read more](https://github.com/rails/rails/pull/51426).
175175

176+
## Errors when enqueuing
177+
Solid Queue will raise a `SolidQueue::Job::EnqueueError` for any Active Record errors that happen when enqueuing a job. The reason for not raising `ActiveJob::EnqueueError` is that this one gets handled by Active Job, causing `perform_later` to return `false` and set `job.enqueue_error`, yielding the job to a block that you need to pass to `perform_later`. This works very well for your own jobs, but makes failure very hard to handle for jobs enqueued by Rails or other gems, such as `Turbo::Streams::BroadcastJob` or `ActiveStorage::AnalyzeJob`, because you don't control the call to `perform_later` in that cases.
178+
179+
In the case of recurring tasks, if such error is raised when enqueuing the job corresponding to the task, it'll be handled and logged but it won't bubble up.
176180

177181
## Concurrency controls
178182
Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs are never discarded or lost, only blocked.

app/models/solid_queue/job.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
module SolidQueue
44
class Job < Record
5+
class EnqueueError < StandardError; end
6+
57
include Executable, Clearable, Recurrable
68

79
serialize :arguments, coder: JSON
@@ -38,7 +40,7 @@ def enqueue(active_job, scheduled_at: Time.current)
3840
def create_from_active_job(active_job)
3941
create!(**attributes_from_active_job(active_job))
4042
rescue ActiveRecord::ActiveRecordError => e
41-
enqueue_error = ActiveJob::EnqueueError.new("#{e.class.name}: #{e.message}").tap do |error|
43+
enqueue_error = EnqueueError.new("#{e.class.name}: #{e.message}").tap do |error|
4244
error.set_backtrace e.backtrace
4345
end
4446
raise enqueue_error

lib/solid_queue/dispatcher/recurring_task.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,18 @@ def enqueue(at:)
3636
else
3737
payload[:other_adapter] = true
3838

39-
perform_later
39+
perform_later do |job|
40+
unless job.successfully_enqueued?
41+
payload[:enqueue_error] = job.enqueue_error&.message
42+
end
43+
end
4044
end
4145

4246
payload[:active_job_id] = active_job.job_id if active_job
4347
rescue RecurringExecution::AlreadyRecorded
4448
payload[:skipped] = true
49+
rescue Job::EnqueueError => error
50+
payload[:enqueue_error] = error.message
4551
end
4652
end
4753

@@ -70,8 +76,8 @@ def perform_later_and_record(run_at:)
7076
RecurringExecution.record(key, run_at) { perform_later }
7177
end
7278

73-
def perform_later
74-
job_class.perform_later(*arguments_with_kwargs)
79+
def perform_later(&block)
80+
job_class.perform_later(*arguments_with_kwargs, &block)
7581
end
7682

7783
def arguments_with_kwargs

lib/solid_queue/log_subscriber.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def release_blocked(event)
4040
end
4141

4242
def enqueue_recurring_task(event)
43-
attributes = event.payload.slice(:task, :active_job_id, :at)
43+
attributes = event.payload.slice(:task, :active_job_id, :enqueue_error, :at)
4444

4545
if event.payload[:other_adapter]
4646
action = attributes[:active_job_id].present? ? "Enqueued recurring task outside Solid Queue" : "Error enqueuing recurring task"

test/integration/instrumentation_test.rb

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class InstrumentationTest < ActiveSupport::TestCase
280280
end
281281
end
282282

283-
test "an error enqueuing a recurring task is reflected in the enqueue_recurring_task event" do
283+
test "an error enqueuing a recurring task in Solid Queue is reflected in the enqueue_recurring_task event" do
284284
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
285285
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)
286286

@@ -295,11 +295,34 @@ class InstrumentationTest < ActiveSupport::TestCase
295295
assert events.size >= 1
296296
event = events.last
297297

298-
assert_event event, "enqueue_recurring_task", task: :example_task
298+
assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked"
299299
assert event.last[:at].present?
300300
assert_nil event.last[:other_adapter]
301301
end
302302

303+
test "an error enqueuing a recurring task with another adapter is reflected in the enqueue_recurring_task event" do
304+
AddToBufferJob.queue_adapter = :async
305+
ActiveJob::QueueAdapters::AsyncAdapter.any_instance.stubs(:enqueue).raises(ActiveJob::EnqueueError.new("All is broken"))
306+
recurring_task = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
307+
308+
dispatcher = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: recurring_task)
309+
310+
events = subscribed("enqueue_recurring_task.solid_queue") do
311+
dispatcher.start
312+
sleep(1.01)
313+
dispatcher.stop
314+
end
315+
316+
assert events.size >= 1
317+
event = events.last
318+
319+
assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "All is broken"
320+
assert event.last[:at].present?
321+
assert event.last[:other_adapter]
322+
ensure
323+
AddToBufferJob.queue_adapter = :solid_queue
324+
end
325+
303326
test "thread errors emit thread_error events" do
304327
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
305328

test/models/solid_queue/job_test.rb

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,20 +241,17 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
241241
end
242242
end
243243

244-
test "raise ActiveJob::EnqueueError when there's an ActiveRecordError" do
244+
test "raise EnqueueError when there's an ActiveRecordError" do
245245
SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked)
246246

247247
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
248-
assert_raises ActiveJob::EnqueueError do
248+
assert_raises SolidQueue::Job::EnqueueError do
249249
SolidQueue::Job.enqueue(active_job)
250250
end
251251

252-
enqueue_result = AddToBufferJob.perform_later(1) do |job|
253-
assert job.enqueue_error.is_a? ActiveJob::EnqueueError
254-
assert_not job.successfully_enqueued?
252+
assert_raises SolidQueue::Job::EnqueueError do
253+
AddToBufferJob.perform_later(1)
255254
end
256-
257-
assert_not enqueue_result
258255
end
259256

260257
if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite"

test/unit/log_subscriber_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ def set_logger(logger)
4242

4343
test "error enqueuing recurring task" do
4444
attach_log_subscriber
45-
instrument "enqueue_recurring_task.solid_queue", task: :example_task, at: Time.now
45+
instrument "enqueue_recurring_task.solid_queue", task: :example_task, enqueue_error: "Everything is broken", at: Time.now
4646

47-
assert_match_logged :info, "Error enqueuing recurring task", "task: :example_task"
47+
assert_match_logged :info, "Error enqueuing recurring task", "task: :example_task, enqueue_error: \"Everything is broken\""
4848
end
4949

5050
private

test/unit/recurring_task_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def perform
8585
end
8686
end
8787

88-
test "error when enqueuing job using another adapter" do
88+
test "error when enqueuing job using another adapter that raises ActiveJob::EnqueueError" do
8989
ActiveJob::QueueAdapters::AsyncAdapter.any_instance.stubs(:enqueue).raises(ActiveJob::EnqueueError)
9090
previous_size = JobBuffer.size
9191

0 commit comments

Comments
 (0)