Skip to content

Commit 3d15dfb

Browse files
authored
AAP-43949 Add psycopg db utils for dispatcherd use (#715)
## Description We have pushed out our objective merge date for DAB task in the general, which is at #702, and doing this creates a need to be able to feature-flag a release using https://github.com/ansible/dispatcherd Using dispatcherd by itself doesn't need most of the content from DAB task, but it needs _some utility_ type methods. As we flip-flopped on the exact order of operations for this, I had previously put up #686, and closed it _in favor of_ DAB task. Some content was added while I worked on DAB task, so I created a new branch (this one) and put the updated content there. Why? Merging this will allow the EDA-server and AWX work to de-duplicate code that was copy+pasted there. ansible/awx@devel...feature_dispatcher#diff-60fa214144754e9e3cf12cc5c7054d8b0ce04f43da33181c0fe9f0d927627038 ansible/eda-server@main...dispatcher-poc-v2#diff-ff14e1e048588b54cbac15edc61d4c830abc8a87cdae26b7fb603d52f3fe1960 Those were always intended to go live in DAB. Next question - why can't we just put them in the dispatcherd repo? Because _it does not have Django_. This is very important. I do intend to write some docs in dispatcherd that will reference this content. Ping @bzwei for review ## Type of Change - [ ] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Documentation update - [x] Test update - [x] Refactoring (no functional changes) - [ ] Development environment change - [ ] Configuration change ## Self-Review Checklist - [x] I have performed a self-review of my code - [x] I have added relevant comments to complex code sections - [ ] I have updated documentation where needed - [ ] I have considered the security impact of these changes - [ ] I have considered performance implications - [ ] I have thought about error handling and edge cases - [x] I have tested the changes in my local environment ## Testing Instructions This probably needs to be tested with the respective EDA or AWX branches. They have been working, this just consolidates these methods. ### Prerequisites <!-- List any specific setup required --> ### Steps to Test 1. 2. 3. ### Expected Results <!-- Describe what should happen after following the steps --> ## Additional Context <!-- Optional but helpful information --> ### Required Actions - [ ] Requires documentation updates - [x] Requires downstream repository changes - [ ] Requires infrastructure/deployment changes - [x] Requires coordination with other teams - [ ] Blocked by PR/MR: #XXX ### Screenshots/Logs <!-- Add if relevant to demonstrate the changes -->
1 parent a1f565d commit 3d15dfb

File tree

2 files changed

+213
-2
lines changed

2 files changed

+213
-2
lines changed

ansible_base/lib/utils/db.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import logging
22
from contextlib import contextmanager
3+
from copy import deepcopy
4+
from typing import Union
35
from zlib import crc32
46

7+
import psycopg
8+
from django.conf import settings
59
from django.db import DEFAULT_DB_ALIAS, OperationalError, connection, connections, transaction
10+
from django.db.backends.postgresql.base import DatabaseWrapper as PsycopgDatabaseWrapper
611
from django.db.migrations.executor import MigrationExecutor
712

813
logger = logging.getLogger(__name__)
@@ -164,3 +169,82 @@ def advisory_lock(*args, lock_session_timeout_milliseconds=0, **kwargs):
164169
yield True
165170
else:
166171
raise RuntimeError(f'Advisory lock not implemented for database type {connection.vendor}')
172+
173+
174+
# Django settings.DATABASES['alias'] dictionary type
175+
dj_db_dict = dict[str, Union[str, int]]
176+
177+
178+
def psycopg_connection_from_django(**kwargs) -> psycopg.Connection:
179+
"""Compatibility with dispatcherd connection factory, just returns the Django connection
180+
181+
dispatcherd passes config info as kwargs, but in this case we just want to ignore then.
182+
Because the point of this it to not reconnect, but rely on existing Django connection management.
183+
"""
184+
if connection.connection is None:
185+
connection.ensure_connection()
186+
return connection.connection
187+
188+
189+
def psycopg_kwargs_from_settings_dict(settings_dict: dj_db_dict) -> dict:
190+
"""Return psycopg connection creation kwargs given Django db settings info
191+
192+
:param dict setting_dict: DATABASES in Django settings
193+
:return: kwargs that can be passed to psycopg.connect, or connection classes"""
194+
psycopg_params = PsycopgDatabaseWrapper(settings_dict).get_connection_params().copy()
195+
psycopg_params.pop('cursor_factory', None)
196+
psycopg_params.pop('context', None)
197+
return psycopg_params
198+
199+
200+
def psycopg_conn_string_from_settings_dict(settings_dict: dj_db_dict) -> str:
201+
"""Returns a string that psycopg can take as conninfo for Connection class.
202+
203+
Example return value: "dbname=postgres user=postgres"
204+
"""
205+
conn_params = psycopg_kwargs_from_settings_dict(settings_dict)
206+
return psycopg.conninfo.make_conninfo(**conn_params)
207+
208+
209+
def combine_settings_dict(settings_dict1: dj_db_dict, settings_dict2: dj_db_dict, **extra_options) -> dj_db_dict:
210+
"""Given two Django database settings dictionaries, combine them and return a new settings_dict"""
211+
settings_dict = deepcopy(settings_dict1)
212+
213+
# Apply overrides specifically for the listener connection
214+
for k, v in settings_dict2.items():
215+
if k != 'OPTIONS':
216+
settings_dict[k] = v
217+
218+
# Merge the database OPTIONS
219+
# https://docs.djangoproject.com/en/5.2/ref/databases/#postgresql-connection-settings
220+
# These are not expected to be nested, as they are psycopg params
221+
settings_dict.setdefault('OPTIONS', {})
222+
# extra_options are used by AWX to set application_name, which is generally a good idea
223+
settings_dict['OPTIONS'].update(extra_options)
224+
# Apply overrides from nested OPTIONS for the listener connection
225+
for k, v in settings_dict2.get('OPTIONS', {}).items():
226+
settings_dict['OPTIONS'][k] = v
227+
228+
return settings_dict
229+
230+
231+
def get_pg_notify_params(alias: str = DEFAULT_DB_ALIAS, **extra_options) -> dict:
232+
"""Returns a dictionary that can be used as kwargs to create a psycopg.Connection
233+
234+
This should use the same connection parameters as Django does.
235+
However, this also allows overrides specified by
236+
- PG_NOTIFY_DATABASES, higher precedence, preferred setting
237+
- LISTENER_DATABASES, lower precedence, deprecated AWX setting.
238+
"""
239+
pg_notify_overrides = {}
240+
if hasattr(settings, 'PG_NOTIFY_DATABASES'):
241+
pg_notify_overrides = settings.PG_NOTIFY_DATABASES.get(alias, {})
242+
elif hasattr(settings, 'LISTENER_DATABASES'):
243+
pg_notify_overrides = settings.LISTENER_DATABASES.get(alias, {})
244+
245+
settings_dict = combine_settings_dict(settings.DATABASES[alias], pg_notify_overrides, **extra_options)
246+
247+
# Reuse the Django postgres DB backend to create params for the psycopg library
248+
psycopg_params = psycopg_kwargs_from_settings_dict(settings_dict)
249+
250+
return psycopg_params

test_app/tests/lib/utils/test_db.py

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
11
import threading
22
import time
33

4+
import psycopg
45
import pytest
6+
from django.conf import settings
57
from django.db import connection
68
from django.db.utils import OperationalError
9+
from django.test import override_settings
710

8-
from ansible_base.lib.utils.db import advisory_lock, migrations_are_complete
11+
from ansible_base.lib.utils.db import (
12+
advisory_lock,
13+
get_pg_notify_params,
14+
migrations_are_complete,
15+
psycopg_conn_string_from_settings_dict,
16+
psycopg_connection_from_django,
17+
psycopg_kwargs_from_settings_dict,
18+
)
919

1020

1121
@pytest.mark.django_db
@@ -21,6 +31,123 @@ def skip_if_sqlite(self):
2131
pytest.skip('Advisory lock is not written for sqlite')
2232

2333

34+
class TestPGNotifyConnection(SkipIfSqlite):
35+
TEST_DATABASE_DICT = {
36+
"default": {
37+
"ENGINE": "django.db.backends.postgresql",
38+
"HOST": "https://foo.invalid",
39+
"PORT": 55434,
40+
"USER": "dab_user",
41+
"PASSWORD": "dabbing",
42+
"NAME": "dab_db",
43+
}
44+
}
45+
PSYCOPG_KWARGS = {
46+
'dbname': 'dab_db',
47+
'client_encoding': 'UTF8',
48+
# kwargs containing objects can not be compared so they will be ignored
49+
# 'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>,
50+
'user': 'dab_user',
51+
'password': 'dabbing',
52+
'host': 'https://foo.invalid',
53+
'port': 55434,
54+
# 'context': <psycopg.adapt.AdaptersMap object at 0x7f537f2d9f70>,
55+
'prepare_threshold': None,
56+
}
57+
58+
@pytest.fixture
59+
def mock_settings(self):
60+
with override_settings(DATABASES=self.TEST_DATABASE_DICT, USE_TZ=False):
61+
yield
62+
63+
def test_default_behavior(self, mock_settings):
64+
params = get_pg_notify_params()
65+
assert params == self.PSYCOPG_KWARGS
66+
67+
def test_pg_notify_extra_options(self, mock_settings):
68+
params = get_pg_notify_params(application_name='joe_connection')
69+
expected = self.PSYCOPG_KWARGS.copy()
70+
expected['application_name'] = 'joe_connection'
71+
assert params == expected
72+
73+
def test_lister_databases(self, mock_settings):
74+
LISTENER_DATABASES = {"default": {"HOST": "https://foo.anotherhost.invalid"}}
75+
with override_settings(LISTENER_DATABASES=LISTENER_DATABASES):
76+
params = get_pg_notify_params()
77+
assert params['host'] == "https://foo.anotherhost.invalid"
78+
79+
def test_pg_notify_databases(self, mock_settings):
80+
PG_NOTIFY_DATABASES = {"default": {"HOST": "https://foo.anotherhost2.invalid"}}
81+
with override_settings(PG_NOTIFY_DATABASES=PG_NOTIFY_DATABASES):
82+
params = get_pg_notify_params()
83+
assert params['host'] == "https://foo.anotherhost2.invalid"
84+
85+
def test_psycopg_kwargs_from_settings_dict(self):
86+
"More of a unit test, doing the same thing"
87+
test_dict = self.TEST_DATABASE_DICT["default"].copy()
88+
test_dict['OPTIONS'] = {'autocommit': True}
89+
with override_settings(USE_TZ=False):
90+
psycopg_params = psycopg_kwargs_from_settings_dict(test_dict)
91+
expected_kwargs = self.PSYCOPG_KWARGS.copy()
92+
expected_kwargs['autocommit'] = True
93+
assert psycopg_params == expected_kwargs
94+
95+
def test_psycopg_kwargs_use(self):
96+
"This assures that the data we get for the kwargs are usable, and demos how to use"
97+
if connection.vendor == 'sqlite':
98+
pytest.skip('Test needs to connect to postgres which is not running')
99+
100+
test_dict = settings.DATABASES['default'].copy()
101+
test_dict['OPTIONS'] = {'autocommit': True}
102+
with override_settings(USE_TZ=False):
103+
psycopg_params = psycopg_kwargs_from_settings_dict(test_dict)
104+
105+
psycopg.connect(**psycopg_params)
106+
107+
def test_listener_string_production(self):
108+
"This is a test to correspond to PG_NOTIFY_DSN_SERVER type settings in eda-server"
109+
with override_settings(USE_TZ=False):
110+
args = psycopg_conn_string_from_settings_dict(
111+
{
112+
"ENGINE": "django.db.backends.postgresql",
113+
"HOST": "127.0.0.1",
114+
"PORT": 5432,
115+
"USER": "postgres",
116+
"PASSWORD": "DB_PASSWORD",
117+
"NAME": "eda",
118+
"OPTIONS": {
119+
"sslmode": "allow",
120+
"sslcert": "",
121+
"sslkey": "",
122+
"sslrootcert": "",
123+
},
124+
}
125+
)
126+
assert args == (
127+
"dbname=eda sslmode=allow sslcert='' sslkey='' sslrootcert='' client_encoding=UTF8 user=postgres password=DB_PASSWORD host=127.0.0.1 port=5432"
128+
)
129+
130+
def test_listener_string_production_use(self):
131+
"This assures that the data we get for the connection string is usable, and demos how to use"
132+
if connection.vendor == 'sqlite':
133+
pytest.skip('Test needs to connect to postgres which is not running')
134+
args = psycopg_conn_string_from_settings_dict(settings.DATABASES['default'])
135+
psycopg.connect(conninfo=args)
136+
137+
@pytest.mark.django_db
138+
def test_psycopg_connection_from_django_existing_conn(self):
139+
if connection.vendor == 'sqlite':
140+
pytest.skip('Advisory lock is not written for sqlite')
141+
assert isinstance(psycopg_connection_from_django(), psycopg.Connection)
142+
143+
@pytest.mark.django_db(transaction=True)
144+
def test_psycopg_connection_from_django_new_conn(self):
145+
if connection.vendor == 'sqlite':
146+
pytest.skip('Advisory lock is not written for sqlite')
147+
connection.close()
148+
assert isinstance(psycopg_connection_from_django(), psycopg.Connection)
149+
150+
24151
class TestAdvisoryLock(SkipIfSqlite):
25152
THREAD_WAIT_TIME = 0.1
26153

@@ -40,7 +167,7 @@ def background_task(django_db_blocker):
40167
def test_determine_lock_is_held(self, django_db_blocker):
41168
thread = threading.Thread(target=TestAdvisoryLock.background_task, args=(django_db_blocker,))
42169
thread.start()
43-
for _ in range(5):
170+
for _ in range(20):
44171
with advisory_lock('background_task_lock', wait=False) as held:
45172
if held is False:
46173
break

0 commit comments

Comments
 (0)