Skip to content

Commit 6276488

Browse files
committed
Allow async queries in transactional fixtures
This is possible since rails#50999. Now in transactional fixtures, the other threads do checkout the same connection as the same thread, it's just synchronized around performing the query. This allow to properly test async queries without having to disable transactional fixtures.
1 parent 734b7f4 commit 6276488

File tree

8 files changed

+65
-59
lines changed

8 files changed

+65
-59
lines changed

activerecord/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
* Make Active Record asynchronous queries compatible with transactional fixtures.
2+
3+
Previously transactional fixtures would disable asynchronous queries, because transactional
4+
fixtures impose all queries use the same connection.
5+
6+
Now asynchronous queries will use the connection pinned by transactional fixtures, and behave
7+
much closer to production.
8+
9+
*Jean Boussier*
10+
111
* Deserialize binary data before decrypting
212

313
This ensures that we call `PG::Connection.unescape_bytea` on PostgreSQL before decryption.
Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,30 @@
11
# frozen_string_literal: true
22

3+
require "concurrent/atomic/atomic_boolean"
4+
require "concurrent/atomic/read_write_lock"
5+
36
module ActiveRecord
47
class AsynchronousQueriesTracker # :nodoc:
5-
module NullSession # :nodoc:
6-
class << self
7-
def active?
8-
true
9-
end
10-
11-
def finalize
12-
end
13-
end
14-
end
15-
168
class Session # :nodoc:
179
def initialize
18-
@active = true
10+
@active = Concurrent::AtomicBoolean.new(true)
11+
@lock = Concurrent::ReadWriteLock.new
1912
end
2013

2114
def active?
22-
@active
15+
@active.true?
2316
end
2417

25-
def finalize
26-
@active = false
18+
def synchronize(&block)
19+
@lock.with_read_lock(&block)
20+
end
21+
22+
def finalize(wait = false)
23+
@active.make_false
24+
if wait
25+
# Wait until all thread with a read lock are done
26+
@lock.with_write_lock { }
27+
end
2728
end
2829
end
2930

@@ -33,28 +34,31 @@ def install_executor_hooks(executor = ActiveSupport::Executor)
3334
end
3435

3536
def run
36-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
37+
ActiveRecord::Base.asynchronous_queries_tracker.tap(&:start_session)
3738
end
3839

3940
def complete(asynchronous_queries_tracker)
4041
asynchronous_queries_tracker.finalize_session
4142
end
4243
end
4344

44-
attr_reader :current_session
45-
4645
def initialize
47-
@current_session = NullSession
46+
@stack = []
47+
end
48+
49+
def current_session
50+
@stack.last or raise ActiveRecordError, "Can't perform asynchronous queries without a query session"
4851
end
4952

5053
def start_session
51-
@current_session = Session.new
52-
self
54+
session = Session.new
55+
@stack << session
5356
end
5457

55-
def finalize_session
56-
@current_session.finalize
57-
@current_session = NullSession
58+
def finalize_session(wait = false)
59+
session = @stack.pop
60+
session&.finalize(wait)
61+
self
5862
end
5963
end
6064
end

activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ def select(sql, name = nil, binds = [], prepare: false, async: false, allow_retr
683683
binds,
684684
prepare: prepare,
685685
)
686-
if supports_concurrent_connections? && current_transaction.closed?
686+
if supports_concurrent_connections? && !current_transaction.joinable?
687687
future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
688688
else
689689
future_result.execute!(self)

activerecord/lib/active_record/future_result.rb

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,21 @@ def cancel
100100
def execute_or_skip
101101
return unless pending?
102102

103-
@pool.with_connection do |connection|
104-
return unless @mutex.try_lock
105-
begin
106-
if pending?
107-
@event_buffer = EventBuffer.new(self, @instrumenter)
108-
connection.with_instrumenter(@event_buffer) do
109-
execute_query(connection, async: true)
103+
@session.synchronize do
104+
return unless pending?
105+
106+
@pool.with_connection do |connection|
107+
return unless @mutex.try_lock
108+
begin
109+
if pending?
110+
@event_buffer = EventBuffer.new(self, @instrumenter)
111+
connection.with_instrumenter(@event_buffer) do
112+
execute_query(connection, async: true)
113+
end
110114
end
115+
ensure
116+
@mutex.unlock
111117
end
112-
ensure
113-
@mutex.unlock
114118
end
115119
end
116120
end

activerecord/lib/active_record/relation.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,9 +1124,6 @@ def delete_by(*args)
11241124
# for queries to actually be executed concurrently. Otherwise it defaults to
11251125
# executing them in the foreground.
11261126
#
1127-
# +load_async+ will also fall back to executing in the foreground in the test environment when transactional
1128-
# fixtures are enabled.
1129-
#
11301127
# If the query was actually executed in the background, the Active Record logs will show
11311128
# it by prefixing the log line with <tt>ASYNC</tt>:
11321129
#
@@ -1136,7 +1133,7 @@ def load_async
11361133
return load if !c.async_enabled?
11371134

11381135
unless loaded?
1139-
result = exec_main_query(async: c.current_transaction.closed?)
1136+
result = exec_main_query(async: !c.current_transaction.joinable?)
11401137

11411138
if result.is_a?(Array)
11421139
@records = result

activerecord/lib/active_record/test_fixtures.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,15 @@ def setup_fixtures(config = ActiveRecord::Base)
137137
invalidate_already_loaded_fixtures
138138
@loaded_fixtures = load_fixtures(config)
139139
end
140+
setup_asynchronous_queries_session
140141

141142
# Instantiate fixtures for every test if requested.
142143
instantiate_fixtures if use_instantiated_fixtures
143144
end
144145

145146
def teardown_fixtures
147+
teardown_asynchronous_queries_session
148+
146149
# Rollback changes if a transaction is active.
147150
if run_in_transaction?
148151
teardown_transactional_fixtures
@@ -154,6 +157,14 @@ def teardown_fixtures
154157
ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
155158
end
156159

160+
def setup_asynchronous_queries_session
161+
@_async_queries_session = ActiveRecord::Base.asynchronous_queries_tracker.start_session
162+
end
163+
164+
def teardown_asynchronous_queries_session
165+
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session(true) if @_async_queries_session
166+
end
167+
157168
def invalidate_already_loaded_fixtures
158169
@@already_loaded_fixtures.clear
159170
end
@@ -190,6 +201,7 @@ def setup_transactional_fixtures
190201

191202
def teardown_transactional_fixtures
192203
ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber
204+
193205
unless @fixture_connection_pools.map(&:unpin_connection!).all?
194206
# Something caused the transaction to be committed or rolled back
195207
# We can no longer trust the database is in a clean state.

activerecord/test/cases/asynchronous_queries_test.rb

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
module AsynchronousQueriesSharedTests
88
def test_async_select_failure
9-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
10-
119
if in_memory_db?
1210
assert_raises ActiveRecord::StatementInvalid do
1311
@connection.select_all "SELECT * FROM does_not_exists", async: true
@@ -19,13 +17,9 @@ def test_async_select_failure
1917
future_result.result
2018
end
2119
end
22-
ensure
23-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
2420
end
2521

2622
def test_async_query_from_transaction
27-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
28-
2923
assert_nothing_raised do
3024
@connection.select_all "SELECT * FROM posts", async: true
3125
end
@@ -37,20 +31,15 @@ def test_async_query_from_transaction
3731
end
3832
end
3933
end
40-
ensure
41-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
4234
end
4335

4436
def test_async_query_cache
45-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
46-
4737
@connection.enable_query_cache!
4838

4939
@connection.select_all "SELECT * FROM posts"
5040
result = @connection.select_all "SELECT * FROM posts", async: true
5141
assert_equal ActiveRecord::FutureResult::Complete, result.class
5242
ensure
53-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
5443
@connection.disable_query_cache!
5544
end
5645

@@ -103,7 +92,6 @@ def setup
10392
end
10493

10594
def test_async_select_all
106-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
10795
status = {}
10896

10997
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
@@ -125,7 +113,6 @@ def test_async_select_all
125113
assert_kind_of ActiveRecord::Result, future_result.result
126114
assert_equal @connection.supports_concurrent_connections?, status[:async]
127115
ensure
128-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
129116
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
130117
end
131118
end

activerecord/test/cases/relation/load_async_test.rb

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ module ActiveRecord
1010
class LoadAsyncTest < ActiveRecord::TestCase
1111
include WaitForAsyncTestHelper
1212

13-
self.use_transactional_tests = false
14-
1513
fixtures :posts, :comments, :categories, :categories_posts
1614

1715
def test_scheduled?
@@ -240,8 +238,6 @@ def test_load_async_count_with_query_cache
240238

241239
class LoadAsyncNullExecutorTest < ActiveRecord::TestCase
242240
unless in_memory_db?
243-
self.use_transactional_tests = false
244-
245241
fixtures :posts, :comments
246242

247243
def setup
@@ -364,8 +360,6 @@ class LoadAsyncMultiThreadPoolExecutorTest < ActiveRecord::TestCase
364360
unless in_memory_db?
365361
include WaitForAsyncTestHelper
366362

367-
self.use_transactional_tests = false
368-
369363
fixtures :posts, :comments
370364

371365
def setup
@@ -505,8 +499,6 @@ class LoadAsyncMixedThreadPoolExecutorTest < ActiveRecord::TestCase
505499
unless in_memory_db?
506500
include WaitForAsyncTestHelper
507501

508-
self.use_transactional_tests = false
509-
510502
fixtures :posts, :comments, :other_dogs
511503

512504
def setup

0 commit comments

Comments
 (0)