Skip to content

Commit 93371e4

Browse files
committed
Add psycopg db utils for dispatcherd use
1 parent f93c8aa commit 93371e4

File tree

2 files changed

+179
-2
lines changed

2 files changed

+179
-2
lines changed

ansible_base/lib/utils/db.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
from contextlib import contextmanager
2+
from copy import deepcopy
3+
from typing import Union
24
from zlib import crc32
35

6+
import psycopg
7+
from django.conf import settings
48
from django.db import DEFAULT_DB_ALIAS, connection, connections, transaction
9+
from django.db.backends.postgresql.base import DatabaseWrapper as PsycopgDatabaseWrapper
510
from django.db.migrations.executor import MigrationExecutor
611

712

@@ -149,3 +154,62 @@ def advisory_lock(*args, lock_session_timeout_milliseconds=0, **kwargs):
149154
yield True
150155
else:
151156
raise RuntimeError(f'Advisory lock not implemented for database type {connection.vendor}')
157+
158+
159+
# Django settings.DATABASES['alias'] dictionary type
160+
dj_db_dict = dict[str, Union[str, int]]
161+
162+
163+
def psycopg_connection_from_django(**kwargs) -> psycopg.Connection:
164+
"Compatibility with dispatcher connection factory, just returns the Django connection"
165+
return connection.connection
166+
167+
168+
def psycopg_kwargs_from_settings_dict(settings_dict: dj_db_dict) -> dict:
169+
"""Return psycopg connection creation kwargs given Django db settings info
170+
171+
:param dict setting_dict: DATABASES in Django settings
172+
:return: kwargs that can be passed to psycopg.connect, or connection classes"""
173+
psycopg_params = PsycopgDatabaseWrapper(settings_dict).get_connection_params().copy()
174+
psycopg_params.pop('cursor_factory', None)
175+
psycopg_params.pop('context', None)
176+
return psycopg_params
177+
178+
179+
def psycopg_conn_string_from_settings_dict(settings_dict: dj_db_dict) -> str:
180+
conn_params = psycopg_kwargs_from_settings_dict(settings_dict)
181+
return psycopg.conninfo.make_conninfo(**conn_params)
182+
183+
184+
def combine_settings_dict(settings_dict1: dj_db_dict, settings_dict2: dj_db_dict, **extra_options) -> dj_db_dict:
185+
"""Given two Django settings dictionaries, combine them and return a new settings_dict"""
186+
settings_dict = deepcopy(settings_dict1)
187+
settings_dict['OPTIONS'] = deepcopy(settings_dict.get('OPTIONS', {}))
188+
189+
# These extra options are used by AWX to set application_name
190+
settings_dict['OPTIONS'].update(extra_options)
191+
192+
# Apply overrides specifically for the listener connection
193+
for k, v in settings_dict2.items():
194+
if k != 'OPTIONS':
195+
settings_dict[k] = v
196+
197+
for k, v in settings_dict2.get('OPTIONS', {}).items():
198+
settings_dict['OPTIONS'][k] = v
199+
200+
return settings_dict
201+
202+
203+
def get_pg_notify_params(alias: str = DEFAULT_DB_ALIAS, **extra_options) -> dict:
204+
pg_notify_overrides = {}
205+
if hasattr(settings, 'PG_NOTIFY_DATABASES'):
206+
pg_notify_overrides = settings.PG_NOTIFY_DATABASES.get(alias, {})
207+
elif hasattr(settings, 'LISTENER_DATABASES'):
208+
pg_notify_overrides = settings.LISTENER_DATABASES.get(alias, {})
209+
210+
settings_dict = combine_settings_dict(settings.DATABASES[alias], pg_notify_overrides, **extra_options)
211+
212+
# Reuse the Django postgres DB backend to create params for the psycopg library
213+
psycopg_params = psycopg_kwargs_from_settings_dict(settings_dict)
214+
215+
return psycopg_params

test_app/tests/lib/utils/test_db.py

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
import threading
22
import time
33

4+
import psycopg
45
import pytest
6+
from django.conf import settings
57
from django.db import connection
68
from django.db.utils import OperationalError
9+
from django.test import override_settings
710

8-
from ansible_base.lib.utils.db import advisory_lock, migrations_are_complete
11+
from ansible_base.lib.utils.db import (
12+
advisory_lock,
13+
get_pg_notify_params,
14+
migrations_are_complete,
15+
psycopg_conn_string_from_settings_dict,
16+
psycopg_kwargs_from_settings_dict,
17+
)
918

1019

1120
@pytest.mark.django_db
@@ -14,6 +23,110 @@ def test_migrations_are_complete():
1423
assert migrations_are_complete()
1524

1625

26+
class TestPGNotifyConnection:
27+
TEST_DATABASE_DICT = {
28+
"default": {
29+
"ENGINE": "django.db.backends.postgresql",
30+
"HOST": "https://foo.invalid",
31+
"PORT": 55434,
32+
"USER": "dab_user",
33+
"PASSWORD": "dabbing",
34+
"NAME": "dab_db",
35+
}
36+
}
37+
PSYCOPG_KWARGS = {
38+
'dbname': 'dab_db',
39+
'client_encoding': 'UTF8',
40+
# kwargs containing objects can not be compared so they will be ignored
41+
# 'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>,
42+
'user': 'dab_user',
43+
'password': 'dabbing',
44+
'host': 'https://foo.invalid',
45+
'port': 55434,
46+
# 'context': <psycopg.adapt.AdaptersMap object at 0x7f537f2d9f70>,
47+
'prepare_threshold': None,
48+
}
49+
50+
@pytest.fixture
51+
def mock_settings(self):
52+
with override_settings(DATABASES=self.TEST_DATABASE_DICT, USE_TZ=False):
53+
yield
54+
55+
def test_default_behavior(self, mock_settings):
56+
params = get_pg_notify_params()
57+
assert params == self.PSYCOPG_KWARGS
58+
59+
def test_pg_notify_extra_options(self, mock_settings):
60+
params = get_pg_notify_params(application_name='joe_connection')
61+
expected = self.PSYCOPG_KWARGS.copy()
62+
expected['application_name'] = 'joe_connection'
63+
assert params == expected
64+
65+
def test_lister_databases(self, mock_settings):
66+
LISTENER_DATABASES = {"default": {"HOST": "https://foo.anotherhost.invalid"}}
67+
with override_settings(LISTENER_DATABASES=LISTENER_DATABASES):
68+
params = get_pg_notify_params()
69+
assert params['host'] == "https://foo.anotherhost.invalid"
70+
71+
def test_pg_notify_databases(self, mock_settings):
72+
PG_NOTIFY_DATABASES = {"default": {"HOST": "https://foo.anotherhost2.invalid"}}
73+
with override_settings(PG_NOTIFY_DATABASES=PG_NOTIFY_DATABASES):
74+
params = get_pg_notify_params()
75+
assert params['host'] == "https://foo.anotherhost2.invalid"
76+
77+
def test_psycopg_kwargs_from_settings_dict(self):
78+
"More of a unit test, doing the same thing"
79+
test_dict = self.TEST_DATABASE_DICT["default"].copy()
80+
test_dict['OPTIONS'] = {'autocommit': True}
81+
with override_settings(USE_TZ=False):
82+
psycopg_params = psycopg_kwargs_from_settings_dict(test_dict)
83+
expected_kwargs = self.PSYCOPG_KWARGS.copy()
84+
expected_kwargs['autocommit'] = True
85+
assert psycopg_params == expected_kwargs
86+
87+
def test_psycopg_kwargs_use(self):
88+
"This assures that the data we get for the kwargs are usable, and demos how to use"
89+
if connection.vendor == 'sqlite':
90+
pytest.skip('Test needs to connect to postgres which is not running')
91+
92+
test_dict = settings.DATABASES['default'].copy()
93+
test_dict['OPTIONS'] = {'autocommit': True}
94+
with override_settings(USE_TZ=False):
95+
psycopg_params = psycopg_kwargs_from_settings_dict(test_dict)
96+
97+
psycopg.connect(**psycopg_params)
98+
99+
def test_listener_string_production(self):
100+
"This is a test to correspond to PG_NOTIFY_DSN_SERVER type settings in eda-server"
101+
with override_settings(USE_TZ=False):
102+
args = psycopg_conn_string_from_settings_dict(
103+
{
104+
"ENGINE": "django.db.backends.postgresql",
105+
"HOST": "127.0.0.1",
106+
"PORT": 5432,
107+
"USER": "postgres",
108+
"PASSWORD": "DB_PASSWORD",
109+
"NAME": "eda",
110+
"OPTIONS": {
111+
"sslmode": "allow",
112+
"sslcert": "",
113+
"sslkey": "",
114+
"sslrootcert": "",
115+
},
116+
}
117+
)
118+
assert args == (
119+
"dbname=eda sslmode=allow sslcert='' sslkey='' sslrootcert='' client_encoding=UTF8 user=postgres password=DB_PASSWORD host=127.0.0.1 port=5432"
120+
)
121+
122+
def test_listener_string_production_use(self):
123+
"This assures that the data we get for the connection string is usable, and demos how to use"
124+
if connection.vendor == 'sqlite':
125+
pytest.skip('Test needs to connect to postgres which is not running')
126+
args = psycopg_conn_string_from_settings_dict(settings.DATABASES['default'])
127+
psycopg.connect(conninfo=args)
128+
129+
17130
class TestAdvisoryLock:
18131
@pytest.fixture(autouse=True)
19132
def skip_if_sqlite(self):
@@ -36,7 +149,7 @@ def background_task(django_db_blocker):
36149
def test_determine_lock_is_held(self, django_db_blocker):
37150
thread = threading.Thread(target=TestAdvisoryLock.background_task, args=(django_db_blocker,))
38151
thread.start()
39-
for _ in range(5):
152+
for _ in range(20):
40153
with advisory_lock('background_task_lock', wait=False) as held:
41154
if held is False:
42155
break

0 commit comments

Comments
 (0)