Skip to content

Commit a8d6d47

Browse files
authored
Merge pull request rails#50999 from Shopify/refactor-transactional-fixtures
Decouple transactional fixtures and active connections
2 parents 0f9aaa5 + 1dcb411 commit a8d6d47

File tree

13 files changed

+198
-110
lines changed

13 files changed

+198
-110
lines changed

activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -162,34 +162,56 @@ def initialize(pool_config)
162162
@threads_blocking_new_connections = 0
163163

164164
@available = ConnectionLeasingQueue.new self
165-
166-
@lock_thread = false
165+
@pinned_connection = nil
167166

168167
@async_executor = build_async_executor
169168

170169
@reaper = Reaper.new(self, db_config.reaping_frequency)
171170
@reaper.run
172171
end
173172

174-
def lock_thread=(lock_thread)
175-
if lock_thread
176-
@lock_thread = ActiveSupport::IsolatedExecutionState.context
177-
else
178-
@lock_thread = nil
179-
end
180-
181-
if (active_connection = @thread_cached_conns[connection_cache_key(current_thread)])
182-
active_connection.lock_thread = @lock_thread
183-
end
184-
end
185-
186173
# Retrieve the connection associated with the current thread, or call
187174
# #checkout to obtain one if necessary.
188175
#
189176
# #connection can be called any number of times; the connection is
190177
# held in a cache keyed by a thread.
191178
def connection
192-
@thread_cached_conns[connection_cache_key(current_thread)] ||= checkout
179+
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout
180+
end
181+
182+
def pin_connection!(lock_thread) # :nodoc:
183+
raise "There is already a pinned connection" if @pinned_connection
184+
185+
@pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout)
186+
# Any leased connection must be in @connections otherwise
187+
# some methods like #connected? won't behave correctly
188+
unless @connections.include?(@pinned_connection)
189+
@connections << @pinned_connection
190+
end
191+
192+
@pinned_connection.lock_thread = ActiveSupport::IsolatedExecutionState.context if lock_thread
193+
@pinned_connection.verify! # eagerly validate the connection
194+
@pinned_connection.begin_transaction joinable: false, _lazy: false
195+
end
196+
197+
def unpin_connection! # :nodoc:
198+
raise "There isn't a pinned connection #{object_id}" unless @pinned_connection
199+
200+
clean = true
201+
@pinned_connection.lock.synchronize do
202+
connection, @pinned_connection = @pinned_connection, nil
203+
if connection.transaction_open?
204+
connection.rollback_transaction
205+
else
206+
# Something committed or rolled back the transaction
207+
clean = false
208+
connection.reset!
209+
end
210+
connection.lock_thread = nil
211+
checkin(connection)
212+
end
213+
214+
clean
193215
end
194216

195217
def connection_class # :nodoc:
@@ -204,7 +226,7 @@ def connection_class # :nodoc:
204226
# #connection or #with_connection methods. Connections obtained through
205227
# #checkout will not be detected by #active_connection?
206228
def active_connection?
207-
@thread_cached_conns[connection_cache_key(current_thread)]
229+
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
208230
end
209231

210232
# Signal that the thread is finished with the current connection.
@@ -276,6 +298,7 @@ def disconnect(raise_on_acquisition_timeout = true)
276298
conn.disconnect!
277299
end
278300
@connections = []
301+
@thread_cached_conns.clear
279302
@available.clear
280303
end
281304
end
@@ -360,9 +383,19 @@ def clear_reloadable_connections!
360383
# Raises:
361384
# - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
362385
def checkout(checkout_timeout = @checkout_timeout)
363-
connection = checkout_and_verify(acquire_connection(checkout_timeout))
364-
connection.lock_thread = @lock_thread
365-
connection
386+
if @pinned_connection
387+
synchronize do
388+
@pinned_connection.verify!
389+
# Any leased connection must be in @connections otherwise
390+
# some methods like #connected? won't behave correctly
391+
unless @connections.include?(@pinned_connection)
392+
@connections << @pinned_connection
393+
end
394+
end
395+
@pinned_connection
396+
else
397+
checkout_and_verify(acquire_connection(checkout_timeout))
398+
end
366399
end
367400

368401
# Check-in a database connection back into the pool, indicating that you
@@ -371,6 +404,8 @@ def checkout(checkout_timeout = @checkout_timeout)
371404
# +conn+: an AbstractAdapter object, which was obtained by earlier by
372405
# calling #checkout on this pool.
373406
def checkin(conn)
407+
return if @pinned_connection.equal?(conn)
408+
374409
conn.lock.synchronize do
375410
synchronize do
376411
remove_connection_from_thread_cache conn
@@ -379,7 +414,6 @@ def checkin(conn)
379414
conn.expire
380415
end
381416

382-
conn.lock_thread = nil
383417
@available.add conn
384418
end
385419
end
@@ -533,10 +567,6 @@ def connection_cache_key(thread)
533567
thread
534568
end
535569

536-
def current_thread
537-
@lock_thread || ActiveSupport::IsolatedExecutionState.context
538-
end
539-
540570
# Take control of all existing connections so a "group" action such as
541571
# reload/disconnect can be performed safely. It is no longer enough to
542572
# wrap it in +synchronize+ because some pool's actions are allowed

activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@ def initialize(*)
3636
end
3737

3838
def enable_query_cache!
39-
@query_cache_enabled[connection_cache_key(current_thread)] = true
39+
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true
4040
connection.enable_query_cache! if active_connection?
4141
end
4242

4343
def disable_query_cache!
44-
@query_cache_enabled.delete connection_cache_key(current_thread)
44+
@query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context)
4545
connection.disable_query_cache! if active_connection?
4646
end
4747

4848
def query_cache_enabled
49-
@query_cache_enabled[connection_cache_key(current_thread)]
49+
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
5050
end
5151
end
5252

activerecord/lib/active_record/connection_adapters/abstract_adapter.rb

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,19 +174,13 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca
174174
@verified = false
175175
end
176176

177-
THREAD_LOCK = ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
178-
private_constant :THREAD_LOCK
179-
180-
FIBER_LOCK = ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
181-
private_constant :FIBER_LOCK
182-
183177
def lock_thread=(lock_thread) # :nodoc:
184178
@lock =
185179
case lock_thread
186180
when Thread
187-
THREAD_LOCK
181+
ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
188182
when Fiber
189-
FIBER_LOCK
183+
ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
190184
else
191185
ActiveSupport::Concurrency::NullLock
192186
end

activerecord/lib/active_record/test_fixtures.rb

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ def setup_fixtures(config = ActiveRecord::Base)
131131
end
132132

133133
@fixture_cache = {}
134-
@fixture_connections = {}
135134
@fixture_cache_key = [self.class.fixture_table_names.dup, self.class.fixture_paths.dup, self.class.fixture_class_names.dup]
135+
@fixture_connection_pools = []
136136
@@already_loaded_fixtures ||= {}
137137
@connection_subscriber = nil
138138
@saved_pool_configs = Hash.new { |hash, key| hash[key] = {} }
@@ -163,20 +163,24 @@ def teardown_fixtures
163163
teardown_transactional_fixtures
164164
else
165165
ActiveRecord::FixtureSet.reset_cache
166+
invalidate_already_loaded_fixtures
166167
end
167168

168169
ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
169170
end
170171

172+
def invalidate_already_loaded_fixtures
173+
@@already_loaded_fixtures.clear
174+
end
175+
171176
def setup_transactional_fixtures
172177
setup_shared_connection_pool
173178

174179
# Begin transactions for connections already established
175-
@fixture_connections = ActiveRecord::Base.connection_handler.connection_pool_list(:writing).to_h do |pool|
176-
pool.lock_thread = true if lock_threads
177-
connection = pool.connection
178-
transaction = connection.begin_transaction(joinable: false, _lazy: false)
179-
[connection, transaction]
180+
@fixture_connection_pools = ActiveRecord::Base.connection_handler.connection_pool_list(:writing)
181+
@fixture_connection_pools.each do |pool|
182+
pool.pin_connection!(lock_threads)
183+
pool.connection
180184
end
181185

182186
# When connections are established in the future, begin a transaction too
@@ -185,19 +189,14 @@ def setup_transactional_fixtures
185189
shard = payload[:shard] if payload.key?(:shard)
186190

187191
if connection_name
188-
begin
189-
connection = ActiveRecord::Base.connection_handler.retrieve_connection(connection_name, shard: shard)
190-
connection.connect! # eagerly validate the connection
191-
rescue ConnectionNotEstablished
192-
connection = nil
193-
end
194-
195-
if connection
192+
pool = ActiveRecord::Base.connection_handler.retrieve_connection_pool(connection_name, shard: shard)
193+
if pool
196194
setup_shared_connection_pool
197195

198-
if !@fixture_connections.key?(connection)
199-
connection.pool.lock_thread = true if lock_threads
200-
@fixture_connections[connection] = connection.begin_transaction(joinable: false, _lazy: false)
196+
unless @fixture_connection_pools.include?(pool)
197+
pool.pin_connection!(lock_threads)
198+
pool.connection
199+
@fixture_connection_pools << pool
201200
end
202201
end
203202
end
@@ -206,27 +205,15 @@ def setup_transactional_fixtures
206205

207206
def teardown_transactional_fixtures
208207
ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber
209-
@fixture_connections.each do |connection, transaction|
210-
begin
211-
connection.rollback_transaction(transaction)
212-
rescue ActiveRecord::StatementInvalid
213-
# Something commited or rolled back the transaction.
214-
# We can no longer trust the database state is clean.
215-
invalidate_already_loaded_fixtures
216-
# We also don't know for sure the connection wasn't
217-
# mutated in dangerous ways.
218-
connection.disconnect!
219-
end
220-
connection.pool.lock_thread = false
208+
unless @fixture_connection_pools.map(&:unpin_connection!).all?
209+
# Something caused the transaction to be committed or rolled back
210+
# We can no longer trust the database is in a clean state.
211+
@@already_loaded_fixtures.clear
221212
end
222-
@fixture_connections.clear
213+
@fixture_connection_pools.clear
223214
teardown_shared_connection_pool
224215
end
225216

226-
def invalidate_already_loaded_fixtures
227-
@@already_loaded_fixtures.clear
228-
end
229-
230217
# Shares the writing connection pool with connections on
231218
# other handlers.
232219
#

activerecord/test/cases/adapters/abstract_mysql_adapter/connection_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def test_no_automatic_reconnection_after_timeout
3737
assert_not_predicate @connection, :active?
3838
ensure
3939
# Repair all fixture connections so other tests won't break.
40-
@fixture_connections.each_key(&:verify!)
40+
@fixture_connection_pools.each { |p| p.connection.verify! }
4141
end
4242

4343
def test_successful_reconnection_after_timeout_with_manual_reconnect

activerecord/test/cases/adapters/postgresql/connection_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def test_reconnection_after_actual_disconnection_with_verify
145145
assert_predicate @connection, :active?
146146
ensure
147147
# Repair all fixture connections so other tests won't break.
148-
@fixture_connections.each_key(&:verify!)
148+
@fixture_connection_pools.each { |p| p.connection.verify! }
149149
end
150150

151151
def test_set_session_variable_true

activerecord/test/cases/connection_adapters/connection_handler_test.rb

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
module ActiveRecord
77
module ConnectionAdapters
88
class ConnectionHandlerTest < ActiveRecord::TestCase
9-
self.use_transactional_tests = false
10-
119
fixtures :people
1210

1311
def setup
@@ -95,8 +93,6 @@ def test_fixtures_dont_raise_if_theres_no_writing_pool_config
9593
connection_handler = ActiveRecord::Base.connection_handler
9694
ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
9795

98-
setup_transactional_fixtures
99-
10096
assert_nothing_raised do
10197
ActiveRecord::Base.connects_to(database: { reading: :arunit, writing: :arunit })
10298
end
@@ -105,8 +101,6 @@ def test_fixtures_dont_raise_if_theres_no_writing_pool_config
105101
ro_conn = ActiveRecord::Base.connection_handler.retrieve_connection("ActiveRecord::Base", role: :reading)
106102

107103
assert_equal rw_conn, ro_conn
108-
109-
teardown_transactional_fixtures
110104
ensure
111105
ActiveRecord::Base.connection_handler = connection_handler
112106
end

0 commit comments

Comments
 (0)