diff --git a/README.md b/README.md index 606abb7..efad627 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ will be yielded to. If the lock is currently being held, the block will not be called. > **Note** -> +> > If a non-nil value is provided for `timeout_seconds`, the block will *not* be invoked if the lock cannot be acquired within that time-frame. In this case, `with_advisory_lock` will return `false`, while `with_advisory_lock!` will raise a `WithAdvisoryLock::FailedToAcquireLock` error. @@ -72,6 +72,32 @@ to `true`. Note: transaction-level locks will not be reflected by `.current_advisory_lock` when the block has returned. +### Blocking locks (PostgreSQL only) + +By default, PostgreSQL advisory locks use a polling strategy with Ruby-level +retries and sleeps. Setting `blocking: true` switches to database-level blocking +locks that enable PostgreSQL's deadlock detection: + +```ruby +User.with_advisory_lock("lock_name", blocking: true, transaction: true) do + # PostgreSQL will detect circular lock waits and raise an error + # instead of sleeping forever +end +``` + +**Benefits:** +- **Deadlock detection**: PostgreSQL detects circular waits and raises `PG::TRDeadlockDetected` after ~1 second (configurable via `deadlock_timeout`) +- **No polling overhead**: The database handles the wait queue instead of Ruby sleep/retry loops +- **Clean failure**: Returns `false` on deadlock instead of infinite retries + +**When to use:** +- When acquiring multiple locks in your application (risk of deadlock) +- When you need PostgreSQL to detect and break circular lock dependencies +- When you want to avoid Ruby-level polling overhead + +**Note:** MySQL ignores this option since `GET_LOCK` already provides native +timeout and deadlock detection via the MDL subsystem. + ### Return values The return value of `with_advisory_lock_result` is a `WithAdvisoryLock::Result` @@ -84,7 +110,7 @@ block, if the lock was able to be acquired and the block yielded, or `false`, if you provided a timeout_seconds value and the lock was not able to be acquired in time. -`with_advisory_lock!` is similar to `with_advisory_lock`, but raises a `WithAdvisoryLock::FailedToAcquireLock` error if the lock was not able to be acquired in time. +`with_advisory_lock!` is similar to `with_advisory_lock`, but raises a `WithAdvisoryLock::FailedToAcquireLock` error if the lock was not able to be acquired in time. ### Testing for the current lock status diff --git a/lib/with_advisory_lock/core_advisory.rb b/lib/with_advisory_lock/core_advisory.rb index d0991e0..6ab00be 100644 --- a/lib/with_advisory_lock/core_advisory.rb +++ b/lib/with_advisory_lock/core_advisory.rb @@ -15,7 +15,7 @@ def advisory_lock_stack def with_advisory_lock_if_needed(lock_name, options = {}, &block) options = { timeout_seconds: options } unless options.respond_to?(:fetch) - options.assert_valid_keys :timeout_seconds, :shared, :transaction, :disable_query_cache + options.assert_valid_keys :timeout_seconds, :shared, :transaction, :disable_query_cache, :blocking # Validate transaction-level locks are used within a transaction if options.fetch(:transaction, false) && !transaction_open? @@ -56,12 +56,14 @@ def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &) timeout_seconds = options.fetch(:timeout_seconds, nil) shared = options.fetch(:shared, false) transaction = options.fetch(:transaction, false) + blocking = options.fetch(:blocking, false) lock_keys = lock_keys_for(lock_name) # MySQL supports database-level timeout in GET_LOCK, skip Ruby-level polling - if supports_database_timeout? || timeout_seconds&.zero? - yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &) + # PostgreSQL blocking locks also skip polling and let the database handle waiting + if supports_database_timeout? || timeout_seconds&.zero? || blocking + yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, blocking, &) else yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &) @@ -72,7 +74,7 @@ def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, timeout_seconds, &) give_up_at = timeout_seconds ? Time.now + timeout_seconds : nil while give_up_at.nil? || Time.now < give_up_at - r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &) + r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, false, &) return r if r.lock_was_acquired? # Randomizing sleep time may help reduce contention. @@ -81,9 +83,9 @@ def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, Result.new(lock_was_acquired: false) end - def yield_with_lock(lock_keys, lock_name, _lock_str, lock_stack_item, shared, transaction, timeout_seconds = nil) + def yield_with_lock(lock_keys, lock_name, _lock_str, lock_stack_item, shared, transaction, timeout_seconds = nil, blocking = false) if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction, - timeout_seconds: timeout_seconds) + timeout_seconds: timeout_seconds, blocking: blocking) begin advisory_lock_stack.push(lock_stack_item) result = block_given? ? yield : nil diff --git a/lib/with_advisory_lock/mysql_advisory.rb b/lib/with_advisory_lock/mysql_advisory.rb index 756d757..02e35ce 100644 --- a/lib/with_advisory_lock/mysql_advisory.rb +++ b/lib/with_advisory_lock/mysql_advisory.rb @@ -8,10 +8,14 @@ module MySQLAdvisory LOCK_PREFIX_ENV = 'WITH_ADVISORY_LOCK_PREFIX' - def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil) + def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil, blocking: false) raise ArgumentError, 'shared locks are not supported on MySQL' if shared raise ArgumentError, 'transaction level locks are not supported on MySQL' if transaction + # Note: blocking parameter is accepted for API compatibility but ignored for MySQL + # MySQL's GET_LOCK already provides native timeout support, making the blocking + # parameter redundant. MySQL doesn't have separate try/blocking functions like PostgreSQL. + # MySQL GET_LOCK supports native timeout: # - timeout_seconds = nil: wait indefinitely (-1) # - timeout_seconds = 0: try once, no wait (0) diff --git a/lib/with_advisory_lock/postgresql_advisory.rb b/lib/with_advisory_lock/postgresql_advisory.rb index 6673bdf..9c9b2b3 100644 --- a/lib/with_advisory_lock/postgresql_advisory.rb +++ b/lib/with_advisory_lock/postgresql_advisory.rb @@ -10,11 +10,23 @@ module PostgreSQLAdvisory LOCK_RESULT_VALUES = ['t', true].freeze ERROR_MESSAGE_REGEX = / ERROR: +current transaction is aborted,/ - def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil) + def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seconds: nil, blocking: false) # timeout_seconds is accepted for compatibility but ignored - PostgreSQL doesn't support # native timeouts with pg_try_advisory_lock, requiring Ruby-level polling instead - function = advisory_try_lock_function(transaction, shared) - execute_advisory(function, lock_keys, lock_name) + function = if blocking + advisory_lock_function(transaction, shared) + else + advisory_try_lock_function(transaction, shared) + end + execute_advisory(function, lock_keys, lock_name, blocking: blocking) + rescue ActiveRecord::StatementInvalid => e + # PostgreSQL deadlock detection raises PG::TRDeadlockDetected (SQLSTATE 40P01) + # When using blocking locks, treat deadlocks as lock acquisition failure + if blocking && (e.cause.is_a?(PG::TRDeadlockDetected) || e.message.include?('deadlock detected')) + false + else + raise + end end def release_advisory_lock(*args) @@ -88,6 +100,15 @@ def advisory_try_lock_function(transaction_scope, shared) ].compact.join end + def advisory_lock_function(transaction_scope, shared) + [ + 'pg_advisory', + transaction_scope ? '_xact' : nil, + '_lock', + shared ? '_shared' : nil + ].compact.join + end + def advisory_unlock_function(shared) [ 'pg_advisory_unlock', @@ -95,9 +116,16 @@ def advisory_unlock_function(shared) ].compact.join end - def execute_advisory(function, lock_keys, lock_name) - result = query_value(prepare_sql(function, lock_keys, lock_name)) - LOCK_RESULT_VALUES.include?(result) + def execute_advisory(function, lock_keys, lock_name, blocking: false) + if blocking + # Blocking locks return void - if the query executes successfully, the lock was acquired + query_value(prepare_sql(function, lock_keys, lock_name)) + true + else + # Non-blocking try locks return boolean + result = query_value(prepare_sql(function, lock_keys, lock_name)) + LOCK_RESULT_VALUES.include?(result) + end end def prepare_sql(function, lock_keys, lock_name) diff --git a/test/with_advisory_lock/blocking_test.rb b/test/with_advisory_lock/blocking_test.rb new file mode 100644 index 0000000..0e08f4c --- /dev/null +++ b/test/with_advisory_lock/blocking_test.rb @@ -0,0 +1,181 @@ +# frozen_string_literal: true + +require 'test_helper' + +module BlockingTestCases + extend ActiveSupport::Concern + + included do + setup do + @lock_name = 'test_blocking_lock' + end + + test 'blocking lock acquires lock successfully' do + result = model_class.with_advisory_lock(@lock_name, blocking: true) do + 'success' + end + assert_equal('success', result) + end + + test 'blocking lock waits for lock to be released' do + skip 'Transaction-level locks only supported on PostgreSQL' unless postgresql? + + lock_acquired = false + thread1_finished = false + + thread1 = Thread.new do + model_class.connection_pool.with_connection do + model_class.transaction do + model_class.with_advisory_lock(@lock_name, blocking: true, transaction: true) do + lock_acquired = true + sleep(0.5) # Hold lock for a bit + thread1_finished = true + end + end + end + end + + # Wait for thread1 to acquire the lock + sleep(0.1) until lock_acquired + + thread2_result = nil + thread2 = Thread.new do + model_class.connection_pool.with_connection do + model_class.transaction do + thread2_result = model_class.with_advisory_lock(@lock_name, blocking: true, transaction: true) do + 'thread2_success' + end + end + end + end + + thread1.join + thread2.join + + assert(thread1_finished, 'Thread 1 should have finished') + assert_equal('thread2_success', thread2_result, 'Thread 2 should have acquired lock after thread 1 released it') + end + + test 'blocking lock detects deadlocks and returns false' do + skip 'Deadlock detection test only for PostgreSQL' unless postgresql? + + deadlock_detected = false + thread1_started = Concurrent::AtomicBoolean.new(false) + thread2_started = Concurrent::AtomicBoolean.new(false) + + thread1 = Thread.new do + model_class.connection_pool.with_connection do + model_class.transaction do + model_class.with_advisory_lock('lock_a', blocking: true, transaction: true) do + thread1_started.make_true + # Wait for thread2 to acquire lock_b + sleep(0.1) until thread2_started.true? + + # Try to acquire lock_b - this should cause a deadlock + result = model_class.with_advisory_lock('lock_b', blocking: true, transaction: true) do + 'should_not_reach' + end + deadlock_detected = true if result == false + end + end + rescue ActiveRecord::StatementInvalid => e + # Transaction is aborted after deadlock, rollback will happen automatically + deadlock_detected = true if e.message.include?('deadlock') + end + end + + thread2 = Thread.new do + model_class.connection_pool.with_connection do + model_class.transaction do + model_class.with_advisory_lock('lock_b', blocking: true, transaction: true) do + thread2_started.make_true + # Wait for thread1 to acquire lock_a + sleep(0.1) until thread1_started.true? + + # Try to acquire lock_a - this should cause a deadlock + model_class.with_advisory_lock('lock_a', blocking: true, transaction: true) do + 'should_not_reach' + end + end + end + rescue ActiveRecord::StatementInvalid => e + deadlock_detected = true if e.message.include?('deadlock') + end + end + + thread1.join + thread2.join + + assert(deadlock_detected, 'Deadlock should have been detected by PostgreSQL') + end + + test 'blocking lock can be used with shared locks' do + skip 'Shared locks only supported on PostgreSQL' unless postgresql? + + thread1_result = nil + thread2_result = nil + + thread1 = Thread.new do + model_class.connection_pool.with_connection do + model_class.transaction do + thread1_result = model_class.with_advisory_lock(@lock_name, blocking: true, shared: true, transaction: true) do + 'shared1' + end + end + end + end + + thread2 = Thread.new do + model_class.connection_pool.with_connection do + model_class.transaction do + thread2_result = model_class.with_advisory_lock(@lock_name, blocking: true, shared: true, transaction: true) do + 'shared2' + end + end + end + end + + thread1.join + thread2.join + + assert_equal('shared1', thread1_result) + assert_equal('shared2', thread2_result) + end + + private + + def postgresql? + model_class.connection.adapter_name.downcase.include?('postgresql') + end + end +end + +class PostgreSQLBlockingTest < GemTestCase + include BlockingTestCases + + def model_class + Tag + end + + def setup + super + Tag.delete_all + end +end + +class MySQLBlockingTest < GemTestCase + include BlockingTestCases + + def model_class + MysqlTag + end + + def setup + super + MysqlTag.delete_all + end + + def postgresql? + false + end +end