Skip to content

Commit dc522a3

Browse files
committed
Fix active record instrumentation not thread safe:
- When using Active Record async feature, the instrumentation was not thread safe. ```ruby Post.count_async ActiveSupport::Notifications.subscribed(->(*) { }, "active_record.sql") do Post.count end ``` With the right race condition, the subscription would never fire up. This is due to temporarly overriding the instrumentation instance variable on the connection.
1 parent 47f9165 commit dc522a3

File tree

4 files changed

+60
-18
lines changed

4 files changed

+60
-18
lines changed

activerecord/CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
* Fix SQL notifications sometimes not sent when using async queries.
2+
3+
```ruby
4+
Post.async_count
5+
ActiveSupport::Notifications.subscribed(->(*) { "Will never reach here" }) do
6+
Post.count
7+
end
8+
```
9+
10+
In rare circumstances and under the right race condition, Active Support notifications
11+
would no longer be dispatched after using an asynchronous query.
12+
This is now fixed.
13+
14+
*Edouard Chin*
15+
116
* Eliminate queries loading dumped schema cache on Postgres
217

318
Improve resiliency by avoiding needing to open a database connection to load the

activerecord/lib/active_record/connection_adapters/abstract_adapter.rb

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca
150150
end
151151

152152
@owner = nil
153-
@instrumenter = ActiveSupport::Notifications.instrumenter
154153
@pool = ActiveRecord::ConnectionAdapters::NullPool.new
155154
@idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
156155
@visitor = arel_visitor
@@ -191,19 +190,6 @@ def lock_thread=(lock_thread) # :nodoc:
191190
end
192191
end
193192

194-
EXCEPTION_NEVER = { Exception => :never }.freeze # :nodoc:
195-
EXCEPTION_IMMEDIATE = { Exception => :immediate }.freeze # :nodoc:
196-
private_constant :EXCEPTION_NEVER, :EXCEPTION_IMMEDIATE
197-
def with_instrumenter(instrumenter, &block) # :nodoc:
198-
Thread.handle_interrupt(EXCEPTION_NEVER) do
199-
previous_instrumenter = @instrumenter
200-
@instrumenter = instrumenter
201-
Thread.handle_interrupt(EXCEPTION_IMMEDIATE, &block)
202-
ensure
203-
@instrumenter = previous_instrumenter
204-
end
205-
end
206-
207193
def check_if_write_query(sql) # :nodoc:
208194
if preventing_writes? && write_query?(sql)
209195
raise ActiveRecord::ReadOnlyError, "Write query attempted while in readonly mode: #{sql}"
@@ -1146,7 +1132,7 @@ def translate_exception_class(native_error, sql, binds)
11461132
end
11471133

11481134
def log(sql, name = "SQL", binds = [], type_casted_binds = [], async: false, &block) # :doc:
1149-
@instrumenter.instrument(
1135+
instrumenter.instrument(
11501136
"sql.active_record",
11511137
sql: sql,
11521138
name: name,
@@ -1163,6 +1149,10 @@ def log(sql, name = "SQL", binds = [], type_casted_binds = [], async: false, &bl
11631149
raise ex.set_query(sql, binds)
11641150
end
11651151

1152+
def instrumenter # :nodoc:
1153+
ActiveSupport::IsolatedExecutionState[:active_record_instrumenter] ||= ActiveSupport::Notifications.instrumenter
1154+
end
1155+
11661156
def translate_exception(exception, message:, sql:, binds:)
11671157
# override in derived class
11681158
case exception

activerecord/lib/active_record/future_result.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,9 @@ def execute_or_skip
108108
begin
109109
if pending?
110110
@event_buffer = EventBuffer.new(self, @instrumenter)
111-
connection.with_instrumenter(@event_buffer) do
112-
execute_query(connection, async: true)
113-
end
111+
ActiveSupport::IsolatedExecutionState[:active_record_instrumenter] = @event_buffer
112+
113+
execute_query(connection, async: true)
114114
end
115115
ensure
116116
@mutex.unlock

activerecord/test/cases/relation/load_async_test.rb

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
require "models/category"
66
require "models/comment"
77
require "models/other_dog"
8+
require "concurrent/atomic/count_down_latch"
89

910
module ActiveRecord
1011
class LoadAsyncTest < ActiveRecord::TestCase
@@ -157,6 +158,42 @@ def test_load_async_from_transaction
157158
assert_equal ["In Transaction"], posts.map(&:title).uniq
158159
end
159160

161+
def test_load_async_instrumentation_is_thread_safe
162+
skip unless ActiveRecord::Base.connection.async_enabled?
163+
164+
begin
165+
latch1 = Concurrent::CountDownLatch.new
166+
latch2 = Concurrent::CountDownLatch.new
167+
168+
old_log = ActiveRecord::Base.connection.method(:log)
169+
ActiveRecord::Base.connection.singleton_class.undef_method(:log)
170+
171+
ActiveRecord::Base.connection.singleton_class.define_method(:log) do |*args, **kwargs, &block|
172+
unless kwargs[:async]
173+
return old_log.call(*args, **kwargs, &block)
174+
end
175+
176+
latch1.count_down
177+
latch2.wait
178+
old_log.call(*args, **kwargs, &block)
179+
end
180+
181+
Post.async_count
182+
latch1.wait
183+
184+
notification_called = false
185+
ActiveSupport::Notifications.subscribed(->(*) { notification_called = true }, "sql.active_record") do
186+
Post.count
187+
end
188+
189+
assert(notification_called)
190+
ensure
191+
latch2.count_down
192+
ActiveRecord::Base.connection.singleton_class.undef_method(:log)
193+
ActiveRecord::Base.connection.singleton_class.define_method(:log, old_log)
194+
end
195+
end
196+
160197
def test_eager_loading_query
161198
expected_records = Post.where(author_id: 1).eager_load(:comments).to_a
162199

0 commit comments

Comments
 (0)