Skip to content

Commit c2df237

Browse files
byrootcristianbica
andcommitted
Allow to register transaction callbacks outside of a record
Ref: rails#26103 Ref: rails#51426 A fairly common mistake with Rails is to enqueue a job from inside a transaction, and a record as argumemnt, which then lead to a RecordNotFound error when picked up by the queue. This is even one of the arguments advanced for job runners backed by the database such as `solid_queue`, `delayed_job` or `good_job`. But relying on this is undesirable iin my opinion as it makes the Active Job abstraction leaky, and if in the future you need to migrate to another backend or even just move the queue to a separate database, you may experience a lot of race conditions of the sort. But more generally, being able to defer work to after the current transaction has been a missing feature of Active Record. Right now the only way to do it is from a model callback, and this forces moving things in Active Record models that sometimes are better done elsewhere. Even as a self-proclaimed "service object skeptic", I often wanted this capability over the last decade, and I'm sure it got asked or desired by many more people. Also there's some 3rd party gems adding this capability using monkey patches. It's not a reason to upstream the capability, but it's a proof that there is demand for it. Implementation wise, this proof of concept shows that it's not really hard to implement, even with nested multi-db transactions support. Co-Authored-By: Cristian Bica <[email protected]>
1 parent eac95d5 commit c2df237

File tree

7 files changed

+413
-39
lines changed

7 files changed

+413
-39
lines changed

activerecord/CHANGELOG.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,51 @@
1+
* `ActiveRecord::Base.transaction` now yields an `ActiveRecord::Transation` object.
2+
3+
This allows to register callbacks on it.
4+
5+
```ruby
6+
Article.transaction do |transaction|
7+
article.update(published: true)
8+
transaction.after_commit do
9+
PublishNotificationMailer.with(article: article).deliver_later
10+
end
11+
end
12+
```
13+
14+
*Jean Boussier*
15+
16+
* Add `ActiveRecord::Base.current_transaction`.
17+
18+
Returns the current transaction, to allow registering callbacks on it.
19+
20+
```ruby
21+
Article.current_transaction.after_commit do
22+
PublishNotificationMailer.with(article: article).deliver_later
23+
end
24+
```
25+
26+
*Jean Boussier*
27+
28+
* Add `ActiveRecord.after_all_transactions_commit` callback.
29+
30+
Useful for code that may run either inside or outside a transaction and need
31+
to perform works after the state changes have been properly peristed.
32+
33+
```ruby
34+
def publish_article(article)
35+
article.update(published: true)
36+
ActiveRecord.after_all_transactions_commit do
37+
PublishNotificationMailer.with(article: article).deliver_later
38+
end
39+
end
40+
```
41+
42+
In the above example, the block is either executed immediately if called outside
43+
of a transaction, or called after the open transaction is committed.
44+
45+
If the transaction is rolled back, the block isn't called.
46+
47+
*Jean Boussier*
48+
149
* Add the ability to ignore counter cache columns until they are backfilled
250
351
Starting to use counter caches on existing large tables can be troublesome, because the column

activerecord/lib/active_record.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ module ActiveRecord
8686
autoload :Timestamp
8787
autoload :TokenFor
8888
autoload :TouchLater
89+
autoload :Transaction
8990
autoload :Transactions
9091
autoload :Translation
9192
autoload :Validations
@@ -540,6 +541,51 @@ def self.eager_load!
540541
def self.disconnect_all!
541542
ConnectionAdapters::PoolConfig.disconnect_all!
542543
end
544+
545+
# Registers a block to be called after all the current transactions have been
546+
# committed.
547+
#
548+
# If there is no currently open transaction, the block is called immediately.
549+
#
550+
# If there are multiple nested transactions, the block is called after the outermost one
551+
# has been committed,
552+
#
553+
# If any of the currently open transactions is rolled back, the block is never called.
554+
#
555+
# If multiple transactions are open across multiple databases, the block will be invoked
556+
# if and once all of them have been committed. But note that nesting transactions across
557+
# two distinct databases is a sharding anti-pattern that comes with a world of hurts.
558+
def self.after_all_transactions_commit(&block)
559+
open_transactions = all_open_transactions
560+
561+
if open_transactions.empty?
562+
yield
563+
elsif open_transactions.size == 1
564+
open_transactions.first.after_commit(&block)
565+
else
566+
count = open_transactions.size
567+
callback = -> do
568+
count -= 1
569+
block.call if count.zero?
570+
end
571+
open_transactions.each do |t|
572+
t.after_commit(&callback)
573+
end
574+
open_transactions = nil # rubocop:disable Lint/UselessAssignment avoid holding it in the closure
575+
end
576+
end
577+
578+
def self.all_open_transactions # :nodoc:
579+
open_transactions = []
580+
Base.connection_handler.each_connection_pool do |pool|
581+
if active_connection = pool.active_connection
582+
if active_connection.current_transaction.open? && active_connection.current_transaction.joinable?
583+
open_transactions << active_connection.current_transaction
584+
end
585+
end
586+
end
587+
open_transactions
588+
end
543589
end
544590

545591
ActiveSupport.on_load(:active_record) do

activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,17 @@ def truncate_tables(*table_names) # :nodoc:
235235
# Runs the given block in a database transaction, and returns the result
236236
# of the block.
237237
#
238+
# == Transaction callbacks
239+
#
240+
# #transaction yields an ActiveRecord::Transaction object on which it is
241+
# possible to register callback:
242+
#
243+
# ActiveRecord::Base.transaction do |transaction|
244+
# transaction.before_commit { puts "before commit!" }
245+
# transaction.after_commit { puts "after commit!" }
246+
# transaction.after_rollback { puts "after rollback!" }
247+
# end
248+
#
238249
# == Nested transactions support
239250
#
240251
# #transaction calls can be nested. By default, this makes all database
@@ -345,7 +356,7 @@ def transaction(requires_new: nil, isolation: nil, joinable: true, &block)
345356
if isolation
346357
raise ActiveRecord::TransactionIsolationError, "cannot set isolation when joining a transaction"
347358
end
348-
yield
359+
yield current_transaction
349360
else
350361
transaction_manager.within_new_transaction(isolation: isolation, joinable: joinable, &block)
351362
end

activerecord/lib/active_record/connection_adapters/abstract/transaction.rb

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,19 @@ def dirty!; end
116116
def invalidated?; false; end
117117
def invalidate!; end
118118
def materialized?; false; end
119+
def before_commit; yield; end
120+
def after_commit; yield; end
121+
def after_rollback; end # noop
119122
end
120123

121-
class Transaction # :nodoc:
124+
class Transaction < ActiveRecord::Transaction # :nodoc:
122125
attr_reader :connection, :state, :savepoint_name, :isolation_level
123126
attr_accessor :written
124127

125128
delegate :invalidate!, :invalidated?, to: :@state
126129

127130
def initialize(connection, isolation: nil, joinable: true, run_commit_callbacks: false)
131+
super()
128132
@connection = connection
129133
@state = TransactionState.new
130134
@records = nil
@@ -191,60 +195,76 @@ def restore!
191195
end
192196

193197
def rollback_records
194-
return unless records
195-
196-
ite = unique_records
198+
if records
199+
begin
200+
ite = unique_records
197201

198-
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
202+
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
199203

200-
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
201-
record.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: should_run_callbacks)
202-
end
203-
ensure
204-
ite&.each do |i|
205-
i.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: false)
204+
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
205+
record.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: should_run_callbacks)
206+
end
207+
ensure
208+
ite&.each do |i|
209+
i.rolledback!(force_restore_state: full_rollback?, should_run_callbacks: false)
210+
end
211+
end
206212
end
213+
214+
@callbacks&.each(&:after_rollback)
207215
end
208216

209217
def before_commit_records
210-
return unless records
211-
212218
if @run_commit_callbacks
213-
if ActiveRecord.before_committed_on_all_records
214-
ite = unique_records
219+
if records
220+
if ActiveRecord.before_committed_on_all_records
221+
ite = unique_records
215222

216-
instances_to_run_callbacks_on = records.each_with_object({}) do |record, candidates|
217-
candidates[record] = record
218-
end
223+
instances_to_run_callbacks_on = records.each_with_object({}) do |record, candidates|
224+
candidates[record] = record
225+
end
219226

220-
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
221-
record.before_committed! if should_run_callbacks
227+
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
228+
record.before_committed! if should_run_callbacks
229+
end
230+
else
231+
records.uniq.each(&:before_committed!)
222232
end
223-
else
224-
records.uniq.each(&:before_committed!)
225233
end
234+
235+
@callbacks&.each(&:before_commit)
226236
end
237+
# Note: When @run_commit_callbacks is false #commit_records takes care of appending
238+
# remaining callbacks to the parent transaction
227239
end
228240

229241
def commit_records
230-
return unless records
231-
232-
ite = unique_records
242+
if records
243+
begin
244+
ite = unique_records
233245

234-
if @run_commit_callbacks
235-
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
246+
if @run_commit_callbacks
247+
instances_to_run_callbacks_on = prepare_instances_to_run_callbacks_on(ite)
236248

237-
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
238-
record.committed!(should_run_callbacks: should_run_callbacks)
239-
end
240-
else
241-
while record = ite.shift
242-
# if not running callbacks, only adds the record to the parent transaction
243-
connection.add_transaction_record(record)
249+
run_action_on_records(ite, instances_to_run_callbacks_on) do |record, should_run_callbacks|
250+
record.committed!(should_run_callbacks: should_run_callbacks)
251+
end
252+
else
253+
while record = ite.shift
254+
# if not running callbacks, only adds the record to the parent transaction
255+
connection.add_transaction_record(record)
256+
end
257+
end
258+
ensure
259+
ite&.each { |i| i.committed!(should_run_callbacks: false) }
244260
end
245261
end
246-
ensure
247-
ite&.each { |i| i.committed!(should_run_callbacks: false) }
262+
263+
if @run_commit_callbacks
264+
@callbacks&.each(&:after_commit)
265+
elsif @callbacks
266+
connection.current_transaction.append_callbacks(@callbacks)
267+
end
248268
end
249269

250270
def full_rollback?; true; end
@@ -533,7 +553,7 @@ def within_new_transaction(isolation: nil, joinable: true)
533553
@connection.lock.synchronize do
534554
transaction = begin_transaction(isolation: isolation, joinable: joinable)
535555
begin
536-
yield
556+
yield transaction
537557
rescue Exception => error
538558
rollback_transaction
539559
after_failure_actions(transaction, error)
@@ -573,7 +593,7 @@ def current_transaction
573593
end
574594

575595
private
576-
NULL_TRANSACTION = NullTransaction.new
596+
NULL_TRANSACTION = NullTransaction.new.freeze
577597

578598
# Deallocate invalidated prepared statements outside of the transaction
579599
def after_failure_actions(transaction, error)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveRecord
4+
class Transaction
5+
class Callback # :nodoc:
6+
def initialize(event, callback)
7+
@event = event
8+
@callback = callback
9+
end
10+
11+
def before_commit
12+
@callback.call if @event == :before_commit
13+
end
14+
15+
def after_commit
16+
@callback.call if @event == :after_commit
17+
end
18+
19+
def after_rollback
20+
@callback.call if @event == :after_rollback
21+
end
22+
end
23+
24+
def initialize # :nodoc:
25+
@callbacks = nil
26+
end
27+
28+
# Registers a block to be called before the current transaction is fully committed.
29+
#
30+
# If there is no currently open transactions, the block is called immediately.
31+
#
32+
# If the current transaction has a parent transaction, the callback is transfered to
33+
# the parent when the current transaction commits, or dropped when the current transaction
34+
# is rolled back. This operation is repeated until the outermost transaction is reached.
35+
def before_commit(&block)
36+
(@callbacks ||= []) << Callback.new(:before_commit, block)
37+
end
38+
39+
# Registers a block to be called after the current transaction is fully committed.
40+
#
41+
# If there is no currently open transactions, the block is called immediately.
42+
#
43+
# If the current transaction has a parent transaction, the callback is transfered to
44+
# the parent when the current transaction commits, or dropped when the current transaction
45+
# is rolled back. This operation is repeated until the outermost transaction is reached.
46+
def after_commit(&block)
47+
(@callbacks ||= []) << Callback.new(:after_commit, block)
48+
end
49+
50+
# Registers a block to be called after the current transaction is rolled back.
51+
#
52+
# If there is no currently open transactions, the block is never called.
53+
#
54+
# If the current transaction is successfully committed but has a parent
55+
# transaction, the callback is automatically added to the parent transaction.
56+
#
57+
# If the entire chain of nested transactions are all successfully committed,
58+
# the block is never called.
59+
def after_rollback(&block)
60+
(@callbacks ||= []) << Callback.new(:after_rollback, block)
61+
end
62+
63+
protected
64+
def append_callbacks(callbacks)
65+
(@callbacks ||= []).concat(callbacks)
66+
end
67+
end
68+
end

activerecord/lib/active_record/transactions.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,11 @@ def transaction(**options, &block)
224224
end
225225
end
226226

227+
# Returns the current transaction. See ActiveRecord::Transactions API docs.
228+
def current_transaction
229+
connection_pool.active_connection&.current_transaction || ConnectionAdapters::NULL_TRANSACTION
230+
end
231+
227232
def before_commit(*args, &block) # :nodoc:
228233
set_options_for_callbacks!(args)
229234
set_callback(:before_commit, :before, *args, &block)

0 commit comments

Comments
 (0)