Skip to content

Commit 75e3407

Browse files
authored
Merge pull request rails#51192 from Shopify/connection-leasing-2
Make `.connection` always return a permanently leased connection
2 parents 1f6cef4 + 38e8609 commit 75e3407

File tree

7 files changed

+178
-45
lines changed

7 files changed

+178
-45
lines changed

activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb

Lines changed: 117 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,90 @@ def db_config
113113
# * private methods that require being called in a +synchronize+ blocks
114114
# are now explicitly documented
115115
class ConnectionPool
116+
class Lease # :nodoc:
117+
attr_accessor :connection, :sticky
118+
119+
def initialize
120+
@connection = nil
121+
@sticky = false
122+
end
123+
124+
def release
125+
conn = @connection
126+
@connection = nil
127+
@sticky = false
128+
conn
129+
end
130+
131+
def clear(connection)
132+
if @connection == connection
133+
@connection = nil
134+
@sticky = false
135+
true
136+
else
137+
false
138+
end
139+
end
140+
end
141+
142+
if ObjectSpace.const_defined?(:WeakKeyMap) # RUBY_VERSION >= 3.3
143+
WeakKeyMap = ::ObjectSpace::WeakKeyMap # :nodoc:
144+
else
145+
class WeakKeyMap # :nodoc:
146+
def initialize
147+
@map = ObjectSpace::WeakMap.new
148+
@values = nil
149+
@size = 0
150+
end
151+
152+
alias_method :clear, :initialize
153+
154+
def [](key)
155+
prune if @map.size != @size
156+
@map[key]
157+
end
158+
159+
def []=(key, value)
160+
@map[key] = value
161+
prune if @map.size != @size
162+
value
163+
end
164+
165+
def delete(key)
166+
if value = self[key]
167+
self[key] = nil
168+
prune
169+
end
170+
value
171+
end
172+
173+
private
174+
def prune(force = false)
175+
@values = @map.values
176+
@size = @map.size
177+
end
178+
end
179+
end
180+
181+
class LeaseRegistry # :nodoc:
182+
def initialize
183+
@mutex = Mutex.new
184+
@map = WeakKeyMap.new
185+
end
186+
187+
def [](context)
188+
@mutex.synchronize do
189+
@map[context] ||= Lease.new
190+
end
191+
end
192+
193+
def clear
194+
@mutex.synchronize do
195+
@map = WeakKeyMap.new
196+
end
197+
end
198+
end
199+
116200
include MonitorMixin
117201
prepend QueryCache::ConnectionPoolConfiguration
118202
include ConnectionAdapters::AbstractPool
@@ -148,9 +232,9 @@ def initialize(pool_config)
148232
# then that +thread+ does indeed own that +conn+. However, an absence of such
149233
# mapping does not mean that the +thread+ doesn't own the said connection. In
150234
# that case +conn.owner+ attr should be consulted.
151-
# Access and modification of <tt>@thread_cached_conns</tt> does not require
235+
# Access and modification of <tt>@leases</tt> does not require
152236
# synchronization.
153-
@thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)
237+
@leases = LeaseRegistry.new
154238

155239
@connections = []
156240
@automatic_reconnect = true
@@ -203,14 +287,18 @@ def internal_metadata # :nodoc:
203287
#
204288
# #connection can be called any number of times; the connection is
205289
# held in a cache keyed by a thread.
206-
def connection
207-
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] ||= checkout
290+
def lease_connection
291+
lease = connection_lease
292+
lease.sticky = true
293+
lease.connection ||= checkout
208294
end
209295

296+
alias_method :connection, :lease_connection # TODO: deprecate
297+
210298
def pin_connection!(lock_thread) # :nodoc:
211299
raise "There is already a pinned connection" if @pinned_connection
212300

213-
@pinned_connection = (@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] || checkout)
301+
@pinned_connection = (connection_lease&.connection || checkout)
214302
# Any leased connection must be in @connections otherwise
215303
# some methods like #connected? won't behave correctly
216304
unless @connections.include?(@pinned_connection)
@@ -252,7 +340,7 @@ def connection_class # :nodoc:
252340
# #connection or #with_connection methods. Connections obtained through
253341
# #checkout will not be detected by #active_connection?
254342
def active_connection?
255-
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
343+
connection_lease.connection
256344
end
257345

258346
# Signal that the thread is finished with the current connection.
@@ -262,10 +350,12 @@ def active_connection?
262350
# This method only works for connections that have been obtained through
263351
# #connection or #with_connection methods, connections obtained through
264352
# #checkout will not be automatically released.
265-
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context)
266-
if conn = @thread_cached_conns.delete(owner_thread)
353+
def release_connection(existing_lease = nil)
354+
if conn = connection_lease.release
267355
checkin conn
356+
return true
268357
end
358+
false
269359
end
270360

271361
# Yields a connection from the connection pool to the block. If no connection
@@ -278,13 +368,14 @@ def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.cont
278368
# connection will be properly returned to the pool by the code that checked
279369
# it out.
280370
def with_connection
281-
if conn = @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
282-
yield conn
371+
lease = connection_lease
372+
if lease.connection
373+
yield lease.connection
283374
else
284375
begin
285-
yield connection
376+
yield lease.connection = checkout
286377
ensure
287-
release_connection
378+
release_connection(lease) unless lease.sticky
288379
end
289380
end
290381
end
@@ -326,7 +417,7 @@ def disconnect(raise_on_acquisition_timeout = true)
326417
conn.disconnect!
327418
end
328419
@connections = []
329-
@thread_cached_conns.clear
420+
@leases.clear
330421
@available.clear
331422
end
332423
end
@@ -353,7 +444,7 @@ def discard! # :nodoc:
353444
@connections.each do |conn|
354445
conn.discard!
355446
end
356-
@connections = @available = @thread_cached_conns = nil
447+
@connections = @available = @leases = nil
357448
end
358449
end
359450

@@ -436,7 +527,7 @@ def checkin(conn)
436527

437528
conn.lock.synchronize do
438529
synchronize do
439-
remove_connection_from_thread_cache conn
530+
connection_lease.clear(conn)
440531

441532
conn._run_checkin_callbacks do
442533
conn.expire
@@ -560,6 +651,10 @@ def schedule_query(future_result) # :nodoc:
560651
end
561652

562653
private
654+
def connection_lease
655+
@leases[ActiveSupport::IsolatedExecutionState.context]
656+
end
657+
563658
def build_async_executor
564659
case ActiveRecord.async_query_executor
565660
when :multi_thread_pool
@@ -734,17 +829,10 @@ def acquire_connection(checkout_timeout)
734829
#--
735830
# if owner_thread param is omitted, this must be called in synchronize block
736831
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
737-
@thread_cached_conns.delete_pair(owner_thread, conn)
832+
@leases[owner_thread].clear(conn)
738833
end
739834
alias_method :release, :remove_connection_from_thread_cache
740835

741-
def prune_thread_cache
742-
dead_threads = @thread_cached_conns.keys.reject(&:alive?)
743-
dead_threads.each do |dead_thread|
744-
@thread_cached_conns.delete(dead_thread)
745-
end
746-
end
747-
748836
def new_connection
749837
connection = db_config.new_connection
750838
connection.pool = self
@@ -788,6 +876,12 @@ def try_to_checkout_new_connection
788876
def adopt_connection(conn)
789877
conn.pool = self
790878
@connections << conn
879+
880+
# We just created the first connection, it's time to load the schema
881+
# cache if that wasn't eagerly done before
882+
if @schema_cache.nil? && ActiveRecord.lazily_load_schema_cache
883+
schema_cache.load!
884+
end
791885
end
792886

793887
def checkout_new_connection

activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def initialize(...)
8989
end
9090
end
9191

92-
def checkout(...)
92+
def lease_connection
9393
connection = super
9494
connection.query_cache ||= query_cache
9595
connection
@@ -141,7 +141,6 @@ def clear_query_cache
141141

142142
private
143143
def prune_thread_cache
144-
super
145144
dead_threads = @thread_query_caches.keys.reject(&:alive?)
146145
dead_threads.each do |dead_thread|
147146
@thread_query_caches.delete(dead_thread)

activerecord/lib/active_record/connection_adapters/abstract_adapter.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ def pool=(value)
4949
return if value.eql?(@pool)
5050
@schema_cache = nil
5151
@pool = value
52-
53-
if @pool && ActiveRecord.lazily_load_schema_cache
54-
@pool.schema_reflection.load!(@pool)
55-
end
5652
end
5753

5854
set_callback :checkin, :after, :enable_lazy_transactions!

activerecord/lib/active_record/connection_handling.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,17 @@ def clear_query_caches_for_current_thread
250250
# Returns the connection currently associated with the class. This can
251251
# also be used to "borrow" the connection to do database work unrelated
252252
# to any of the specific Active Records.
253-
def connection
254-
connection_pool.connection
253+
# The connection will remain leased for the entire duration of the request
254+
# or job, or until +#release_connection+ is called.
255+
def lease_connection
256+
connection_pool.lease_connection
257+
end
258+
259+
alias_method :connection, :lease_connection
260+
261+
# Return the currently leased connection into the pool
262+
def release_connection
263+
connection.release_connection
255264
end
256265

257266
# Checkouts a connection from the pool, yield it and then check it back in.

activerecord/test/cases/connection_handling_test.rb

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,28 @@ class ConnectionHandlingTest < ActiveRecord::TestCase
1111

1212
ActiveRecord::Base.with_connection do |connection|
1313
assert_predicate ActiveRecord::Base.connection_pool, :active_connection?
14-
assert_same connection, ActiveRecord::Base.connection
1514
end
1615

1716
assert_not_predicate ActiveRecord::Base.connection_pool, :active_connection?
1817
end
1918

19+
test "#connection makes the lease permanent even inside #with_connection" do
20+
ActiveRecord::Base.connection_pool.release_connection
21+
assert_not_predicate ActiveRecord::Base.connection_pool, :active_connection?
22+
23+
conn = nil
24+
ActiveRecord::Base.with_connection do |connection|
25+
conn = connection
26+
assert_predicate ActiveRecord::Base.connection_pool, :active_connection?
27+
2.times do
28+
assert_same connection, ActiveRecord::Base.connection
29+
end
30+
end
31+
32+
assert_predicate ActiveRecord::Base.connection_pool, :active_connection?
33+
assert_same conn, ActiveRecord::Base.connection
34+
end
35+
2036
test "#with_connection use the already leased connection if available" do
2137
leased_connection = ActiveRecord::Base.connection
2238
assert_predicate ActiveRecord::Base.connection_pool, :active_connection?

activerecord/test/cases/connection_pool_test.rb

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ def teardown
4646
ActiveSupport::IsolatedExecutionState.isolation_level = @previous_isolation_level
4747
end
4848

49-
def active_connections(pool)
50-
pool.connections.find_all(&:in_use?)
51-
end
52-
5349
def test_checkout_after_close
5450
connection = pool.connection
5551
assert_predicate connection, :in_use?
@@ -90,6 +86,16 @@ def test_with_connection
9086
assert_equal 2, active_connections(pool).size
9187
end
9288
assert_equal 1, active_connections(pool).size
89+
90+
pool.with_connection do |conn|
91+
assert conn
92+
assert_equal 2, active_connections(pool).size
93+
pool.connection # lease
94+
end
95+
96+
assert_equal 2, active_connections(pool).size
97+
pool.release_connection
98+
assert_equal 1, active_connections(pool).size
9399
}.join
94100

95101
main_thread.close
@@ -697,15 +703,24 @@ def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads
697703

698704
def test_bang_versions_of_disconnect_and_clear_reloadable_connections_if_unable_to_acquire_all_connections_proceed_anyway
699705
@pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout
700-
[:disconnect!, :clear_reloadable_connections!].each do |group_action_method|
701-
@pool.with_connection do |connection|
702-
new_thread { @pool.send(group_action_method) }.join
703-
# assert connection has been forcefully taken away from us
704-
assert_not_predicate @pool, :active_connection?
705706

706-
# make a new connection for with_connection to clean up
707-
@pool.connection
708-
end
707+
@pool.with_connection do |connection|
708+
new_thread { @pool.disconnect! }.join
709+
# assert connection has been forcefully taken away from us
710+
assert_not_predicate @pool, :active_connection?
711+
712+
# make a new connection for with_connection to clean up
713+
@pool.connection
714+
end
715+
@pool.release_connection
716+
717+
@pool.with_connection do |connection|
718+
new_thread { @pool.clear_reloadable_connections! }.join
719+
# assert connection has been forcefully taken away from us
720+
assert_not_predicate @pool, :active_connection?
721+
722+
# make a new connection for with_connection to clean up
723+
@pool.connection
709724
end
710725
end
711726

@@ -920,6 +935,10 @@ def test_unpin_connection_returns_whether_transaction_has_been_rolledback
920935
end
921936

922937
private
938+
def active_connections(pool)
939+
pool.connections.find_all(&:in_use?)
940+
end
941+
923942
def with_single_connection_pool
924943
config = @db_config.configuration_hash.merge(pool: 1)
925944
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new("arunit", "primary", config)

activerecord/test/cases/pooled_connections_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def test_pooled_connection_remove
3636
old_connection = ActiveRecord::Base.connection
3737
extra_connection = ActiveRecord::Base.connection_pool.checkout
3838
ActiveRecord::Base.connection_pool.remove(extra_connection)
39-
assert_equal ActiveRecord::Base.connection, old_connection
39+
assert_equal ActiveRecord::Base.connection.object_id, old_connection.object_id
4040
end
4141

4242
private

0 commit comments

Comments
 (0)