Skip to content

Commit 1cf551b

Browse files
authored
Add advisory_lock utility from AWX (ansible#661)
This migrates `advisory_lock` from AWX to DAB. This is similar in spirit to ansible#660 Importantly, I went the route of _not_ adding a new dependency, because there was only a single method from the library we were using, and I'm pretty confident that the MIT license lets you do whatever you want. In doing this, I deleted the use of `six` for python2 compatibility which further reduced the complexity of the code I ported.
1 parent 7e0c013 commit 1cf551b

File tree

3 files changed

+217
-2
lines changed

3 files changed

+217
-2
lines changed

ansible_base/lib/utils/db.py

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from contextlib import contextmanager
2+
from zlib import crc32
23

3-
from django.db import connection, transaction
4+
from django.db import DEFAULT_DB_ALIAS, connection, connections, transaction
45
from django.db.migrations.executor import MigrationExecutor
56

67

@@ -25,3 +26,126 @@ def migrations_are_complete() -> bool:
2526
executor = MigrationExecutor(connection)
2627
plan = executor.migration_plan(executor.loader.graph.leaf_nodes())
2728
return not bool(plan)
29+
30+
31+
# NOTE: the django_pglocks_advisory_lock context manager was forked from the django-pglocks v1.0.4
32+
# that was licensed under the MIT license
33+
34+
35+
@contextmanager
36+
def django_pglocks_advisory_lock(lock_id, shared=False, wait=True, using=None):
37+
38+
if using is None:
39+
using = DEFAULT_DB_ALIAS
40+
41+
# Assemble the function name based on the options.
42+
43+
function_name = 'pg_'
44+
45+
if not wait:
46+
function_name += 'try_'
47+
48+
function_name += 'advisory_lock'
49+
50+
if shared:
51+
function_name += '_shared'
52+
53+
release_function_name = 'pg_advisory_unlock'
54+
if shared:
55+
release_function_name += '_shared'
56+
57+
# Format up the parameters.
58+
59+
tuple_format = False
60+
61+
if isinstance(
62+
lock_id,
63+
(
64+
list,
65+
tuple,
66+
),
67+
):
68+
if len(lock_id) != 2:
69+
raise ValueError("Tuples and lists as lock IDs must have exactly two entries.")
70+
71+
if not isinstance(lock_id[0], int) or not isinstance(lock_id[1], int):
72+
raise ValueError("Both members of a tuple/list lock ID must be integers")
73+
74+
tuple_format = True
75+
elif isinstance(lock_id, str):
76+
# Generates an id within postgres integer range (-2^31 to 2^31 - 1).
77+
# crc32 generates an unsigned integer in Py3, we convert it into
78+
# a signed integer using 2's complement (this is a noop in Py2)
79+
pos = crc32(lock_id.encode("utf-8"))
80+
lock_id = (2**31 - 1) & pos
81+
if pos & 2**31:
82+
lock_id -= 2**31
83+
elif not isinstance(lock_id, int):
84+
raise ValueError("Cannot use %s as a lock id" % lock_id)
85+
86+
if tuple_format:
87+
base = "SELECT %s(%d, %d)"
88+
params = (
89+
lock_id[0],
90+
lock_id[1],
91+
)
92+
else:
93+
base = "SELECT %s(%d)"
94+
params = (lock_id,)
95+
96+
acquire_params = (function_name,) + params
97+
98+
command = base % acquire_params
99+
cursor = connections[using].cursor()
100+
101+
cursor.execute(command)
102+
103+
if not wait:
104+
acquired = cursor.fetchone()[0]
105+
else:
106+
acquired = True
107+
108+
try:
109+
yield acquired
110+
finally:
111+
if acquired:
112+
release_params = (release_function_name,) + params
113+
114+
command = base % release_params
115+
cursor.execute(command)
116+
117+
cursor.close()
118+
119+
120+
@contextmanager
121+
def advisory_lock(*args, lock_session_timeout_milliseconds=0, **kwargs):
122+
"""Context manager that wraps the pglocks advisory lock
123+
124+
This obtains a named lock in postgres, idenfied by the args passed in
125+
usually the lock identifier is a simple string.
126+
127+
@param: wait If True, block until the lock is obtained
128+
@param: shared Whether or not the lock is shared
129+
@param: lock_session_timeout_milliseconds Postgres-level timeout
130+
@param: using django database identifier
131+
"""
132+
if connection.vendor == "postgresql":
133+
cur = None
134+
idle_in_transaction_session_timeout = None
135+
idle_session_timeout = None
136+
if lock_session_timeout_milliseconds > 0:
137+
with connection.cursor() as cur:
138+
idle_in_transaction_session_timeout = cur.execute("SHOW idle_in_transaction_session_timeout").fetchone()[0]
139+
idle_session_timeout = cur.execute("SHOW idle_session_timeout").fetchone()[0]
140+
cur.execute("SET idle_in_transaction_session_timeout = %s", (lock_session_timeout_milliseconds,))
141+
cur.execute("SET idle_session_timeout = %s", (lock_session_timeout_milliseconds,))
142+
with django_pglocks_advisory_lock(*args, **kwargs) as internal_lock:
143+
yield internal_lock
144+
if lock_session_timeout_milliseconds > 0:
145+
with connection.cursor() as cur:
146+
cur.execute("SET idle_in_transaction_session_timeout = %s", (idle_in_transaction_session_timeout,))
147+
cur.execute("SET idle_session_timeout = %s", (idle_session_timeout,))
148+
elif connection.vendor == "sqlite":
149+
yield True
150+
else:
151+
raise RuntimeError(f'Advisory lock not implemented for database type {connection.vendor}')

docs/lib/advisory_lock.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
## Database Named Locks
2+
3+
Django-ansible-base hosts its own specialized utility for obtaining named locks.
4+
This follows the same contract as documented in the django-pglocks library
5+
6+
https://pypi.org/project/django-pglocks/
7+
8+
Due to a multitude of needs relevant to production use, discovered through its
9+
use in AWX, a number of points of divergence have emerged such as:
10+
11+
- the need to have it not error when running sqlite3 tests
12+
- stuck processes holding the lock forever (adding pg-level idle timeout)
13+
14+
The use for the purpose of a task would typically look like this
15+
16+
```python
17+
from ansible_base.lib.utils.db import advisory_lock
18+
19+
20+
def my_task():
21+
with advisory_lock('my_task_lock', wait=False) as held:
22+
if held is False:
23+
return
24+
# continue to run logic in my_task
25+
```
26+
27+
This is very useful to assure that no other process _in the cluster_ connected
28+
to the same postgres instance runs `my_task` at the same time as the process
29+
calling it here.
30+
31+
The specific choice of `wait=False` and what to do when another task holds the lock,
32+
is the choice of the programmer in the specific case.
33+
In this case, the `return` would be okay in the situation where `my_task` is idempotent,
34+
and there is a "fallback" schedule in case a call was missed.
35+
The blocking/non-blocking choices are very dependent on the specific design and situation.
Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,65 @@
1+
import threading
2+
import time
3+
14
import pytest
5+
from django.db import connection
6+
from django.db.utils import OperationalError
27

3-
from ansible_base.lib.utils.db import migrations_are_complete
8+
from ansible_base.lib.utils.db import advisory_lock, migrations_are_complete
49

510

611
@pytest.mark.django_db
712
def test_migrations_are_complete():
813
"If you are running tests, migrations (test database) should be complete"
914
assert migrations_are_complete()
15+
16+
17+
class TestAdvisoryLock:
18+
@pytest.fixture(autouse=True)
19+
def skip_if_sqlite(self):
20+
if connection.vendor == 'sqlite':
21+
pytest.skip('Advisory lock is not written for sqlite')
22+
23+
@pytest.mark.django_db
24+
def test_get_unclaimed_lock(self):
25+
with advisory_lock('test_get_unclaimed_lock'):
26+
pass
27+
28+
@staticmethod
29+
def background_task(django_db_blocker):
30+
# HACK: as a thread the pytest.mark.django_db will not work
31+
django_db_blocker.unblock()
32+
with advisory_lock('background_task_lock'):
33+
time.sleep(0.1)
34+
35+
@pytest.mark.django_db
36+
def test_determine_lock_is_held(self, django_db_blocker):
37+
thread = threading.Thread(target=TestAdvisoryLock.background_task, args=(django_db_blocker,))
38+
thread.start()
39+
for _ in range(5):
40+
with advisory_lock('background_task_lock', wait=False) as held:
41+
if held is False:
42+
break
43+
time.sleep(0.01)
44+
else:
45+
raise RuntimeError('Other thread never obtained lock')
46+
thread.join()
47+
48+
@pytest.mark.django_db
49+
def test_tuple_lock(self):
50+
with advisory_lock([1234, 4321]):
51+
pass
52+
53+
@pytest.mark.django_db
54+
def test_invalid_tuple_name(self):
55+
with pytest.raises(ValueError):
56+
with advisory_lock(['test_invalid_tuple_name', 'foo']):
57+
pass
58+
59+
@pytest.mark.django_db
60+
def test_lock_session_timeout_milliseconds(self):
61+
with pytest.raises(OperationalError) as exc:
62+
# uses miliseconds units
63+
with advisory_lock('test_lock_session_timeout_milliseconds', lock_session_timeout_milliseconds=2):
64+
time.sleep(3)
65+
assert 'the connection is lost' in str(exc)

0 commit comments

Comments
 (0)