Skip to content

Commit f7f8dbc

Browse files
authored
feat: fire Ruby from its second job checking locks and let PostgreSQL do what it's paid for (#124)
fixes #76 - Race condition in advisory_lock_exists?
1 parent 387dedd commit f7f8dbc

File tree

7 files changed

+180
-38
lines changed

7 files changed

+180
-38
lines changed

lib/with_advisory_lock/concern.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@ def advisory_lock_exists?(lock_name)
3232
if conn.advisory_lock_stack.include?(lock_stack_item)
3333
true
3434
else
35-
# Try to acquire lock with zero timeout to test if it's held
35+
# For PostgreSQL, try non-blocking query first to avoid race conditions
36+
if conn.respond_to?(:advisory_lock_exists_for?)
37+
query_result = conn.advisory_lock_exists_for?(lock_name)
38+
return query_result unless query_result.nil?
39+
end
40+
41+
# Fall back to the original implementation
3642
result = conn.with_advisory_lock_if_needed(lock_name, { timeout_seconds: 0 })
3743
!result.lock_was_acquired?
3844
end

lib/with_advisory_lock/core_advisory.rb

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ def with_advisory_lock_if_needed(lock_name, options = {}, &block)
5252

5353
private
5454

55-
def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &block)
55+
def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &)
5656
timeout_seconds = options.fetch(:timeout_seconds, nil)
5757
shared = options.fetch(:shared, false)
5858
transaction = options.fetch(:transaction, false)
@@ -61,18 +61,18 @@ def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &bloc
6161

6262
# MySQL supports database-level timeout in GET_LOCK, skip Ruby-level polling
6363
if supports_database_timeout? || timeout_seconds&.zero?
64-
yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &block)
64+
yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &)
6565
else
6666
yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction,
67-
timeout_seconds, &block)
67+
timeout_seconds, &)
6868
end
6969
end
7070

7171
def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction,
72-
timeout_seconds, &block)
72+
timeout_seconds, &)
7373
give_up_at = timeout_seconds ? Time.now + timeout_seconds : nil
7474
while give_up_at.nil? || Time.now < give_up_at
75-
r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &block)
75+
r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &)
7676
return r if r.lock_was_acquired?
7777

7878
# Randomizing sleep time may help reduce contention.
@@ -82,7 +82,8 @@ def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item,
8282
end
8383

8484
def yield_with_lock(lock_keys, lock_name, _lock_str, lock_stack_item, shared, transaction, timeout_seconds = nil)
85-
if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction, timeout_seconds: timeout_seconds)
85+
if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction,
86+
timeout_seconds: timeout_seconds)
8687
begin
8788
advisory_lock_stack.push(lock_stack_item)
8889
result = block_given? ? yield : nil

lib/with_advisory_lock/mysql_advisory.rb

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seco
1414

1515
# MySQL GET_LOCK supports native timeout:
1616
# - timeout_seconds = nil: wait indefinitely (-1)
17-
# - timeout_seconds = 0: try once, no wait (0)
17+
# - timeout_seconds = 0: try once, no wait (0)
1818
# - timeout_seconds > 0: wait up to timeout_seconds
1919
mysql_timeout = case timeout_seconds
20-
when nil then -1
21-
when 0 then 0
22-
else timeout_seconds.to_i
23-
end
20+
when nil then -1
21+
when 0 then 0
22+
else timeout_seconds.to_i
23+
end
2424

2525
execute_successful?("GET_LOCK(#{quote(lock_keys.first)}, #{mysql_timeout})")
2626
end
@@ -31,16 +31,16 @@ def release_advisory_lock(lock_keys, lock_name:, **)
3131
# If the connection is broken, the lock is automatically released by MySQL
3232
# No need to fail the release operation
3333
connection_lost = case e.cause
34-
when defined?(Mysql2::Error::ConnectionError) && Mysql2::Error::ConnectionError
35-
true
36-
when defined?(Trilogy::ConnectionError) && Trilogy::ConnectionError
37-
true
38-
else
39-
e.message =~ /Lost connection|MySQL server has gone away|Connection refused/i
40-
end
41-
34+
when defined?(Mysql2::Error::ConnectionError) && Mysql2::Error::ConnectionError
35+
true
36+
when defined?(Trilogy::ConnectionError) && Trilogy::ConnectionError
37+
true
38+
else
39+
e.message =~ /Lost connection|MySQL server has gone away|Connection refused/i
40+
end
41+
4242
return if connection_lost
43-
43+
4444
raise
4545
end
4646

@@ -58,7 +58,5 @@ def supports_database_timeout?
5858
# Runs the given MySQL function call inside a SELECT statement and reports
# whether the value read back through select_value equals 1.
#
# mysql_function - SQL function-call text, interpolated verbatim into the query
#
# Returns true when the selected value equals 1, false otherwise.
def execute_successful?(mysql_function)
  outcome = select_value("SELECT #{mysql_function}")
  outcome == 1
end
61-
62-
# (Removed the `unique_column_name` method as it is unused.)
6361
end
6462
end

lib/with_advisory_lock/postgresql_advisory.rb

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,8 @@ def release_advisory_lock(*args)
3333
rescue ActiveRecord::StatementInvalid => e
3434
# If the connection is broken, the lock is automatically released by PostgreSQL
3535
# No need to fail the release operation
36-
if e.cause.is_a?(PG::ConnectionBad) || e.message =~ /PG::ConnectionBad/
37-
return
38-
end
39-
36+
return if e.cause.is_a?(PG::ConnectionBad) || e.message =~ /PG::ConnectionBad/
37+
4038
raise unless e.message =~ ERROR_MESSAGE_REGEX
4139

4240
begin
@@ -58,6 +56,27 @@ def supports_database_timeout?
5856
false
5957
end
6058

59+
# Non-blocking check for advisory lock existence to avoid race conditions.
# This queries pg_locks directly instead of trying to acquire the lock.
#
# lock_name - name handed to lock_keys_for to derive the lock keys
# shared    - when true look for a ShareLock row, otherwise (default) an
#             ExclusiveLock row
#
# Returns true/false depending on whether a matching *granted* lock row is
# found, or nil when the query raises ActiveRecord::StatementInvalid so the
# caller can fall back to its default detection method.
#
# NOTE(review): with the default shared: false a lock that is held only in
# shared mode is not reported -- confirm this is what callers expect.
# NOTE(review): per the PostgreSQL docs, two-integer advisory keys show up in
# pg_locks with objsubid = 2; presumably lock_keys is such a pair -- if so,
# also filtering on objsubid would avoid matching bigint-keyed locks. TODO confirm.
def advisory_lock_exists_for?(lock_name, shared: false)
  lock_keys = lock_keys_for(lock_name)
  lock_mode = shared ? 'ShareLock' : 'ExclusiveLock'

  # pg_locks also lists lock requests that are still waiting (granted = false);
  # only granted rows represent a lock that is actually held.
  query = <<~SQL.squish
    SELECT 1 FROM pg_locks
    WHERE locktype = 'advisory'
      AND database = (SELECT oid FROM pg_database WHERE datname = CURRENT_DATABASE())
      AND classid = #{lock_keys.first}
      AND objid = #{lock_keys.last}
      AND mode = '#{lock_mode}'
      AND granted
    LIMIT 1
  SQL

  select_value(query).present?
rescue ActiveRecord::StatementInvalid
  # If pg_locks is not accessible, fall back to nil to indicate we should use the default method
  nil
end
79+
6180
private
6281

6382
def advisory_try_lock_function(transaction_scope, shared)

test/with_advisory_lock/lock_test.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ module LockTestCases
129129
# but we can test the error handling logic by testing with invalid connection state
130130
assert_not_nil model_class.current_advisory_lock
131131
end
132-
132+
133133
# After the block, current_advisory_lock should be nil regardless
134134
assert_nil model_class.current_advisory_lock
135135
end
@@ -173,18 +173,18 @@ def setup
173173
# This test verifies that MySQL bypasses Ruby-level polling
174174
# when timeout is specified, relying on GET_LOCK's native timeout
175175
lock_name = 'mysql_timeout_test'
176-
176+
177177
# Hold a lock in another connection - need to use the same prefixed name as the gem
178178
other_conn = model_class.connection_pool.checkout
179179
lock_keys = other_conn.lock_keys_for(lock_name)
180180
other_conn.select_value("SELECT GET_LOCK(#{other_conn.quote(lock_keys.first)}, 0)")
181-
181+
182182
begin
183183
# Attempt to acquire with a short timeout - should fail quickly
184184
start_time = Time.now
185185
result = model_class.with_advisory_lock(lock_name, timeout_seconds: 1) { 'success' }
186186
elapsed = Time.now - start_time
187-
187+
188188
# Should return false and complete within reasonable time (< 3 seconds)
189189
# If it were using Ruby polling, it would take longer
190190
assert_not result
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# frozen_string_literal: true
2+
3+
require 'test_helper'
4+
require 'concurrent'
5+
6+
# Regression tests for the advisory_lock_exists? race condition on PostgreSQL:
# concurrent existence checks must never report a lock that nobody holds, and
# a lock held by another connection must still be detected.
class PostgreSQLRaceConditionTest < GemTestCase
  # NOTE(review): presumably disabled because the tests use several
  # connections from different threads -- confirm.
  self.use_transactional_tests = false

  def model_class
    Tag
  end

  setup do
    @lock_name = 'race_condition_test'
  end

  test 'advisory_lock_exists? does not create false positives in multi-threaded environment' do
    # Ensure no lock exists initially
    assert_not model_class.advisory_lock_exists?(@lock_name)

    results = Concurrent::Array.new

    # Create a thread pool with multiple workers checking simultaneously
    # This would previously cause race conditions where threads would falsely
    # report the lock exists due to another thread's existence check
    pool = Concurrent::FixedThreadPool.new(20)

    begin
      promises = Array.new(20) do
        Concurrent::Promise.execute(executor: pool) do
          model_class.connection_pool.with_connection do
            # Each thread checks multiple times to increase chance of race condition
            5.times do
              results << model_class.advisory_lock_exists?(@lock_name)
              sleep(0.001) # Small delay to encourage interleaving
            end
          end
        end
      end

      # Wait for all promises to complete (re-raises if any of them failed)
      Concurrent::Promise.zip(*promises).wait!
    ensure
      # Always tear the pool down, even when a promise failed
      pool.shutdown
      pool.wait_for_termination
    end

    # All checks should report false since no lock was ever acquired
    assert results.all? { |r| r == false },
           "Race condition detected: #{results.count(true)} false positives out of #{results.size} checks"
  end

  test 'advisory_lock_exists? correctly detects when lock is held by another connection' do
    lock_acquired = Concurrent::AtomicBoolean.new(false)
    lock_released = Concurrent::AtomicBoolean.new(false)

    # Promise 1: Acquire and hold the lock
    holder_promise = Concurrent::Promise.execute do
      model_class.connection_pool.with_connection do
        model_class.with_advisory_lock(@lock_name) do
          lock_acquired.make_true

          # Hold the lock until the test signals it is done checking
          sleep(0.01) until lock_released.true?
        end
      end
    end

    begin
      # Wait for lock to be acquired, but stop waiting if the holder finished
      # without ever taking it (otherwise this loop would spin forever)
      sleep(0.01) until lock_acquired.true? || holder_promise.complete?
      assert lock_acquired.true?, "Holder never acquired the lock: #{holder_promise.reason.inspect}"

      # Promise 2: Check if lock exists (should be true)
      checker_promise = Concurrent::Promise.execute do
        model_class.connection_pool.with_connection do
          # Check multiple times to ensure consistency
          Array.new(10) do
            model_class.advisory_lock_exists?(@lock_name).tap { sleep(0.01) }
          end
        end
      end

      # Let the checker run; assert on the test thread so a failure surfaces
      # directly in this test
      observations = checker_promise.value!
      assert observations.all?, 'Failed to detect existing lock'
    ensure
      # Release the lock even when an assertion above failed, so the holder
      # does not keep the lock (and its thread) alive after the test
      lock_released.make_true
    end

    holder_promise.wait!

    # Verify lock is released
    assert_not model_class.advisory_lock_exists?(@lock_name)
  end

  test 'new non-blocking implementation is being used for PostgreSQL' do
    # This test verifies that our new implementation is actually being called
    # We can check this by looking at whether the connection responds to our new method
    model_class.connection_pool.with_connection do |conn|
      assert conn.respond_to?(:advisory_lock_exists_for?),
             'PostgreSQL connection should have advisory_lock_exists_for? method'

      # Test the method directly
      # NOTE(review): the result of lock_keys_for is unused here -- presumably a
      # leftover; confirm before removing.
      conn.lock_keys_for(@lock_name)
      result = conn.advisory_lock_exists_for?(@lock_name)
      assert_not_nil result, 'advisory_lock_exists_for? should return true/false, not nil'
      assert [true, false].include?(result), 'advisory_lock_exists_for? should return boolean'
    end
  end

  test 'fallback works if pg_locks access fails' do
    # Test that the system gracefully falls back to the old implementation
    # if pg_locks query fails (e.g., due to permissions)
    model_class.connection_pool.with_connection do |_conn|
      # We can't easily simulate pg_locks failure, but we can verify
      # the method handles exceptions gracefully
      assert_nothing_raised do
        model_class.advisory_lock_exists?('test_lock_fallback')
      end
    end
  end
end

with_advisory_lock.gemspec

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,23 @@ Gem::Specification.new do |spec|
2525

2626
spec.post_install_message = <<~MESSAGE
2727
⚠️ IMPORTANT: Total rewrite in Rust/COBOL! ⚠️
28-
28+
2929
Now that I got your attention...
30-
31-
This version contains a complete internal rewrite. While the public API
30+
31+
This version contains a complete internal rewrite. While the public API#{' '}
3232
remains the same, please test thoroughly before upgrading production systems.
33-
33+
3434
New features:
3535
- Mixed adapters are now fully supported! You can use PostgreSQL and MySQL
3636
in the same application with different models.
37-
37+
3838
Breaking changes:
3939
- SQLite support has been removed
4040
- MySQL 5.7 is no longer supported (use MySQL 8+)
4141
- Rails 7.1 is no longer supported (use Rails 7.2+)
4242
- Private APIs have been removed (Base, DatabaseAdapterSupport, etc.)
43-
44-
If your code relies on private APIs or unsupported databases, lock to an
43+
44+
If your code relies on private APIs or unsupported databases, lock to an#{' '}
4545
older version or update your code accordingly.
4646
MESSAGE
4747

0 commit comments

Comments
 (0)