Skip to content

Commit f3bfb73

Browse files
committed
Add advisory_lock utility from AWX
1 parent 7e0c013 commit f3bfb73

File tree

3 files changed

+179
-2
lines changed

3 files changed

+179
-2
lines changed

ansible_base/lib/utils/db.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from contextlib import contextmanager
2+
from zlib import crc32
23

3-
from django.db import connection, transaction
4+
from django.db import DEFAULT_DB_ALIAS, connection, connections, transaction
45
from django.db.migrations.executor import MigrationExecutor
56

67

@@ -25,3 +26,114 @@ def migrations_are_complete() -> bool:
2526
executor = MigrationExecutor(connection)
2627
plan = executor.migration_plan(executor.loader.graph.leaf_nodes())
2728
return not bool(plan)
29+
30+
31+
# NOTE: the django_pglocks_advisory_lock context manager was forked from the django-pglocks v1.0.4
32+
# that was licensed under the MIT license
33+
34+
35+
@contextmanager
36+
def django_pglocks_advisory_lock(lock_id, shared=False, wait=True, using=None):
37+
38+
if using is None:
39+
using = DEFAULT_DB_ALIAS
40+
41+
# Assemble the function name based on the options.
42+
43+
function_name = 'pg_'
44+
45+
if not wait:
46+
function_name += 'try_'
47+
48+
function_name += 'advisory_lock'
49+
50+
if shared:
51+
function_name += '_shared'
52+
53+
release_function_name = 'pg_advisory_unlock'
54+
if shared:
55+
release_function_name += '_shared'
56+
57+
# Format up the parameters.
58+
59+
tuple_format = False
60+
61+
if isinstance(
62+
lock_id,
63+
(
64+
list,
65+
tuple,
66+
),
67+
):
68+
if len(lock_id) != 2:
69+
raise ValueError("Tuples and lists as lock IDs must have exactly two entries.")
70+
71+
if not isinstance(lock_id[0], int) or not isinstance(lock_id[1], int):
72+
raise ValueError("Both members of a tuple/list lock ID must be integers")
73+
74+
tuple_format = True
75+
elif isinstance(lock_id, str):
76+
# Generates an id within postgres integer range (-2^31 to 2^31 - 1).
77+
# crc32 generates an unsigned integer in Py3, we convert it into
78+
# a signed integer using 2's complement (this is a noop in Py2)
79+
pos = crc32(lock_id.encode("utf-8"))
80+
lock_id = (2**31 - 1) & pos
81+
if pos & 2**31:
82+
lock_id -= 2**31
83+
elif not isinstance(lock_id, int):
84+
raise ValueError("Cannot use %s as a lock id" % lock_id)
85+
86+
if tuple_format:
87+
base = "SELECT %s(%d, %d)"
88+
params = (
89+
lock_id[0],
90+
lock_id[1],
91+
)
92+
else:
93+
base = "SELECT %s(%d)"
94+
params = (lock_id,)
95+
96+
acquire_params = (function_name,) + params
97+
98+
command = base % acquire_params
99+
cursor = connections[using].cursor()
100+
101+
cursor.execute(command)
102+
103+
if not wait:
104+
acquired = cursor.fetchone()[0]
105+
else:
106+
acquired = True
107+
108+
try:
109+
yield acquired
110+
finally:
111+
if acquired:
112+
release_params = (release_function_name,) + params
113+
114+
command = base % release_params
115+
cursor.execute(command)
116+
117+
cursor.close()
118+
119+
120+
@contextmanager
121+
def advisory_lock(*args, lock_session_timeout_milliseconds=0, **kwargs):
122+
if connection.vendor == "postgresql":
123+
cur = None
124+
idle_in_transaction_session_timeout = None
125+
idle_session_timeout = None
126+
if lock_session_timeout_milliseconds > 0:
127+
with connection.cursor() as cur:
128+
idle_in_transaction_session_timeout = cur.execute("SHOW idle_in_transaction_session_timeout").fetchone()[0]
129+
idle_session_timeout = cur.execute("SHOW idle_session_timeout").fetchone()[0]
130+
cur.execute(f"SET idle_in_transaction_session_timeout = '{lock_session_timeout_milliseconds}'")
131+
cur.execute(f"SET idle_session_timeout = '{lock_session_timeout_milliseconds}'")
132+
with django_pglocks_advisory_lock(*args, **kwargs) as internal_lock:
133+
yield internal_lock
134+
if lock_session_timeout_milliseconds > 0:
135+
with connection.cursor() as cur:
136+
cur.execute(f"SET idle_in_transaction_session_timeout = '{idle_in_transaction_session_timeout}'")
137+
cur.execute(f"SET idle_session_timeout = '{idle_session_timeout}'")
138+
else:
139+
yield True

docs/lib/advisory_lock.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
## Database Named Locks
2+
3+
Django-ansible-base hosts its own specialized utility for obtaining named locks.
4+
This follows the same contract as documented in the django-pglocks library
5+
6+
https://pypi.org/project/django-pglocks/
7+
8+
Due to a multitude of needs relevant to production use, discovered through its
9+
use in AWX, a number of points of divergence have emerged such as:
10+
11+
- the need to have it not error when running sqlite3 tests
12+
- stuck processes holding the lock forever (adding pg-level idle timeout)
13+
14+
The use for the purpose of a task would typically look like this
15+
16+
```python
17+
from ansible_base.lib.utils.db import advisory_lock
18+
19+
20+
def my_task():
21+
with advisory_lock('my_task_lock', wait=False) as held:
22+
if held is False:
23+
return
24+
# continue to run logic in my_task
25+
```
26+
27+
This is very useful to assure that no other process _in the cluster_ connected
28+
to the same postgres instance runs `my_task` at the same time as the process
29+
calling it here.
30+
31+
The specific choice of `wait=False` and what to do when another task holds the lock,
32+
is the choice of the programmer in the specific case.
33+
In this case, the `return` would be okay in the situation where `my_task` is idempotent,
34+
and there is a "fallback" schedule in case a call was missed.
35+
The blocking/non-blocking choices are very dependent on the specific design and situation.
Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,39 @@
1+
import threading
2+
import time
3+
14
import pytest
25

3-
from ansible_base.lib.utils.db import migrations_are_complete
6+
from ansible_base.lib.utils.db import advisory_lock, migrations_are_complete
47

58

69
@pytest.mark.django_db
710
def test_migrations_are_complete():
811
"If you are running tests, migrations (test database) should be complete"
912
assert migrations_are_complete()
13+
14+
15+
@pytest.mark.django_db
16+
def test_get_unclaimed_lock():
17+
with advisory_lock('test_get_unclaimed_lock'):
18+
pass
19+
20+
21+
def background_task(django_db_blocker):
22+
# HACK: as a thread the pytest.mark.django_db will not work
23+
django_db_blocker.unblock()
24+
with advisory_lock('background_task_lock'):
25+
time.sleep(0.1)
26+
27+
28+
@pytest.mark.django_db
29+
def test_determine_lock_is_held(django_db_blocker):
30+
thread = threading.Thread(target=background_task, args=(django_db_blocker,))
31+
thread.start()
32+
for _ in range(5):
33+
with advisory_lock('background_task_lock', wait=False) as held:
34+
if held is False:
35+
break
36+
time.sleep(0.01)
37+
else:
38+
raise RuntimeError('Other thread never obtained lock')
39+
thread.join()

0 commit comments

Comments
 (0)