Skip to content

Commit 306ef99

Browse files
Ensure transaction tracking finishes when reconnecting
We noticed a couple of spots where transaction tracking events were potentially incorrect. When the connection reconnects with `restore_transactions: true`, we were keeping the original start time for the transaction. In this case it seems more accurate to treat these as separte transactions where the first one finishes as incomplete. Instead of forcing the adapter to tell the transaction manager to suspend (or `detach` or `lost`) the transactions, we can keep that logic encapsulated inside of the Transaction class for when to broadcast a finish event. This means that we won't capture a finish event if you manually call `reconnect!` without `restore_transactions: true`, but that might be worthy of the tradeoff since this is a rare use-case anyway. We also start raising here if the TransactionInstrumenter receives `#start` when already started, or `#finish` when not started. This ensures that we don't quietly end up in any weird states. When marking that a transaction is incomplete, we also want to check that the transaction is materialized to avoid finishing our instrumentation if it hasn't already started. Also added a test to simulate losing a connection without ever materializing a transaction. Co-authored-by: Daniel Colson <[email protected]>
1 parent adf1c21 commit 306ef99

File tree

2 files changed

+103
-7
lines changed

2 files changed

+103
-7
lines changed

activerecord/lib/active_record/connection_adapters/abstract/transaction.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,11 @@ def initialize(payload = {})
8282
@base_payload = payload
8383
end
8484

85+
class InstrumentationNotStartedError < ActiveRecordError; end
86+
class InstrumentationAlreadyStartedError < ActiveRecordError; end
87+
8588
def start
86-
return if @started
89+
raise InstrumentationAlreadyStartedError.new("Called start on an already started transaction") if @started
8790
@started = true
8891

8992
@payload = @base_payload.dup
@@ -92,7 +95,7 @@ def start
9295
end
9396

9497
def finish(outcome)
95-
return unless @started
98+
raise InstrumentationNotStartedError.new("Called finish on a transaction that hasn't started") unless @started
9699
@started = false
97100

98101
@payload[:outcome] = outcome
@@ -166,7 +169,7 @@ def restartable?
166169
end
167170

168171
def incomplete!
169-
@instrumenter.finish(:incomplete)
172+
@instrumenter.finish(:incomplete) if materialized?
170173
end
171174

172175
def materialize!
@@ -180,6 +183,7 @@ def materialized?
180183

181184
def restore!
182185
if materialized?
186+
incomplete!
183187
@materialized = false
184188
materialize!
185189
end
@@ -348,13 +352,13 @@ def rollback
348352
connection.rollback_to_savepoint(savepoint_name) if materialized?
349353
end
350354
@state.rollback!
351-
@instrumenter.finish(:rollback)
355+
@instrumenter.finish(:rollback) if materialized?
352356
end
353357

354358
def commit
355359
connection.release_savepoint(savepoint_name) if materialized?
356360
@state.commit!
357-
@instrumenter.finish(:commit)
361+
@instrumenter.finish(:commit) if materialized?
358362
end
359363

360364
def full_rollback?; false; end
@@ -389,13 +393,13 @@ def restart
389393
def rollback
390394
connection.rollback_db_transaction if materialized?
391395
@state.full_rollback!
392-
@instrumenter.finish(:rollback)
396+
@instrumenter.finish(:rollback) if materialized?
393397
end
394398

395399
def commit
396400
connection.commit_db_transaction if materialized?
397401
@state.full_commit!
398-
@instrumenter.finish(:commit)
402+
@instrumenter.finish(:commit) if materialized?
399403
end
400404
end
401405

activerecord/test/cases/transaction_instrumentation_test.rb

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,27 @@ def test_transaction_instrumentation_with_unmaterialized_restart_parent_transact
129129
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
130130
end
131131

132+
def test_transaction_instrumentation_with_materialized_restart_parent_transactions
133+
topic = topics(:fifth)
134+
events = []
135+
subscriber = ActiveSupport::Notifications.subscribe("transaction.active_record") do |event|
136+
events << event
137+
end
138+
139+
ActiveRecord::Base.transaction do
140+
topic.update(title: "Sinatra")
141+
ActiveRecord::Base.transaction(requires_new: true) do
142+
raise ActiveRecord::Rollback
143+
end
144+
end
145+
146+
assert_equal 1, events.count
147+
event = events.first
148+
assert_equal :commit, event.payload[:outcome]
149+
ensure
150+
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
151+
end
152+
132153
def test_transaction_instrumentation_with_restart_savepoint_parent_transactions
133154
topic = topics(:fifth)
134155

@@ -156,6 +177,27 @@ def test_transaction_instrumentation_with_restart_savepoint_parent_transactions
156177
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
157178
end
158179

180+
def test_transaction_instrumentation_with_restart_savepoint_parent_transactions_on_commit
181+
topic = topics(:fifth)
182+
183+
events = []
184+
subscriber = ActiveSupport::Notifications.subscribe("transaction.active_record") do |event|
185+
events << event
186+
end
187+
188+
ActiveRecord::Base.transaction do
189+
topic.update(title: "Sinatra")
190+
ActiveRecord::Base.transaction(requires_new: true) do
191+
end
192+
end
193+
194+
assert_equal 1, events.count
195+
event = events.first
196+
assert_equal :commit, event.payload[:outcome]
197+
ensure
198+
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
199+
end
200+
159201
def test_transaction_instrumentation_only_fires_if_materialized
160202
notified = false
161203
subscriber = ActiveSupport::Notifications.subscribe("transaction.active_record") do |event|
@@ -170,6 +212,36 @@ def test_transaction_instrumentation_only_fires_if_materialized
170212
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
171213
end
172214

215+
def test_transaction_instrumentation_only_fires_on_rollback_if_materialized
216+
notified = false
217+
subscriber = ActiveSupport::Notifications.subscribe("transaction.active_record") do |event|
218+
notified = true
219+
end
220+
221+
ActiveRecord::Base.transaction do
222+
raise ActiveRecord::Rollback
223+
end
224+
225+
assert_not notified
226+
ensure
227+
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
228+
end
229+
230+
def test_reconnecting_after_materialized_transaction_starts_new_event
231+
events = []
232+
subscriber = ActiveSupport::Notifications.subscribe("transaction.active_record") do |event|
233+
events << event
234+
end
235+
Topic.transaction do
236+
Topic.connection.materialize_transactions
237+
Topic.connection.reconnect!(restore_transactions: true)
238+
end
239+
240+
assert_equal 2, events.count
241+
ensure
242+
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
243+
end
244+
173245
def test_transaction_instrumentation_fires_before_after_commit_callbacks
174246
notified = false
175247
after_commit_triggered = false
@@ -269,6 +341,26 @@ def test_transaction_instrumentation_on_failed_rollback
269341
ensure
270342
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
271343
end
344+
345+
def test_transaction_instrumentation_on_failed_rollback_when_unmaterialized
346+
notified = false
347+
subscriber = ActiveSupport::Notifications.subscribe("transaction.active_record") do |event|
348+
notified = true
349+
end
350+
351+
error = Class.new(StandardError)
352+
assert_raises error do
353+
# Stubbing this method simulates an error that occurs when the transaction is still unmaterilized.
354+
Topic.connection.transaction_manager.stub(:rollback_transaction, -> (*) { raise error }) do
355+
Topic.transaction do
356+
raise ActiveRecord::Rollback
357+
end
358+
end
359+
end
360+
assert_not notified
361+
ensure
362+
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
363+
end
272364
end
273365

274366
def test_transaction_instrumentation_on_broken_subscription

0 commit comments

Comments
 (0)