Skip to content

Commit 112540e

Browse files
committed
Move config into its own module, pop inside utils method
1 parent 7a7a928 commit 112540e

File tree

9 files changed

+59
-55
lines changed

9 files changed

+59
-55
lines changed

ansible_base/lib/dynamic_config/settings_logic.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,4 +331,9 @@ def get_dab_settings(
331331
}
332332
)
333333

334+
if 'ansible_base.task' in installed_apps:
335+
# Queue used for dab_task tasks to re-schedule themselves if there is more work to do
336+
# if you want to use a local queue instead, that would be more performant
337+
dab_data['DAB_TASK_ADMIN_QUEUE'] = 'dab_broadcast'
338+
334339
return dab_data

ansible_base/lib/utils/db.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,14 @@ def psycopg_kwargs_from_settings_dict(settings_dict: dj_db_dict) -> dict:
165165
166166
:param dict setting_dict: DATABASES in Django settings
167167
:return: kwargs that can be passed to psycopg.connect, or connection classes"""
168-
return PsycopgDatabaseWrapper(settings_dict).get_connection_params()
168+
psycopg_params = PsycopgDatabaseWrapper(settings_dict).get_connection_params().copy()
169+
psycopg_params.pop('cursor_factory', None)
170+
psycopg_params.pop('context', None)
171+
return psycopg_params
169172

170173

171174
def psycopg_conn_string_from_settings_dict(settings_dict: dj_db_dict) -> str:
172175
conn_params = psycopg_kwargs_from_settings_dict(settings_dict)
173-
conn_params.pop('cursor_factory')
174-
conn_params.pop('context')
175176
return psycopg.conninfo.make_conninfo(**conn_params)
176177

177178

@@ -205,6 +206,5 @@ def get_pg_notify_params(alias: str = DEFAULT_DB_ALIAS, **extra_options) -> dict
205206

206207
# Reuse the Django postgres DB backend to create params for the psycopg library
207208
psycopg_params = psycopg_kwargs_from_settings_dict(settings_dict)
208-
psycopg_params['autocommit'] = True
209209

210210
return psycopg_params

ansible_base/task/__init__.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +0,0 @@
1-
from ansible_base.lib.utils.db import get_pg_notify_params
2-
3-
4-
def get_config():
5-
psycopg_params = get_pg_notify_params()
6-
psycopg_params.pop('autocommit') # dispatcher automatically adds this, causes error, TODO: need pre-check
7-
psycopg_params.pop('cursor_factory')
8-
psycopg_params.pop('context') # TODO: remove in inner method, makes non-async, not good
9-
10-
return {
11-
"version": 2,
12-
"brokers": {
13-
"pg_notify": {
14-
"config": psycopg_params
15-
# TODO: hook in synchronous connection factory
16-
}
17-
},
18-
"producers": {
19-
"ScheduledProducer": {
20-
"task_schedule": {
21-
"ansible_base.task.tasks.run_task_from_queue": {"schedule": 60},
22-
"ansible_base.task.tasks.manage_lost_tasks": {"schedule": 60*10}
23-
}
24-
},
25-
},
26-
"pool": {"max_workers": 4}, # TODO: to settings
27-
}

ansible_base/task/apps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dispatcher.config import setup
44

5-
from ansible_base.task import get_config
5+
from ansible_base.task.config import get_config
66

77

88
class TaskConfig(AppConfig):

ansible_base/task/config.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from django.conf import settings
2+
3+
from ansible_base.lib.utils.db import get_pg_notify_params
4+
5+
6+
def get_config() -> dict:
7+
"Returns dispatcher config from what is in Django settings"
8+
psycopg_params = get_pg_notify_params()
9+
10+
return {
11+
"version": 2,
12+
"brokers": {
13+
"pg_notify": {
14+
"config": psycopg_params
15+
# TODO: hook in synchronous connection factory
16+
}
17+
},
18+
"producers": {
19+
"ScheduledProducer": {
20+
"task_schedule": {
21+
"ansible_base.task.tasks.run_task_from_queue": {"schedule": 60},
22+
"ansible_base.task.tasks.manage_lost_tasks": {"schedule": 60*10}
23+
}
24+
},
25+
},
26+
"pool": {"max_workers": 4}, # TODO: to settings
27+
}

ansible_base/task/tasks.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from dispatcher.control import Control
66

77
from django.db import transaction
8+
from django.conf import settings
89
from django.utils.timezone import now, timedelta
910

1011
from ansible_base.lib.utils.db import get_pg_notify_params
@@ -14,7 +15,7 @@
1415
logger = logging.getLogger(__name__)
1516

1617

17-
@task(queue='dab_broadcast', bind=True)
18+
@task(queue=settings.DAB_TASK_ADMIN_QUEUE, bind=True)
1819
def run_task_from_queue(dispatcher):
1920
with transaction.atomic():
2021
task = Task.objects.filter(state=TASK_STATES.WAITING).select_for_update().first()
@@ -39,7 +40,7 @@ def run_task_from_queue(dispatcher):
3940
task.delete()
4041

4142

42-
@task(queue='dab_broadcast')
43+
@task(queue=settings.DAB_TASK_ADMIN_QUEUE)
4344
def manage_lost_tasks(grace_period: int = 10):
4445
cutoff_time = now() - timedelta(minutes=grace_period)
4546
for task in Task.objects.filter(state=TASK_STATES.RUNNING, started_at__lt=cutoff_time).iterator():
@@ -48,9 +49,7 @@ def manage_lost_tasks(grace_period: int = 10):
4849
psycopg_params.pop('cursor_factory')
4950
psycopg_params.pop('context') # TODO: remove in inner method, makes non-async, not good
5051

51-
ctl = Control('dab_broadcast', config=psycopg_params)
52-
53-
# TODO: row-level lock for the task
52+
ctl = Control(settings.DAB_TASK_ADMIN_QUEUE, config=psycopg_params)
5453

5554
running_tasks = ctl.control_with_reply('running', data={'uuid': str(task.wrapper_uuid)})
5655

@@ -65,8 +64,13 @@ def manage_lost_tasks(grace_period: int = 10):
6564

6665
if not found:
6766
# TODO: feature of retry policy
68-
logger.warning(f'Could not find task {task.name} {task.wrapper_uuid}, deleting entry')
69-
task.delete()
67+
try:
68+
with transaction.atomic():
69+
task = Task.objects.get(uuid=task.uuid).select_for_update()
70+
logger.warning(f'Could not find task {task.name} {task.wrapper_uuid}, deleting entry')
71+
task.delete()
72+
except Task.DoesNotExist:
73+
logger.debug(f'task {task.name} uuid={task.uuid} already deleted, doing nothing')
7074
else:
7175
delta = now() - task.started_at
7276
logger.info(f'Noticed {task.name} {task.wrapper_uuid} running for {delta} seconds, seems to be fine')

test_app/tests/lib/utils/test_db.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,49 +45,44 @@ class TestPGNotifyConnection:
4545
'port': 55434,
4646
# 'context': <psycopg.adapt.AdaptersMap object at 0x7f537f2d9f70>,
4747
'prepare_threshold': None,
48-
'autocommit': True,
4948
}
5049

5150
@pytest.fixture
5251
def mock_settings(self):
5352
with override_settings(DATABASES=self.TEST_DATABASE_DICT, USE_TZ=False):
5453
yield
5554

56-
def _trim_python_objects(self, psycopg_params):
57-
# These remove those commented-out kwargs in PSYCOPG_KWARGS
58-
psycopg_params.pop('cursor_factory')
59-
psycopg_params.pop('context')
60-
return psycopg_params
61-
6255
def test_default_behavior(self, mock_settings):
63-
params = self._trim_python_objects(get_pg_notify_params())
56+
params = get_pg_notify_params()
6457
assert params == self.PSYCOPG_KWARGS
6558

6659
def test_pg_notify_extra_options(self, mock_settings):
67-
params = self._trim_python_objects(get_pg_notify_params(application_name='joe_connection'))
60+
params = get_pg_notify_params(application_name='joe_connection')
6861
expected = self.PSYCOPG_KWARGS.copy()
6962
expected['application_name'] = 'joe_connection'
7063
assert params == expected
7164

7265
def test_lister_databases(self, mock_settings):
7366
LISTENER_DATABASES = {"default": {"HOST": "https://foo.anotherhost.invalid"}}
7467
with override_settings(LISTENER_DATABASES=LISTENER_DATABASES):
75-
params = self._trim_python_objects(get_pg_notify_params())
68+
params = get_pg_notify_params()
7669
assert params['host'] == "https://foo.anotherhost.invalid"
7770

7871
def test_pg_notify_databases(self, mock_settings):
7972
PG_NOTIFY_DATABASES = {"default": {"HOST": "https://foo.anotherhost2.invalid"}}
8073
with override_settings(PG_NOTIFY_DATABASES=PG_NOTIFY_DATABASES):
81-
params = self._trim_python_objects(get_pg_notify_params())
74+
params = get_pg_notify_params()
8275
assert params['host'] == "https://foo.anotherhost2.invalid"
8376

8477
def test_psycopg_kwargs_from_settings_dict(self):
8578
"More of a unit test, doing the same thing"
8679
test_dict = self.TEST_DATABASE_DICT["default"].copy()
8780
test_dict['OPTIONS'] = {'autocommit': True}
8881
with override_settings(USE_TZ=False):
89-
psycopg_params = self._trim_python_objects(psycopg_kwargs_from_settings_dict(test_dict))
90-
assert psycopg_params == self.PSYCOPG_KWARGS
82+
psycopg_params = psycopg_kwargs_from_settings_dict(test_dict)
83+
expected_kwargs = self.PSYCOPG_KWARGS.copy()
84+
expected_kwargs['autocommit'] = True
85+
assert psycopg_params == expected_kwargs
9186

9287
def test_psycopg_kwargs_use(self):
9388
"This assures that the data we get for the kwargs are usable, and demos how to use"
@@ -97,7 +92,7 @@ def test_psycopg_kwargs_use(self):
9792
test_dict = settings.DATABASES['default'].copy()
9893
test_dict['OPTIONS'] = {'autocommit': True}
9994
with override_settings(USE_TZ=False):
100-
psycopg_params = self._trim_python_objects(psycopg_kwargs_from_settings_dict(test_dict))
95+
psycopg_params = psycopg_kwargs_from_settings_dict(test_dict)
10196

10297
psycopg.connect(**psycopg_params)
10398

test_app/tests/task/test_basic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import pytest
55

6-
from ansible_base.task import get_config
6+
from ansible_base.task.config import get_config
77

88
from test_app.tasks import create_uuid_entry
99
from test_app.models import UUIDModel

test_app/tests/task/test_multi_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import pytest
55

6-
from ansible_base.task import get_config
6+
from ansible_base.task.config import get_config
77

88
from test_app.tasks import create_uuid_entry
99
from test_app.models import UUIDModel

0 commit comments

Comments
 (0)