Skip to content

Commit f7f9aff

Browse files
authored
feat: add blocking advisory locks with deadlock detection for PostgreSQL (#140)
1 parent 929e010 commit f7f9aff

File tree

8 files changed

+254
-27
lines changed

8 files changed

+254
-27
lines changed

.tool-versions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ruby 4.0.0
1+
ruby 4.0.1

README.md

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ will be yielded to. If the lock is currently being held, the block will not be
4848
called.
4949

5050
> **Note**
51-
>
51+
>
5252
> If a non-nil value is provided for `timeout_seconds`, the block will
5353
*not* be invoked if the lock cannot be acquired within that time-frame. In this case, `with_advisory_lock` will return `false`, while `with_advisory_lock!` will raise a `WithAdvisoryLock::FailedToAcquireLock` error.
5454

@@ -72,6 +72,32 @@ to `true`.
7272
Note: transaction-level locks will not be reflected by `.current_advisory_lock`
7373
when the block has returned.
7474

75+
### Blocking locks (PostgreSQL only)
76+
77+
By default, PostgreSQL advisory locks use a polling strategy with Ruby-level
78+
retries and sleeps. Setting `blocking: true` switches to database-level blocking
79+
locks that enable PostgreSQL's deadlock detection:
80+
81+
```ruby
82+
User.with_advisory_lock("lock_name", blocking: true, transaction: true) do
83+
# PostgreSQL will detect circular lock waits and raise an error
84+
# instead of sleeping forever
85+
end
86+
```
87+
88+
**Benefits:**
89+
- **Deadlock detection**: PostgreSQL detects circular waits and raises `PG::TRDeadlockDetected` after ~1 second (configurable via `deadlock_timeout`)
90+
- **No polling overhead**: The database handles the wait queue instead of Ruby sleep/retry loops
91+
- **Clean failure**: Returns `false` on deadlock instead of infinite retries
92+
93+
**When to use:**
94+
- When acquiring multiple locks in your application (risk of deadlock)
95+
- When you need PostgreSQL to detect and break circular lock dependencies
96+
- When you want to avoid Ruby-level polling overhead
97+
98+
**Note:** MySQL ignores this option since `GET_LOCK` already provides native
99+
timeout and deadlock detection via the MDL subsystem.
100+
75101
### Return values
76102

77103
The return value of `with_advisory_lock_result` is a `WithAdvisoryLock::Result`
@@ -84,7 +110,7 @@ block, if the lock was able to be acquired and the block yielded, or `false`, if
84110
you provided a timeout_seconds value and the lock was not able to be acquired in
85111
time.
86112

87-
`with_advisory_lock!` is similar to `with_advisory_lock`, but raises a `WithAdvisoryLock::FailedToAcquireLock` error if the lock was not able to be acquired in time.
113+
`with_advisory_lock!` is similar to `with_advisory_lock`, but raises a `WithAdvisoryLock::FailedToAcquireLock` error if the lock was not able to be acquired in time.
88114

89115
### Testing for the current lock status
90116

lib/with_advisory_lock/core_advisory.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def advisory_lock_stack
1515

1616
def with_advisory_lock_if_needed(lock_name, options = {}, &block)
1717
options = { timeout_seconds: options } unless options.respond_to?(:fetch)
18-
options.assert_valid_keys :timeout_seconds, :shared, :transaction, :disable_query_cache
18+
options.assert_valid_keys :timeout_seconds, :shared, :transaction, :disable_query_cache, :blocking
1919

2020
# Validate transaction-level locks are used within a transaction
2121
if options.fetch(:transaction, false) && !transaction_open?
@@ -56,12 +56,14 @@ def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &)
5656
timeout_seconds = options.fetch(:timeout_seconds, nil)
5757
shared = options.fetch(:shared, false)
5858
transaction = options.fetch(:transaction, false)
59+
blocking = options.fetch(:blocking, false)
5960

6061
lock_keys = lock_keys_for(lock_name)
6162

6263
# MySQL supports database-level timeout in GET_LOCK, skip Ruby-level polling
63-
if supports_database_timeout? || timeout_seconds&.zero?
64-
yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &)
64+
# PostgreSQL blocking locks also skip polling and let the database handle waiting
65+
if supports_database_timeout? || timeout_seconds&.zero? || blocking
66+
yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, blocking, &)
6567
else
6668
yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction,
6769
timeout_seconds, &)
@@ -72,7 +74,7 @@ def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item,
7274
timeout_seconds, &)
7375
give_up_at = timeout_seconds ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds : nil
7476
while give_up_at.nil? || Process.clock_gettime(Process::CLOCK_MONOTONIC) < give_up_at
75-
r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &)
77+
r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, false, &)
7678
return r if r.lock_was_acquired?
7779

7880
# Randomizing sleep time may help reduce contention.
@@ -81,9 +83,9 @@ def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item,
8183
Result.new(lock_was_acquired: false)
8284
end
8385

84-
def yield_with_lock(lock_keys, lock_name, _lock_str, lock_stack_item, shared, transaction, timeout_seconds = nil)
86+
def yield_with_lock(lock_keys, lock_name, _lock_str, lock_stack_item, shared, transaction, timeout_seconds = nil, blocking = false)
8587
if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction,
86-
timeout_seconds: timeout_seconds)
88+
timeout_seconds: timeout_seconds, blocking: blocking)
8789
begin
8890
advisory_lock_stack.push(lock_stack_item)
8991
result = block_given? ? yield : nil

lib/with_advisory_lock/mysql_advisory.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ module MySQLAdvisory
88

99
LOCK_PREFIX_ENV = 'WITH_ADVISORY_LOCK_PREFIX'
1010

11-
def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil)
11+
def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil, blocking: false)
1212
raise ArgumentError, 'shared locks are not supported on MySQL' if shared
1313
raise ArgumentError, 'transaction level locks are not supported on MySQL' if transaction
1414

15+
# Note: blocking parameter is accepted for API compatibility but ignored for MySQL
16+
# MySQL's GET_LOCK already provides native timeout support, making the blocking
17+
# parameter redundant. MySQL doesn't have separate try/blocking functions like PostgreSQL.
18+
1519
# MySQL GET_LOCK supports native timeout:
1620
# - timeout_seconds = nil: wait indefinitely (-1)
1721
# - timeout_seconds = 0: try once, no wait (0)

lib/with_advisory_lock/postgresql_advisory.rb

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,23 @@ module PostgreSQLAdvisory
1010
LOCK_RESULT_VALUES = ['t', true].freeze
1111
ERROR_MESSAGE_REGEX = / ERROR: +current transaction is aborted,/
1212

13-
def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil)
13+
def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil, blocking: false)
1414
# timeout_seconds is accepted for compatibility but ignored - PostgreSQL doesn't support
1515
# native timeouts with pg_try_advisory_lock, requiring Ruby-level polling instead
16-
function = advisory_try_lock_function(transaction, shared)
17-
execute_advisory(function, lock_keys, lock_name)
16+
function = if blocking
17+
advisory_lock_function(transaction, shared)
18+
else
19+
advisory_try_lock_function(transaction, shared)
20+
end
21+
execute_advisory(function, lock_keys, lock_name, blocking: blocking)
22+
rescue ActiveRecord::StatementInvalid => e
23+
# PostgreSQL deadlock detection raises PG::TRDeadlockDetected (SQLSTATE 40P01)
24+
# When using blocking locks, treat deadlocks as lock acquisition failure
25+
if blocking && (e.cause.is_a?(PG::TRDeadlockDetected) || e.message.include?('deadlock detected'))
26+
false
27+
else
28+
raise
29+
end
1830
end
1931

2032
def release_advisory_lock(*args)
@@ -88,16 +100,32 @@ def advisory_try_lock_function(transaction_scope, shared)
88100
].compact.join
89101
end
90102

103+
def advisory_lock_function(transaction_scope, shared)
104+
[
105+
'pg_advisory',
106+
transaction_scope ? '_xact' : nil,
107+
'_lock',
108+
shared ? '_shared' : nil
109+
].compact.join
110+
end
111+
91112
def advisory_unlock_function(shared)
92113
[
93114
'pg_advisory_unlock',
94115
shared ? '_shared' : nil
95116
].compact.join
96117
end
97118

98-
def execute_advisory(function, lock_keys, lock_name)
99-
result = query_value(prepare_sql(function, lock_keys, lock_name))
100-
LOCK_RESULT_VALUES.include?(result)
119+
def execute_advisory(function, lock_keys, lock_name, blocking: false)
120+
if blocking
121+
# Blocking locks return void - if the query executes successfully, the lock was acquired
122+
query_value(prepare_sql(function, lock_keys, lock_name))
123+
true
124+
else
125+
# Non-blocking try locks return boolean
126+
result = query_value(prepare_sql(function, lock_keys, lock_name))
127+
LOCK_RESULT_VALUES.include?(result)
128+
end
101129
end
102130

103131
def prepare_sql(function, lock_keys, lock_name)
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
# frozen_string_literal: true
2+
3+
require 'test_helper'
4+
5+
# Universal blocking tests - work on all adapters
6+
module BlockingTestCases
7+
extend ActiveSupport::Concern
8+
9+
included do
10+
setup do
11+
@lock_name = 'test_blocking_lock'
12+
end
13+
14+
test 'blocking lock acquires lock successfully' do
15+
result = model_class.with_advisory_lock(@lock_name, blocking: true) do
16+
'success'
17+
end
18+
assert_equal('success', result)
19+
end
20+
end
21+
end
22+
23+
class PostgreSQLBlockingTest < GemTestCase
24+
include BlockingTestCases
25+
26+
def model_class
27+
Tag
28+
end
29+
30+
def setup
31+
super
32+
Tag.delete_all
33+
end
34+
35+
test 'blocking lock waits for lock to be released' do
36+
lock_acquired = false
37+
thread1_finished = false
38+
39+
thread1 = Thread.new do
40+
Tag.connection_pool.with_connection do
41+
Tag.transaction do
42+
Tag.with_advisory_lock(@lock_name, blocking: true, transaction: true) do
43+
lock_acquired = true
44+
sleep(0.5)
45+
thread1_finished = true
46+
end
47+
end
48+
end
49+
end
50+
51+
sleep(0.1) until lock_acquired
52+
53+
thread2_result = nil
54+
thread2 = Thread.new do
55+
Tag.connection_pool.with_connection do
56+
Tag.transaction do
57+
thread2_result = Tag.with_advisory_lock(@lock_name, blocking: true, transaction: true) do
58+
'thread2_success'
59+
end
60+
end
61+
end
62+
end
63+
64+
thread1.join
65+
thread2.join
66+
67+
assert(thread1_finished, 'Thread 1 should have finished')
68+
assert_equal('thread2_success', thread2_result, 'Thread 2 should have acquired lock after thread 1 released it')
69+
end
70+
71+
test 'blocking lock can be used with shared locks' do
72+
thread1_result = nil
73+
thread2_result = nil
74+
75+
thread1 = Thread.new do
76+
Tag.connection_pool.with_connection do
77+
Tag.transaction do
78+
thread1_result = Tag.with_advisory_lock(@lock_name, blocking: true, shared: true, transaction: true) do
79+
'shared1'
80+
end
81+
end
82+
end
83+
end
84+
85+
thread2 = Thread.new do
86+
Tag.connection_pool.with_connection do
87+
Tag.transaction do
88+
thread2_result = Tag.with_advisory_lock(@lock_name, blocking: true, shared: true, transaction: true) do
89+
'shared2'
90+
end
91+
end
92+
end
93+
end
94+
95+
thread1.join
96+
thread2.join
97+
98+
assert_equal('shared1', thread1_result)
99+
assert_equal('shared2', thread2_result)
100+
end
101+
end
102+
103+
class MySQLBlockingTest < GemTestCase
104+
include BlockingTestCases
105+
106+
def model_class
107+
MysqlTag
108+
end
109+
110+
def setup
111+
super
112+
MysqlTag.delete_all
113+
end
114+
end
115+
116+
# Deadlock test requires non-transactional mode to work properly
117+
class PostgreSQLDeadlockTest < GemTestCase
118+
self.use_transactional_tests = false
119+
120+
def setup
121+
super
122+
@lock_name = 'test_blocking_lock'
123+
Tag.delete_all
124+
end
125+
126+
test 'blocking lock detects deadlocks and returns false' do
127+
deadlock_detected = false
128+
thread1_started = Concurrent::AtomicBoolean.new(false)
129+
thread2_started = Concurrent::AtomicBoolean.new(false)
130+
131+
thread1 = Thread.new do
132+
Tag.connection_pool.with_connection do
133+
Tag.transaction do
134+
Tag.with_advisory_lock('lock_a', blocking: true, transaction: true) do
135+
thread1_started.make_true
136+
sleep(0.1) until thread2_started.true?
137+
138+
result = Tag.with_advisory_lock('lock_b', blocking: true, transaction: true) do
139+
'should_not_reach'
140+
end
141+
deadlock_detected = true if result == false
142+
end
143+
end
144+
rescue ActiveRecord::StatementInvalid => e
145+
deadlock_detected = true if e.message.downcase.include?('deadlock')
146+
end
147+
end
148+
149+
thread2 = Thread.new do
150+
Tag.connection_pool.with_connection do
151+
Tag.transaction do
152+
Tag.with_advisory_lock('lock_b', blocking: true, transaction: true) do
153+
thread2_started.make_true
154+
sleep(0.1) until thread1_started.true?
155+
156+
result = Tag.with_advisory_lock('lock_a', blocking: true, transaction: true) do
157+
'should_not_reach'
158+
end
159+
deadlock_detected = true if result == false
160+
end
161+
end
162+
rescue ActiveRecord::StatementInvalid => e
163+
deadlock_detected = true if e.message.downcase.include?('deadlock')
164+
end
165+
end
166+
167+
joined1 = thread1.join(10)
168+
joined2 = thread2.join(10)
169+
170+
unless joined1 && joined2
171+
thread1.kill if thread1.alive?
172+
thread2.kill if thread2.alive?
173+
flunk 'Deadlock detection timed out - threads did not complete within 10 seconds'
174+
end
175+
176+
assert(deadlock_detected, 'Deadlock should have been detected by PostgreSQL')
177+
end
178+
end

test/with_advisory_lock/shared_test.rb

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,6 @@ class PostgreSQLSharedLocksTest < GemTestCase
9797
two.cleanup!
9898
end
9999

100-
test 'allows shared lock to be upgraded to an exclusive lock' do
101-
skip 'PostgreSQL lock visibility issue - locks acquired via advisory lock methods not showing in pg_locks'
102-
end
103100
end
104101

105102
class MySQLSharedLocksTest < GemTestCase

test/with_advisory_lock/transaction_test.rb

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ class PostgreSQLTransactionScopingTest < GemTestCase
1212
end
1313
end
1414

15-
test 'session locks release after the block executes' do
16-
skip 'PostgreSQL lock visibility issue - locks acquired via advisory lock methods not showing in pg_locks'
17-
end
18-
1915
test 'session locks release when transaction fails inside block' do
2016
Tag.transaction do
2117
assert_equal(0, @pg_lock_count.call)
@@ -31,10 +27,6 @@ class PostgreSQLTransactionScopingTest < GemTestCase
3127
end
3228
end
3329

34-
test 'transaction level locks hold until the transaction completes' do
35-
skip 'PostgreSQL lock visibility issue - locks acquired via advisory lock methods not showing in pg_locks'
36-
end
37-
3830
test 'raises an error when attempting to use transaction level locks outside a transaction' do
3931
exception = assert_raises(ArgumentError) do
4032
Tag.with_advisory_lock 'test', transaction: true do

0 commit comments

Comments
 (0)