Skip to content

Commit e922c59

Browse files
byrootcristianbica
andcommitted
Implement Active Job enqueue_after_transaction_commit
A fairly common mistake with Rails is to enqueue a job from inside a transaction, with a record as argumemnt, which then lead to a RecordNotFound error when picked up by the queue. This is even one of the arguments advanced for job runners backed by the database such as `solid_queue`, `delayed_job` or `good_job`. But relying on this is undesirable in my opinion as it makes the Active Job abstraction leaky, and if in the future you need to migrate to another backend or even just move the queue to a separate database, you may experience a lot of race conditions of the sort. To resolve this problem globally, we can make Active Job optionally transaction aware, and automatically defer job queueing to `after_commit`. Co-Authored-By: Cristian Bica <[email protected]>
1 parent 0e0da31 commit e922c59

24 files changed

+237
-15
lines changed

.rubocop.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ Rails/IndexWith:
5555
Style/AndOr:
5656
Enabled: true
5757

58-
# Align `when` with `case`.
59-
Layout/CaseIndentation:
60-
Enabled: true
61-
6258
Layout/ClosingHeredocIndentation:
6359
Enabled: true
6460

activejob/CHANGELOG.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,30 @@
1+
* Make Active Job transaction aware when used conjointly with Active Record.
2+
3+
A common mistake with Active Job is to enqueue jobs from inside a transaction,
4+
causing them to potentially be picked and ran by another process, before the
5+
transaction is committed, which result in various errors.
6+
7+
```ruby
8+
Topic.transaction do
9+
topic = Topic.create(...)
10+
NewTopicNotificationJob.perform_later(topic)
11+
end
12+
```
13+
14+
Now Active Job will automatically defer the enqueuing to after the transaction is committed,
15+
and drop the job if the transaction is rolled back.
16+
17+
Various queue implementations can chose to disable this behavior, and users can disable it,
18+
or force it on a per job basis:
19+
20+
```ruby
21+
class NewTopicNotificationJob < ApplicationJob
22+
self.enqueue_after_transaction_commit = false # or `true`
23+
end
24+
```
25+
26+
*Jean Boussier*, *Cristian Bica*
27+
128
* Do not trigger immediate loading of `ActiveJob::Base` when loading `ActiveJob::TestHelper`.
229

330
*Maxime Réty*

activejob/lib/active_job.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ module ActiveJob
3939
autoload :Arguments
4040
autoload :DeserializationError, "active_job/arguments"
4141
autoload :SerializationError, "active_job/arguments"
42+
autoload :EnqueueAfterTransactionCommit
4243

4344
eager_autoload do
4445
autoload :Serializers
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveJob
4+
module EnqueueAfterTransactionCommit # :nodoc:
5+
extend ActiveSupport::Concern
6+
7+
included do
8+
##
9+
# :singleton-method:
10+
#
11+
# Defines if enqueueing this job from inside an Active Record transaction
12+
# automatically defers the enqueue to after the transaction commit.
13+
#
14+
# It can be set on a per job basis:
15+
# - `:always` forces the job to be deferred.
16+
# - `:never` forces the job to be queueed immediately
17+
# - `:default` let the queue adapter define the behavior (recommended).
18+
class_attribute :enqueue_after_transaction_commit, instance_accessor: false, instance_predicate: false, default: :never
19+
20+
around_enqueue do |job, block|
21+
after_transaction = case job.class.enqueue_after_transaction_commit
22+
when :always
23+
true
24+
when :never
25+
false
26+
else # :default
27+
queue_adapter.enqueue_after_transaction_commit?
28+
end
29+
30+
if after_transaction
31+
ActiveRecord.after_all_transactions_commit(&block)
32+
else
33+
block.call
34+
end
35+
end
36+
end
37+
end
38+
end

activejob/lib/active_job/enqueuing.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ module ClassMethods
5353
# Job#arguments or false if the enqueue did not succeed.
5454
#
5555
# After the attempted enqueue, the job will be yielded to an optional block.
56+
#
57+
# If Active Job is used conjointly with Active Record, and #perform_later is called
58+
# inside an Active Record transaction, then the enqueue is implictly defered to after
59+
# the transaction is committed, or droped if it's rolled back. This behavior can
60+
# be changed on a per job basis:
61+
#
62+
# class NotificationJob < ApplicationJob
63+
# self.enqueue_after_transaction_commit = false
64+
# end
5665
def perform_later(...)
5766
job = job_or_instantiate(...)
5867
enqueue_result = job.enqueue

activejob/lib/active_job/queue_adapters.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ module ActiveJob
114114
module QueueAdapters
115115
extend ActiveSupport::Autoload
116116

117+
autoload :AbstractAdapter
117118
autoload :AsyncAdapter
118119
autoload :InlineAdapter
119120
autoload :BackburnerAdapter
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveJob
4+
module QueueAdapters
5+
# = Active Job Abstract Adapter
6+
#
7+
# Active Job supports multiple job queue systems. ActiveJob::QueueAdapters::AbstractAdapter
8+
# form the abstraction layer which makes this possible.
9+
class AbstractAdapter
10+
# Define whether enqueuing should implictly to after commit when called from
11+
# inside a transaction. Most adapters should return true, but some adapters
12+
# that use the same database as Active Record and are transaction aware can return
13+
# false to continue enqueuing jobs are part of the transaction.
14+
def enqueue_after_transaction_commit?
15+
true
16+
end
17+
18+
def enqueue(job)
19+
raise NotImplementedError
20+
end
21+
22+
def enqueue_at(job, timestamp)
23+
raise NotImplementedError
24+
end
25+
end
26+
end
27+
end

activejob/lib/active_job/queue_adapters/async_adapter.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ module QueueAdapters
3030
# The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
3131
# jobs. Since jobs share a single thread pool, long-running jobs will block
3232
# short-lived jobs. Fine for dev/test; bad for production.
33-
class AsyncAdapter
33+
class AsyncAdapter < AbstractAdapter
3434
# See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html] for executor options.
3535
def initialize(**executor_options)
3636
@scheduler = Scheduler.new(**executor_options)

activejob/lib/active_job/queue_adapters/backburner_adapter.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ module QueueAdapters
1414
# To use Backburner set the queue_adapter config to +:backburner+.
1515
#
1616
# Rails.application.config.active_job.queue_adapter = :backburner
17-
class BackburnerAdapter
17+
class BackburnerAdapter < AbstractAdapter
1818
def enqueue(job) # :nodoc:
1919
response = Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority)
2020
job.provider_job_id = response[:id] if response.is_a?(Hash)

activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,15 @@ module QueueAdapters
1515
# To use Delayed Job, set the queue_adapter config to +:delayed_job+.
1616
#
1717
# Rails.application.config.active_job.queue_adapter = :delayed_job
18-
class DelayedJobAdapter
18+
class DelayedJobAdapter < AbstractAdapter
19+
def initialize(enqueue_after_transaction_commit: false)
20+
@enqueue_after_transaction_commit = enqueue_after_transaction_commit
21+
end
22+
23+
def enqueue_after_transaction_commit? # :nodoc:
24+
@enqueue_after_transaction_commit
25+
end
26+
1927
def enqueue(job) # :nodoc:
2028
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
2129
job.provider_job_id = delayed_job.id

0 commit comments

Comments
 (0)