Skip to content

Commit a134d91

Browse files
authored
Merge pull request rails#51426 from Shopify/transaction-callbacks-2
Automatically delay Active Job enqueues to after commit
2 parents 0e0da31 + e922c59 commit a134d91

24 files changed

+237
-15
lines changed

.rubocop.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ Rails/IndexWith:
5555
Style/AndOr:
5656
Enabled: true
5757

58-
# Align `when` with `case`.
59-
Layout/CaseIndentation:
60-
Enabled: true
61-
6258
Layout/ClosingHeredocIndentation:
6359
Enabled: true
6460

activejob/CHANGELOG.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,30 @@
1+
* Make Active Job transaction aware when used conjointly with Active Record.
2+
3+
A common mistake with Active Job is to enqueue jobs from inside a transaction,
4+
causing them to potentially be picked and ran by another process, before the
5+
transaction is committed, which result in various errors.
6+
7+
```ruby
8+
Topic.transaction do
9+
topic = Topic.create(...)
10+
NewTopicNotificationJob.perform_later(topic)
11+
end
12+
```
13+
14+
Now Active Job will automatically defer the enqueuing to after the transaction is committed,
15+
and drop the job if the transaction is rolled back.
16+
17+
Various queue implementations can chose to disable this behavior, and users can disable it,
18+
or force it on a per job basis:
19+
20+
```ruby
21+
class NewTopicNotificationJob < ApplicationJob
22+
self.enqueue_after_transaction_commit = false # or `true`
23+
end
24+
```
25+
26+
*Jean Boussier*, *Cristian Bica*
27+
128
* Do not trigger immediate loading of `ActiveJob::Base` when loading `ActiveJob::TestHelper`.
229

330
*Maxime Réty*

activejob/lib/active_job.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ module ActiveJob
3939
autoload :Arguments
4040
autoload :DeserializationError, "active_job/arguments"
4141
autoload :SerializationError, "active_job/arguments"
42+
autoload :EnqueueAfterTransactionCommit
4243

4344
eager_autoload do
4445
autoload :Serializers
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveJob
4+
module EnqueueAfterTransactionCommit # :nodoc:
5+
extend ActiveSupport::Concern
6+
7+
included do
8+
##
9+
# :singleton-method:
10+
#
11+
# Defines if enqueueing this job from inside an Active Record transaction
12+
# automatically defers the enqueue to after the transaction commit.
13+
#
14+
# It can be set on a per job basis:
15+
# - `:always` forces the job to be deferred.
16+
# - `:never` forces the job to be queueed immediately
17+
# - `:default` let the queue adapter define the behavior (recommended).
18+
class_attribute :enqueue_after_transaction_commit, instance_accessor: false, instance_predicate: false, default: :never
19+
20+
around_enqueue do |job, block|
21+
after_transaction = case job.class.enqueue_after_transaction_commit
22+
when :always
23+
true
24+
when :never
25+
false
26+
else # :default
27+
queue_adapter.enqueue_after_transaction_commit?
28+
end
29+
30+
if after_transaction
31+
ActiveRecord.after_all_transactions_commit(&block)
32+
else
33+
block.call
34+
end
35+
end
36+
end
37+
end
38+
end

activejob/lib/active_job/enqueuing.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ module ClassMethods
5353
# Job#arguments or false if the enqueue did not succeed.
5454
#
5555
# After the attempted enqueue, the job will be yielded to an optional block.
56+
#
57+
# If Active Job is used conjointly with Active Record, and #perform_later is called
58+
# inside an Active Record transaction, then the enqueue is implictly defered to after
59+
# the transaction is committed, or droped if it's rolled back. This behavior can
60+
# be changed on a per job basis:
61+
#
62+
# class NotificationJob < ApplicationJob
63+
# self.enqueue_after_transaction_commit = false
64+
# end
5665
def perform_later(...)
5766
job = job_or_instantiate(...)
5867
enqueue_result = job.enqueue

activejob/lib/active_job/queue_adapters.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ module ActiveJob
114114
module QueueAdapters
115115
extend ActiveSupport::Autoload
116116

117+
autoload :AbstractAdapter
117118
autoload :AsyncAdapter
118119
autoload :InlineAdapter
119120
autoload :BackburnerAdapter
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveJob
4+
module QueueAdapters
5+
# = Active Job Abstract Adapter
6+
#
7+
# Active Job supports multiple job queue systems. ActiveJob::QueueAdapters::AbstractAdapter
8+
# form the abstraction layer which makes this possible.
9+
class AbstractAdapter
10+
# Define whether enqueuing should implictly to after commit when called from
11+
# inside a transaction. Most adapters should return true, but some adapters
12+
# that use the same database as Active Record and are transaction aware can return
13+
# false to continue enqueuing jobs are part of the transaction.
14+
def enqueue_after_transaction_commit?
15+
true
16+
end
17+
18+
def enqueue(job)
19+
raise NotImplementedError
20+
end
21+
22+
def enqueue_at(job, timestamp)
23+
raise NotImplementedError
24+
end
25+
end
26+
end
27+
end

activejob/lib/active_job/queue_adapters/async_adapter.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ module QueueAdapters
3030
# The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
3131
# jobs. Since jobs share a single thread pool, long-running jobs will block
3232
# short-lived jobs. Fine for dev/test; bad for production.
33-
class AsyncAdapter
33+
class AsyncAdapter < AbstractAdapter
3434
# See {Concurrent::ThreadPoolExecutor}[https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html] for executor options.
3535
def initialize(**executor_options)
3636
@scheduler = Scheduler.new(**executor_options)

activejob/lib/active_job/queue_adapters/backburner_adapter.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ module QueueAdapters
1414
# To use Backburner set the queue_adapter config to +:backburner+.
1515
#
1616
# Rails.application.config.active_job.queue_adapter = :backburner
17-
class BackburnerAdapter
17+
class BackburnerAdapter < AbstractAdapter
1818
def enqueue(job) # :nodoc:
1919
response = Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority)
2020
job.provider_job_id = response[:id] if response.is_a?(Hash)

activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,15 @@ module QueueAdapters
1515
# To use Delayed Job, set the queue_adapter config to +:delayed_job+.
1616
#
1717
# Rails.application.config.active_job.queue_adapter = :delayed_job
18-
class DelayedJobAdapter
18+
class DelayedJobAdapter < AbstractAdapter
19+
def initialize(enqueue_after_transaction_commit: false)
20+
@enqueue_after_transaction_commit = enqueue_after_transaction_commit
21+
end
22+
23+
def enqueue_after_transaction_commit? # :nodoc:
24+
@enqueue_after_transaction_commit
25+
end
26+
1927
def enqueue(job) # :nodoc:
2028
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
2129
job.provider_job_id = delayed_job.id

0 commit comments

Comments
 (0)