Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ansible_base/lib/management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Generated by Claude Sonnet 4
79 changes: 79 additions & 0 deletions ansible_base/lib/management/advisory_locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Generated by Claude Sonnet 4
import json

from django.core.management.base import BaseCommand

from ansible_base.lib.utils.db import advisory_lock_id_to_debug_info, get_active_advisory_locks


class AdvisoryLocksCommand(BaseCommand):
"""Base command class for advisory lock management."""

help = "Show currently active PostgreSQL advisory locks"

def add_arguments(self, parser):
parser.add_argument(
'--format',
choices=['table', 'json'],
default='table',
help='Output format (default: table)',
)
parser.add_argument(
'--show-debug-info',
action='store_true',
help='Include debug information for lock IDs',
)

def handle(self, *args, **options):
locks = get_active_advisory_locks()

if not locks:
self.stdout.write("No active advisory locks found.")
return

if options['format'] == 'json':
self._output_json(locks, options['show_debug_info'])
else:
self._output_table(locks, options['show_debug_info'])

def _output_json(self, locks, show_debug_info):
"""Output locks in JSON format."""
if show_debug_info:
for lock in locks:
# Add debug info for the objid (main lock ID)
lock['debug_info'] = advisory_lock_id_to_debug_info(lock['objid'])

self.stdout.write(json.dumps(locks, indent=2, default=str))

def _output_table(self, locks, show_debug_info):
"""Output locks in table format."""
# Header
headers = ['PID', 'Mode', 'Granted', 'Class ID', 'Object ID', 'Object Sub ID']
if show_debug_info:
headers.extend(['Hex', 'Unsigned CRC32', 'High Bit'])

self.stdout.write(self.style.SUCCESS(' | '.join(headers)))
self.stdout.write('-' * (len(' | '.join(headers)) + 20))

# Rows
for lock in locks:
row = [
str(lock['pid']),
lock['mode'],
'Yes' if lock['granted'] else 'No',
str(lock['classid']),
str(lock['objid']),
str(lock['objsubid']),
]

if show_debug_info:
debug_info = advisory_lock_id_to_debug_info(lock['objid'])
row.extend(
[
debug_info['hex_representation'],
str(debug_info['unsigned_crc32']),
'Yes' if debug_info['had_high_bit_set'] else 'No',
]
)

self.stdout.write(' | '.join(row))
104 changes: 95 additions & 9 deletions ansible_base/lib/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

import psycopg
from django.conf import settings
from django.db import DEFAULT_DB_ALIAS, OperationalError, connection, connections, transaction
from django.db import DEFAULT_DB_ALIAS, OperationalError, connection, transaction
from django.db.backends.postgresql.base import DatabaseWrapper as PsycopgDatabaseWrapper
from django.db.migrations.executor import MigrationExecutor
from django.db.transaction import get_connection

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,6 +41,97 @@ def migrations_are_complete() -> bool:
# that was licensed under the MIT license


def string_to_advisory_lock_id(lock_string: str) -> int:
"""Convert a string to a PostgreSQL advisory lock ID integer.

Generates an id within postgres integer range (-2^31 to 2^31 - 1).
crc32 generates an unsigned integer in Py3, we convert it into
a signed integer using 2's complement (this is a noop in Py2).

Args:
lock_string: The string to convert to a lock ID

Returns:
Integer lock ID suitable for PostgreSQL advisory locks
"""
pos = crc32(lock_string.encode("utf-8"))
lock_id = (2**31 - 1) & pos
if pos & 2**31:
lock_id -= 2**31
return lock_id


def advisory_lock_id_to_debug_info(lock_id: int) -> dict:
"""Convert an advisory lock ID back to debug information.

Note: This cannot reverse the string due to CRC32 hash collisions,
but provides debugging information about the lock ID.

Args:
lock_id: The integer lock ID

Returns:
Dictionary with debug information about the lock ID
"""
# Convert back to unsigned for analysis
if lock_id < 0:
unsigned_value = lock_id + 2**31
had_high_bit = True
else:
unsigned_value = lock_id
had_high_bit = False

return {
'lock_id': lock_id,
'unsigned_crc32': unsigned_value,
'had_high_bit_set': had_high_bit,
'hex_representation': hex(lock_id),
}


def get_active_advisory_locks(using=None) -> list:
"""Get a list of all currently held advisory locks.

Args:
using: Database alias to use (defaults to DEFAULT_DB_ALIAS)

Returns:
List of dictionaries containing lock information
"""
if using is None:
using = DEFAULT_DB_ALIAS

conn = get_connection(using)
if conn.vendor != "postgresql":
return []

with conn.cursor() as cursor:
cursor.execute(
"""
SELECT locktype, classid, objid, objsubid, pid, mode, granted
FROM pg_locks
WHERE locktype = 'advisory'
"""
)

locks = []
for row in cursor.fetchall():
locktype, classid, objid, objsubid, pid, mode, granted = row
locks.append(
{
'locktype': locktype,
'classid': classid,
'objid': objid,
'objsubid': objsubid,
'pid': pid,
'mode': mode,
'granted': granted,
}
)

return locks


@contextmanager
def django_pglocks_advisory_lock(lock_id, shared=False, wait=True, using=None):

Expand Down Expand Up @@ -81,13 +173,7 @@ def django_pglocks_advisory_lock(lock_id, shared=False, wait=True, using=None):

tuple_format = True
elif isinstance(lock_id, str):
# Generates an id within postgres integer range (-2^31 to 2^31 - 1).
# crc32 generates an unsigned integer in Py3, we convert it into
# a signed integer using 2's complement (this is a noop in Py2)
pos = crc32(lock_id.encode("utf-8"))
lock_id = (2**31 - 1) & pos
if pos & 2**31:
lock_id -= 2**31
lock_id = string_to_advisory_lock_id(lock_id)
elif not isinstance(lock_id, int):
raise ValueError("Cannot use %s as a lock id" % lock_id)

Expand All @@ -104,7 +190,7 @@ def django_pglocks_advisory_lock(lock_id, shared=False, wait=True, using=None):
acquire_params = (function_name,) + params

command = base % acquire_params
cursor = connections[using].cursor()
cursor = get_connection(using).cursor()

cursor.execute(command)

Expand Down
37 changes: 37 additions & 0 deletions docs/lib/advisory_lock.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,40 @@ is the choice of the programmer in the specific case.
In this case, the `return` would be okay in the situation where `my_task` is idempotent,
and there is a "fallback" schedule in case a call was missed.
The blocking/non-blocking choices are very dependent on the specific design and situation.

## Debugging Advisory Locks

For debugging purposes, several utility functions are available to inspect active advisory locks:

### Get Active Advisory Locks

```python
from ansible_base.lib.utils.db import get_active_advisory_locks

# Get all active advisory locks
active_locks = get_active_advisory_locks()
for lock in active_locks:
print(f"Lock ID: {lock['objid']}, PID: {lock['pid']}")
```

### Convert String to Lock ID

```python
from ansible_base.lib.utils.db import string_to_advisory_lock_id

# Convert a string to the corresponding advisory lock ID
lock_id = string_to_advisory_lock_id('my_task_lock')
print(f"Lock ID for 'my_task_lock': {lock_id}")
```

### Convert Lock ID to Debug Info

```python
from ansible_base.lib.utils.db import advisory_lock_id_to_debug_info

# Get debug information for a specific lock ID
debug_info = advisory_lock_id_to_debug_info(lock_id)
print(f"Original string: {debug_info}")
```

These debugging utilities are particularly useful when troubleshooting stuck locks or understanding which processes are holding specific advisory locks in a PostgreSQL database.
8 changes: 8 additions & 0 deletions test_app/management/commands/advisory_locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated by Claude Sonnet 4
from ansible_base.lib.management.advisory_locks import AdvisoryLocksCommand


class Command(AdvisoryLocksCommand):
"""Management command to show active PostgreSQL advisory locks."""

pass
115 changes: 115 additions & 0 deletions test_app/tests/lib/utils/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@

from ansible_base.lib.utils.db import (
advisory_lock,
advisory_lock_id_to_debug_info,
get_active_advisory_locks,
get_pg_notify_params,
migrations_are_complete,
psycopg_conn_string_from_settings_dict,
psycopg_connection_from_django,
psycopg_kwargs_from_settings_dict,
string_to_advisory_lock_id,
)


Expand Down Expand Up @@ -148,6 +151,118 @@ def test_psycopg_connection_from_django_new_conn(self):
assert isinstance(psycopg_connection_from_django(), psycopg.Connection)


class TestStringToAdvisoryLockId:
"""Test the string to advisory lock ID conversion function.

Generated by Claude Code (Sonnet 4)
"""

def test_string_to_advisory_lock_id_basic(self):
"""Test basic string to lock ID conversion."""
lock_id = string_to_advisory_lock_id("test_string")
assert isinstance(lock_id, int)
assert -(2**31) <= lock_id <= 2**31 - 1

def test_string_to_advisory_lock_id_consistency(self):
"""Test that the same string always produces the same lock ID."""
test_string = "consistent_test"
lock_id1 = string_to_advisory_lock_id(test_string)
lock_id2 = string_to_advisory_lock_id(test_string)
assert lock_id1 == lock_id2

def test_string_to_advisory_lock_id_different_strings(self):
"""Test that different strings produce different lock IDs."""
lock_id1 = string_to_advisory_lock_id("string1")
lock_id2 = string_to_advisory_lock_id("string2")
assert lock_id1 != lock_id2

def test_string_to_advisory_lock_id_unicode(self):
"""Test string to lock ID conversion with unicode characters."""
lock_id = string_to_advisory_lock_id("test_🔒_unicode")
assert isinstance(lock_id, int)
assert -(2**31) <= lock_id <= 2**31 - 1


class TestAdvisoryLockIdToDebugInfo:
"""Test the advisory lock ID to debug info function.

Generated by Claude Code (Sonnet 4)
"""

def test_advisory_lock_id_to_debug_info_positive(self):
"""Test debug info for positive lock ID."""
lock_id = 12345
debug_info = advisory_lock_id_to_debug_info(lock_id)

assert debug_info['lock_id'] == lock_id
assert debug_info['unsigned_crc32'] == lock_id
assert debug_info['had_high_bit_set'] is False
assert debug_info['hex_representation'] == hex(lock_id)

def test_advisory_lock_id_to_debug_info_negative(self):
"""Test debug info for negative lock ID."""
lock_id = -12345
debug_info = advisory_lock_id_to_debug_info(lock_id)

assert debug_info['lock_id'] == lock_id
assert debug_info['unsigned_crc32'] == lock_id + 2**31
assert debug_info['had_high_bit_set'] is True
assert debug_info['hex_representation'] == hex(lock_id)

def test_advisory_lock_id_to_debug_info_roundtrip(self):
"""Test debug info for a string-generated lock ID."""
test_string = "test_debug_roundtrip"
lock_id = string_to_advisory_lock_id(test_string)
debug_info = advisory_lock_id_to_debug_info(lock_id)

assert debug_info['lock_id'] == lock_id
assert isinstance(debug_info['unsigned_crc32'], int)
assert isinstance(debug_info['had_high_bit_set'], bool)
assert debug_info['hex_representation'] == hex(lock_id)


class TestGetActiveAdvisoryLocks(SkipIfSqlite):
"""Test the get active advisory locks function.

Generated by Claude Code (Sonnet 4)
"""

@pytest.mark.django_db
def test_get_active_advisory_locks_empty(self):
"""Test getting active locks when none are held."""
locks = get_active_advisory_locks()
assert isinstance(locks, list)
# We can't guarantee no locks since other tests might be running

@pytest.mark.django_db
def test_get_active_advisory_locks_with_lock(self):
"""Test getting active locks when we hold one."""
test_lock_name = "test_get_active_locks"

with advisory_lock(test_lock_name):
locks = get_active_advisory_locks()
assert isinstance(locks, list)
# Should have at least our lock
assert len(locks) >= 1

# Check that lock entries have expected structure
for lock in locks:
assert 'locktype' in lock
assert 'classid' in lock
assert 'objid' in lock
assert 'objsubid' in lock
assert 'pid' in lock
assert 'mode' in lock
assert 'granted' in lock
assert lock['locktype'] == 'advisory'

@pytest.mark.django_db
def test_get_active_advisory_locks_sqlite_returns_empty(self):
"""Test that SQLite returns empty list."""
# This test will be skipped by SkipIfSqlite for completeness
pass

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: SQLite Advisory Lock Test Skipped

The test_get_active_advisory_locks_sqlite_returns_empty test aims to verify SQLite behavior, but its parent class inherits SkipIfSqlite. This causes the test to be skipped on SQLite, preventing verification that get_active_advisory_locks() returns an empty list for that database.

Fix in Cursor Fix in Web


class TestAdvisoryLock(SkipIfSqlite):
THREAD_WAIT_TIME = 0.1

Expand Down
Loading