Skip to content

Commit 2b0ae16

Browse files
authored
Merge pull request rails#52806 from Shopify/transactional-async-queries
Allow async queries in transactional fixtures
2 parents ded0cfe + 6276488 commit 2b0ae16

File tree

8 files changed

+65
-59
lines changed

8 files changed

+65
-59
lines changed

activerecord/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
* Make Active Record asynchronous queries compatible with transactional fixtures.
2+
3+
Previously transactional fixtures would disable asynchronous queries, because transactional
4+
fixtures impose all queries use the same connection.
5+
6+
Now asynchronous queries will use the connection pinned by transactional fixtures, and behave
7+
much closer to production.
8+
9+
*Jean Boussier*
10+
111
* Deserialize binary data before decrypting
212

313
This ensures that we call `PG::Connection.unescape_bytea` on PostgreSQL before decryption.
Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,30 @@
11
# frozen_string_literal: true
22

3+
require "concurrent/atomic/atomic_boolean"
4+
require "concurrent/atomic/read_write_lock"
5+
36
module ActiveRecord
47
class AsynchronousQueriesTracker # :nodoc:
5-
module NullSession # :nodoc:
6-
class << self
7-
def active?
8-
true
9-
end
10-
11-
def finalize
12-
end
13-
end
14-
end
15-
168
class Session # :nodoc:
179
def initialize
18-
@active = true
10+
@active = Concurrent::AtomicBoolean.new(true)
11+
@lock = Concurrent::ReadWriteLock.new
1912
end
2013

2114
def active?
22-
@active
15+
@active.true?
2316
end
2417

25-
def finalize
26-
@active = false
18+
def synchronize(&block)
19+
@lock.with_read_lock(&block)
20+
end
21+
22+
def finalize(wait = false)
23+
@active.make_false
24+
if wait
25+
# Wait until all thread with a read lock are done
26+
@lock.with_write_lock { }
27+
end
2728
end
2829
end
2930

@@ -33,28 +34,31 @@ def install_executor_hooks(executor = ActiveSupport::Executor)
3334
end
3435

3536
def run
36-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
37+
ActiveRecord::Base.asynchronous_queries_tracker.tap(&:start_session)
3738
end
3839

3940
def complete(asynchronous_queries_tracker)
4041
asynchronous_queries_tracker.finalize_session
4142
end
4243
end
4344

44-
attr_reader :current_session
45-
4645
def initialize
47-
@current_session = NullSession
46+
@stack = []
47+
end
48+
49+
def current_session
50+
@stack.last or raise ActiveRecordError, "Can't perform asynchronous queries without a query session"
4851
end
4952

5053
def start_session
51-
@current_session = Session.new
52-
self
54+
session = Session.new
55+
@stack << session
5356
end
5457

55-
def finalize_session
56-
@current_session.finalize
57-
@current_session = NullSession
58+
def finalize_session(wait = false)
59+
session = @stack.pop
60+
session&.finalize(wait)
61+
self
5862
end
5963
end
6064
end

activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ def select(sql, name = nil, binds = [], prepare: false, async: false, allow_retr
683683
binds,
684684
prepare: prepare,
685685
)
686-
if supports_concurrent_connections? && current_transaction.closed?
686+
if supports_concurrent_connections? && !current_transaction.joinable?
687687
future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
688688
else
689689
future_result.execute!(self)

activerecord/lib/active_record/future_result.rb

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,21 @@ def cancel
100100
def execute_or_skip
101101
return unless pending?
102102

103-
@pool.with_connection do |connection|
104-
return unless @mutex.try_lock
105-
begin
106-
if pending?
107-
@event_buffer = EventBuffer.new(self, @instrumenter)
108-
connection.with_instrumenter(@event_buffer) do
109-
execute_query(connection, async: true)
103+
@session.synchronize do
104+
return unless pending?
105+
106+
@pool.with_connection do |connection|
107+
return unless @mutex.try_lock
108+
begin
109+
if pending?
110+
@event_buffer = EventBuffer.new(self, @instrumenter)
111+
connection.with_instrumenter(@event_buffer) do
112+
execute_query(connection, async: true)
113+
end
110114
end
115+
ensure
116+
@mutex.unlock
111117
end
112-
ensure
113-
@mutex.unlock
114118
end
115119
end
116120
end

activerecord/lib/active_record/relation.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,9 +1124,6 @@ def delete_by(*args)
11241124
# for queries to actually be executed concurrently. Otherwise it defaults to
11251125
# executing them in the foreground.
11261126
#
1127-
# +load_async+ will also fall back to executing in the foreground in the test environment when transactional
1128-
# fixtures are enabled.
1129-
#
11301127
# If the query was actually executed in the background, the Active Record logs will show
11311128
# it by prefixing the log line with <tt>ASYNC</tt>:
11321129
#
@@ -1136,7 +1133,7 @@ def load_async
11361133
return load if !c.async_enabled?
11371134

11381135
unless loaded?
1139-
result = exec_main_query(async: c.current_transaction.closed?)
1136+
result = exec_main_query(async: !c.current_transaction.joinable?)
11401137

11411138
if result.is_a?(Array)
11421139
@records = result

activerecord/lib/active_record/test_fixtures.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,15 @@ def setup_fixtures(config = ActiveRecord::Base)
137137
invalidate_already_loaded_fixtures
138138
@loaded_fixtures = load_fixtures(config)
139139
end
140+
setup_asynchronous_queries_session
140141

141142
# Instantiate fixtures for every test if requested.
142143
instantiate_fixtures if use_instantiated_fixtures
143144
end
144145

145146
def teardown_fixtures
147+
teardown_asynchronous_queries_session
148+
146149
# Rollback changes if a transaction is active.
147150
if run_in_transaction?
148151
teardown_transactional_fixtures
@@ -154,6 +157,14 @@ def teardown_fixtures
154157
ActiveRecord::Base.connection_handler.clear_active_connections!(:all)
155158
end
156159

160+
def setup_asynchronous_queries_session
161+
@_async_queries_session = ActiveRecord::Base.asynchronous_queries_tracker.start_session
162+
end
163+
164+
def teardown_asynchronous_queries_session
165+
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session(true) if @_async_queries_session
166+
end
167+
157168
def invalidate_already_loaded_fixtures
158169
@@already_loaded_fixtures.clear
159170
end
@@ -190,6 +201,7 @@ def setup_transactional_fixtures
190201

191202
def teardown_transactional_fixtures
192203
ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber
204+
193205
unless @fixture_connection_pools.map(&:unpin_connection!).all?
194206
# Something caused the transaction to be committed or rolled back
195207
# We can no longer trust the database is in a clean state.

activerecord/test/cases/asynchronous_queries_test.rb

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
module AsynchronousQueriesSharedTests
88
def test_async_select_failure
9-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
10-
119
if in_memory_db?
1210
assert_raises ActiveRecord::StatementInvalid do
1311
@connection.select_all "SELECT * FROM does_not_exists", async: true
@@ -19,13 +17,9 @@ def test_async_select_failure
1917
future_result.result
2018
end
2119
end
22-
ensure
23-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
2420
end
2521

2622
def test_async_query_from_transaction
27-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
28-
2923
assert_nothing_raised do
3024
@connection.select_all "SELECT * FROM posts", async: true
3125
end
@@ -37,20 +31,15 @@ def test_async_query_from_transaction
3731
end
3832
end
3933
end
40-
ensure
41-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
4234
end
4335

4436
def test_async_query_cache
45-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
46-
4737
@connection.enable_query_cache!
4838

4939
@connection.select_all "SELECT * FROM posts"
5040
result = @connection.select_all "SELECT * FROM posts", async: true
5141
assert_equal ActiveRecord::FutureResult::Complete, result.class
5242
ensure
53-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
5443
@connection.disable_query_cache!
5544
end
5645

@@ -103,7 +92,6 @@ def setup
10392
end
10493

10594
def test_async_select_all
106-
ActiveRecord::Base.asynchronous_queries_tracker.start_session
10795
status = {}
10896

10997
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
@@ -125,7 +113,6 @@ def test_async_select_all
125113
assert_kind_of ActiveRecord::Result, future_result.result
126114
assert_equal @connection.supports_concurrent_connections?, status[:async]
127115
ensure
128-
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
129116
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
130117
end
131118
end

activerecord/test/cases/relation/load_async_test.rb

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ module ActiveRecord
1010
class LoadAsyncTest < ActiveRecord::TestCase
1111
include WaitForAsyncTestHelper
1212

13-
self.use_transactional_tests = false
14-
1513
fixtures :posts, :comments, :categories, :categories_posts
1614

1715
def test_scheduled?
@@ -240,8 +238,6 @@ def test_load_async_count_with_query_cache
240238

241239
class LoadAsyncNullExecutorTest < ActiveRecord::TestCase
242240
unless in_memory_db?
243-
self.use_transactional_tests = false
244-
245241
fixtures :posts, :comments
246242

247243
def setup
@@ -364,8 +360,6 @@ class LoadAsyncMultiThreadPoolExecutorTest < ActiveRecord::TestCase
364360
unless in_memory_db?
365361
include WaitForAsyncTestHelper
366362

367-
self.use_transactional_tests = false
368-
369363
fixtures :posts, :comments
370364

371365
def setup
@@ -505,8 +499,6 @@ class LoadAsyncMixedThreadPoolExecutorTest < ActiveRecord::TestCase
505499
unless in_memory_db?
506500
include WaitForAsyncTestHelper
507501

508-
self.use_transactional_tests = false
509-
510502
fixtures :posts, :comments, :other_dogs
511503

512504
def setup

0 commit comments

Comments
 (0)