Skip to content

Commit d3de6bd

Browse files
committed
Initialize task app
Move psycopg param generator to DAB utility Store standard type annotation Make a solution to getting conninfo as a string Add another test and wrap up linters Init migration, connect things Add task container Demo via a test_app view Forgot procfile Forgot to include tasks Change name to durable_task Get subprocess test to run Get multi-server working Adopt new bind=True kwarg Pass through args and kwargs Write task fallback logic Move config into its own module, pop inside utils method Basics of multi-queue working
1 parent f93c8aa commit d3de6bd

32 files changed

+818
-23
lines changed

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ RUN dnf -y install \
1313
libtool-ltdl-devel \
1414
libpq-devel \
1515
libpq \
16-
postgresql
16+
postgresql \
17+
git
1718

1819
# Create /etc/ansible-automation-platform/testapp folder
1920
RUN mkdir -p /etc/ansible-automation-platform/testapp

Procfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
web: python manage.py runserver
2+
task: python manage.py run_dispatcher

ansible_base/lib/dynamic_config/settings_logic.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -329,20 +329,17 @@ def get_mergeable_dab_settings(settings: dict) -> dict: # NOSONAR
329329
}
330330
)
331331

332-
# Set back only changed base keys so the inspect history is accurate
333-
if installed_apps != settings["INSTALLED_APPS"]:
334-
dab_data["INSTALLED_APPS"] = installed_apps
335-
if middleware != settings["MIDDLEWARE"]:
336-
dab_data["MIDDLEWARE"] = middleware
337-
if rest_framework != settings["REST_FRAMEWORK"]:
338-
dab_data["REST_FRAMEWORK"] = rest_framework
339-
if oauth2_provider != settings.get("OAUTH2_PROVIDER", {}):
340-
dab_data["OAUTH2_PROVIDER"] = oauth2_provider
341-
if templates != settings.get("TEMPLATES", []):
342-
dab_data["TEMPLATES"] = templates
343-
if authentication_backends != settings.get("AUTHENTICATION_BACKENDS", []):
344-
dab_data["AUTHENTICATION_BACKENDS"] = authentication_backends
345-
if spectacular_settings != settings.get('SPECTACULAR_SETTINGS', {}):
346-
dab_data['SPECTACULAR_SETTINGS'] = spectacular_settings
332+
if 'ansible_base.task' in installed_apps:
333+
# Queue used for dab_task tasks to re-schedule themselves if there is more work to do
334+
# if you want to use a local queue instead, that would be more performant
335+
dab_data['DAB_TASK_ADMIN_QUEUE'] = 'dab_broadcast'
336+
337+
# Queues that the local node will listen to
338+
# useful to manage different pools of work and types of nodes
339+
# this should probably include the admin queue
340+
dab_data['DAB_TASK_LISTEN_QUEUES'] = ['dab_broadcast']
341+
342+
# Dispatcher spawns python subprocesses, so you can control the number of them here
343+
dab_data['DAB_TASK_MAX_WORKERS'] = 4
347344

348345
return dab_data

ansible_base/lib/utils/db.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
from contextlib import contextmanager
2+
from copy import deepcopy
3+
from typing import Union
24
from zlib import crc32
35

6+
import psycopg
7+
from django.conf import settings
48
from django.db import DEFAULT_DB_ALIAS, connection, connections, transaction
9+
from django.db.backends.postgresql.base import DatabaseWrapper as PsycopgDatabaseWrapper
510
from django.db.migrations.executor import MigrationExecutor
611

712

@@ -149,3 +154,62 @@ def advisory_lock(*args, lock_session_timeout_milliseconds=0, **kwargs):
149154
yield True
150155
else:
151156
raise RuntimeError(f'Advisory lock not implemented for database type {connection.vendor}')
157+
158+
159+
# Django settings.DATABASES['alias'] dictionary type
160+
dj_db_dict = dict[str, Union[str, int]]
161+
162+
163+
def psycopg_connection_from_django(**kwargs) -> psycopg.Connection:
164+
"Compatibility with dispatcher connection factory, just returns the Django connection"
165+
return connection.connection
166+
167+
168+
def psycopg_kwargs_from_settings_dict(settings_dict: dj_db_dict) -> dict:
169+
"""Return psycopg connection creation kwargs given Django db settings info
170+
171+
:param dict setting_dict: DATABASES in Django settings
172+
:return: kwargs that can be passed to psycopg.connect, or connection classes"""
173+
psycopg_params = PsycopgDatabaseWrapper(settings_dict).get_connection_params().copy()
174+
psycopg_params.pop('cursor_factory', None)
175+
psycopg_params.pop('context', None)
176+
return psycopg_params
177+
178+
179+
def psycopg_conn_string_from_settings_dict(settings_dict: dj_db_dict) -> str:
180+
conn_params = psycopg_kwargs_from_settings_dict(settings_dict)
181+
return psycopg.conninfo.make_conninfo(**conn_params)
182+
183+
184+
def combine_settings_dict(settings_dict1: dj_db_dict, settings_dict2: dj_db_dict, **extra_options) -> dj_db_dict:
185+
"""Given two Django settings dictionaries, combine them and return a new settings_dict"""
186+
settings_dict = deepcopy(settings_dict1)
187+
settings_dict['OPTIONS'] = deepcopy(settings_dict.get('OPTIONS', {}))
188+
189+
# These extra options are used by AWX to set application_name
190+
settings_dict['OPTIONS'].update(extra_options)
191+
192+
# Apply overrides specifically for the listener connection
193+
for k, v in settings_dict2.items():
194+
if k != 'OPTIONS':
195+
settings_dict[k] = v
196+
197+
for k, v in settings_dict2.get('OPTIONS', {}).items():
198+
settings_dict['OPTIONS'][k] = v
199+
200+
return settings_dict
201+
202+
203+
def get_pg_notify_params(alias: str = DEFAULT_DB_ALIAS, **extra_options) -> dict:
204+
pg_notify_overrides = {}
205+
if hasattr(settings, 'PG_NOTIFY_DATABASES'):
206+
pg_notify_overrides = settings.PG_NOTIFY_DATABASES.get(alias, {})
207+
elif hasattr(settings, 'LISTENER_DATABASES'):
208+
pg_notify_overrides = settings.LISTENER_DATABASES.get(alias, {})
209+
210+
settings_dict = combine_settings_dict(settings.DATABASES[alias], pg_notify_overrides, **extra_options)
211+
212+
# Reuse the Django postgres DB backend to create params for the psycopg library
213+
psycopg_params = psycopg_kwargs_from_settings_dict(settings_dict)
214+
215+
return psycopg_params

ansible_base/task/__init__.py

Whitespace-only changes.

ansible_base/task/admin.py

Whitespace-only changes.

ansible_base/task/apps.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from dispatcher.config import setup
2+
from django.apps import AppConfig
3+
4+
from ansible_base.task.config import get_config
5+
6+
7+
class TaskConfig(AppConfig):
8+
default_auto_field = 'django.db.models.BigAutoField'
9+
name = 'ansible_base.task'
10+
label = 'dab_tas'
11+
verbose_name = 'DAB tasking system'
12+
13+
def ready(self):
14+
# Set up dispatcher
15+
setup(get_config())

ansible_base/task/config.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from django.conf import settings
2+
3+
from ansible_base.lib.utils.db import get_pg_notify_params
4+
5+
6+
def get_config() -> dict:
7+
"Returns dispatcher config from what is in Django settings"
8+
psycopg_params = get_pg_notify_params()
9+
10+
return {
11+
"version": 2,
12+
"brokers": {
13+
"pg_notify": {
14+
"config": psycopg_params, # used to create the service async connection
15+
"sync_connection_factory": "ansible_base.lib.utils.db.psycopg_connection_from_django",
16+
"channels": settings.DAB_TASK_LISTEN_QUEUES,
17+
# The default publish channel allows using .delay without more arguments
18+
# this may still have task-specific overrides
19+
"default_publish_channel": settings.DAB_TASK_ADMIN_QUEUE,
20+
}
21+
},
22+
"producers": {
23+
"ScheduledProducer": {
24+
"task_schedule": {
25+
"ansible_base.task.tasks.run_task_from_queue": {"schedule": 60},
26+
"ansible_base.task.tasks.manage_lost_tasks": {"schedule": 60 * 10},
27+
}
28+
},
29+
},
30+
"pool": {"max_workers": settings.DAB_TASK_MAX_WORKERS},
31+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import logging
2+
3+
from dispatcher import run_service
4+
from django.core.cache import cache
5+
from django.core.management.base import BaseCommand
6+
from django.db import connection
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class Command(BaseCommand):
12+
help = "Runs bug checking sanity checks, gets scale metrics, and recommendations for Role Based Access Control"
13+
14+
def handle(self, *args, **options):
15+
# Borrowed from eda-server, ensure the database connection is closed
16+
connection.close()
17+
cache.close()
18+
19+
# Configuration is already handled in app .ready method
20+
run_service()
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Generated by Django 4.2.16 on 2025-02-04 21:22
2+
3+
from django.db import migrations, models
4+
import uuid
5+
6+
7+
class Migration(migrations.Migration):
8+
9+
initial = True
10+
11+
dependencies = [
12+
]
13+
14+
operations = [
15+
migrations.CreateModel(
16+
name='Task',
17+
fields=[
18+
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, help_text='UUID that corresponds to the dispatcher task uuid', primary_key=True, serialize=False)),
19+
('state', models.CharField(choices=[('completed', 'Completed'), ('failed', 'Failed'), ('running', 'Running'), ('waiting', 'Waiting')], default='waiting', help_text='Choices of this field track with acknowledgement and completion of a task', max_length=15)),
20+
('name', models.TextField(help_text='Importable path for class or method')),
21+
('created', models.DateTimeField(auto_now_add=True, help_text='Time the publisher (submitter) of this task call created it, approximately the time of submission as well')),
22+
('started_at', models.DateTimeField(help_text='Time of acknowledgement, also approximately the time the task starts', null=True)),
23+
('finished_at', models.DateTimeField(help_text='Time task is cleared (whether failed or succeeded), may be unused if set to auto-delete', null=True)),
24+
],
25+
),
26+
]

0 commit comments

Comments
 (0)