Skip to content

Commit 1dcb411

Browse files
committed
Decouple transactional fixtures and active connections
Ref: rails#50793 Transactional fixtures are currently tightly coupled with the pool active connection. It assumes calling `pool.connection` will memoize the checked out connection and leverage that to start a transaction on it and ensure all subsequent accesses will get the same connection. To allow to remove checkout caching (or make it optional), we first must decouple transactional fixtures to not rely on it. The idea is to behave similarly, but store the connection in the pool as a special "pinned" connection, and not as the regular active connection. This allows to always return the same pinned connection, but without necessarily assigning it as the active connection. Additionally, this pinning impact all threads and fibers, so that all threads have a consistent view of the database state.
1 parent f5ea4a5 commit 1dcb411

File tree

13 files changed

+198
-110
lines changed

13 files changed

+198
-110
lines changed

activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -162,34 +162,56 @@ def initialize(pool_config)
162162
@threads_blocking_new_connections = 0
163163

164164
@available = ConnectionLeasingQueue.new self
165-
166-
@lock_thread = false
165+
@pinned_connection = nil
167166

168167
@async_executor = build_async_executor
169168

170169
@reaper = Reaper.new(self, db_config.reaping_frequency)
171170
@reaper.run
172171
end
173172

174-
def lock_thread=(lock_thread)
175-
if lock_thread
176-
@lock_thread = ActiveSupport::IsolatedExecutionState.context
177-
else
178-
@lock_thread = nil
179-
end
180-
181-
if (active_connection = @thread_cached_conns[connection_cache_key(current_thread)])
182-
active_connection.lock_thread = @lock_thread
183-
end
184-
end
185-
186173
# Retrieve the connection associated with the current thread, or call
187174
# #checkout to obtain one if necessary.
188175
#
189176
# #connection can be called any number of times; the connection is
190177
# held in a cache keyed by a thread.
191178
def connection
192-
@thread_cached_conns[connection_cache_key(current_thread)] ||= checkout
179+
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout
180+
end
181+
182+
def pin_connection!(lock_thread) # :nodoc:
183+
raise "There is already a pinned connection" if @pinned_connection
184+
185+
@pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout)
186+
# Any leased connection must be in @connections otherwise
187+
# some methods like #connected? won't behave correctly
188+
unless @connections.include?(@pinned_connection)
189+
@connections << @pinned_connection
190+
end
191+
192+
@pinned_connection.lock_thread = ActiveSupport::IsolatedExecutionState.context if lock_thread
193+
@pinned_connection.verify! # eagerly validate the connection
194+
@pinned_connection.begin_transaction joinable: false, _lazy: false
195+
end
196+
197+
def unpin_connection! # :nodoc:
198+
raise "There isn't a pinned connection #{object_id}" unless @pinned_connection
199+
200+
clean = true
201+
@pinned_connection.lock.synchronize do
202+
connection, @pinned_connection = @pinned_connection, nil
203+
if connection.transaction_open?
204+
connection.rollback_transaction
205+
else
206+
# Something committed or rolled back the transaction
207+
clean = false
208+
connection.reset!
209+
end
210+
connection.lock_thread = nil
211+
checkin(connection)
212+
end
213+
214+
clean
193215
end
194216

195217
def connection_class # :nodoc:
@@ -204,7 +226,7 @@ def connection_class # :nodoc:
204226
# #connection or #with_connection methods. Connections obtained through
205227
# #checkout will not be detected by #active_connection?
206228
def active_connection?
207-
@thread_cached_conns[connection_cache_key(current_thread)]
229+
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
208230
end
209231

210232
# Signal that the thread is finished with the current connection.
@@ -276,6 +298,7 @@ def disconnect(raise_on_acquisition_timeout = true)
276298
conn.disconnect!
277299
end
278300
@connections = []
301+
@thread_cached_conns.clear
279302
@available.clear
280303
end
281304
end
@@ -360,9 +383,19 @@ def clear_reloadable_connections!
360383
# Raises:
361384
# - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool.
362385
def checkout(checkout_timeout = @checkout_timeout)
363-
connection = checkout_and_verify(acquire_connection(checkout_timeout))
364-
connection.lock_thread = @lock_thread
365-
connection
386+
if @pinned_connection
387+
synchronize do
388+
@pinned_connection.verify!
389+
# Any leased connection must be in @connections otherwise
390+
# some methods like #connected? won't behave correctly
391+
unless @connections.include?(@pinned_connection)
392+
@connections << @pinned_connection
393+
end
394+
end
395+
@pinned_connection
396+
else
397+
checkout_and_verify(acquire_connection(checkout_timeout))
398+
end
366399
end
367400

368401
# Check-in a database connection back into the pool, indicating that you
@@ -371,6 +404,8 @@ def checkout(checkout_timeout = @checkout_timeout)
371404
# +conn+: an AbstractAdapter object, which was obtained by earlier by
372405
# calling #checkout on this pool.
373406
def checkin(conn)
407+
return if @pinned_connection.equal?(conn)
408+
374409
conn.lock.synchronize do
375410
synchronize do
376411
remove_connection_from_thread_cache conn
@@ -379,7 +414,6 @@ def checkin(conn)
379414
conn.expire
380415
end
381416

382-
conn.lock_thread = nil
383417
@available.add conn
384418
end
385419
end
@@ -533,10 +567,6 @@ def connection_cache_key(thread)
533567
thread
534568
end
535569

536-
def current_thread
537-
@lock_thread || ActiveSupport::IsolatedExecutionState.context
538-
end
539-
540570
# Take control of all existing connections so a "group" action such as
541571
# reload/disconnect can be performed safely. It is no longer enough to
542572
# wrap it in +synchronize+ because some pool's actions are allowed

activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@ def initialize(*)
3636
end
3737

3838
def enable_query_cache!
39-
@query_cache_enabled[connection_cache_key(current_thread)] = true
39+
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true
4040
connection.enable_query_cache! if active_connection?
4141
end
4242

4343
def disable_query_cache!
44-
@query_cache_enabled.delete connection_cache_key(current_thread)
44+
@query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context)
4545
connection.disable_query_cache! if active_connection?
4646
end
4747

4848
def query_cache_enabled
49-
@query_cache_enabled[connection_cache_key(current_thread)]
49+
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
5050
end
5151
end
5252

activerecord/lib/active_record/connection_adapters/abstract_adapter.rb

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,19 +174,13 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca
174174
@verified = false
175175
end
176176

177-
THREAD_LOCK = ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
178-
private_constant :THREAD_LOCK
179-
180-
FIBER_LOCK = ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
181-
private_constant :FIBER_LOCK
182-
183177
def lock_thread=(lock_thread) # :nodoc:
184178
@lock =
185179
case lock_thread
186180
when Thread
187-
THREAD_LOCK
181+
ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new
188182
when Fiber
189-
FIBER_LOCK
183+
ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new
190184
else
191185
ActiveSupport::Concurrency::NullLock
192186
end

activerecord/lib/active_record/test_fixtures.rb

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ def setup_fixtures(config = ActiveRecord::Base)
131131
end
132132

133133
@fixture_cache = {}
134-
@fixture_connections = {}
135134
@fixture_cache_key = [self.class.fixture_table_names.dup, self.class.fixture_paths.dup, self.class.fixture_class_names.dup]
135+
@fixture_connection_pools = []
136136
@@already_loaded_fixtures ||= {}
137137
@connection_subscriber = nil
138138
@saved_pool_configs = Hash.new { |hash, key| hash[key] = {} }
@@ -163,20 +163,24 @@ def teardown_fixtures
163163
teardown_transactional_fixtures
164164
else
165165
ActiveRecord::FixtureSet.reset_cache
166+
invalidate_already_loaded_fixtures
166167
end
167168

168169
ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
169170
end
170171

172+
def invalidate_already_loaded_fixtures
173+
@@already_loaded_fixtures.clear
174+
end
175+
171176
def setup_transactional_fixtures
172177
setup_shared_connection_pool
173178

174179
# Begin transactions for connections already established
175-
@fixture_connections = ActiveRecord::Base.connection_handler.connection_pool_list(:writing).to_h do |pool|
176-
pool.lock_thread = true if lock_threads
177-
connection = pool.connection
178-
transaction = connection.begin_transaction(joinable: false, _lazy: false)
179-
[connection, transaction]
180+
@fixture_connection_pools = ActiveRecord::Base.connection_handler.connection_pool_list(:writing)
181+
@fixture_connection_pools.each do |pool|
182+
pool.pin_connection!(lock_threads)
183+
pool.connection
180184
end
181185

182186
# When connections are established in the future, begin a transaction too
@@ -185,19 +189,14 @@ def setup_transactional_fixtures
185189
shard = payload[:shard] if payload.key?(:shard)
186190

187191
if connection_name
188-
begin
189-
connection = ActiveRecord::Base.connection_handler.retrieve_connection(connection_name, shard: shard)
190-
connection.connect! # eagerly validate the connection
191-
rescue ConnectionNotEstablished
192-
connection = nil
193-
end
194-
195-
if connection
192+
pool = ActiveRecord::Base.connection_handler.retrieve_connection_pool(connection_name, shard: shard)
193+
if pool
196194
setup_shared_connection_pool
197195

198-
if !@fixture_connections.key?(connection)
199-
connection.pool.lock_thread = true if lock_threads
200-
@fixture_connections[connection] = connection.begin_transaction(joinable: false, _lazy: false)
196+
unless @fixture_connection_pools.include?(pool)
197+
pool.pin_connection!(lock_threads)
198+
pool.connection
199+
@fixture_connection_pools << pool
201200
end
202201
end
203202
end
@@ -206,27 +205,15 @@ def setup_transactional_fixtures
206205

207206
def teardown_transactional_fixtures
208207
ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber
209-
@fixture_connections.each do |connection, transaction|
210-
begin
211-
connection.rollback_transaction(transaction)
212-
rescue ActiveRecord::StatementInvalid
213-
# Something commited or rolled back the transaction.
214-
# We can no longer trust the database state is clean.
215-
invalidate_already_loaded_fixtures
216-
# We also don't know for sure the connection wasn't
217-
# mutated in dangerous ways.
218-
connection.disconnect!
219-
end
220-
connection.pool.lock_thread = false
208+
unless @fixture_connection_pools.map(&:unpin_connection!).all?
209+
# Something caused the transaction to be committed or rolled back
210+
# We can no longer trust the database is in a clean state.
211+
@@already_loaded_fixtures.clear
221212
end
222-
@fixture_connections.clear
213+
@fixture_connection_pools.clear
223214
teardown_shared_connection_pool
224215
end
225216

226-
def invalidate_already_loaded_fixtures
227-
@@already_loaded_fixtures.clear
228-
end
229-
230217
# Shares the writing connection pool with connections on
231218
# other handlers.
232219
#

activerecord/test/cases/adapters/abstract_mysql_adapter/connection_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def test_no_automatic_reconnection_after_timeout
3737
assert_not_predicate @connection, :active?
3838
ensure
3939
# Repair all fixture connections so other tests won't break.
40-
@fixture_connections.each_key(&:verify!)
40+
@fixture_connection_pools.each { |p| p.connection.verify! }
4141
end
4242

4343
def test_successful_reconnection_after_timeout_with_manual_reconnect

activerecord/test/cases/adapters/postgresql/connection_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def test_reconnection_after_actual_disconnection_with_verify
145145
assert_predicate @connection, :active?
146146
ensure
147147
# Repair all fixture connections so other tests won't break.
148-
@fixture_connections.each_key(&:verify!)
148+
@fixture_connection_pools.each { |p| p.connection.verify! }
149149
end
150150

151151
def test_set_session_variable_true

activerecord/test/cases/connection_adapters/connection_handler_test.rb

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
module ActiveRecord
77
module ConnectionAdapters
88
class ConnectionHandlerTest < ActiveRecord::TestCase
9-
self.use_transactional_tests = false
10-
119
fixtures :people
1210

1311
def setup
@@ -95,8 +93,6 @@ def test_fixtures_dont_raise_if_theres_no_writing_pool_config
9593
connection_handler = ActiveRecord::Base.connection_handler
9694
ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
9795

98-
setup_transactional_fixtures
99-
10096
assert_nothing_raised do
10197
ActiveRecord::Base.connects_to(database: { reading: :arunit, writing: :arunit })
10298
end
@@ -105,8 +101,6 @@ def test_fixtures_dont_raise_if_theres_no_writing_pool_config
105101
ro_conn = ActiveRecord::Base.connection_handler.retrieve_connection("ActiveRecord::Base", role: :reading)
106102

107103
assert_equal rw_conn, ro_conn
108-
109-
teardown_transactional_fixtures
110104
ensure
111105
ActiveRecord::Base.connection_handler = connection_handler
112106
end

0 commit comments

Comments
 (0)