Skip to content

Commit 91cc137

Browse files
committed
Basics of multi-queue working
1 parent 112540e commit 91cc137

File tree

9 files changed

+78
-28
lines changed

9 files changed

+78
-28
lines changed

ansible_base/lib/dynamic_config/settings_logic.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,4 +336,12 @@ def get_dab_settings(
336336
# if you want to use a local queue instead, that would be more performant
337337
dab_data['DAB_TASK_ADMIN_QUEUE'] = 'dab_broadcast'
338338

339+
# Queues that the local node will listen to
340+
# useful to manage different pools of work and types of nodes
341+
# this should probably include the admin queue
342+
dab_data['DAB_TASK_LISTEN_QUEUES'] = ['dab_broadcast']
343+
344+
# Dispatcher spawns python subprocesses, so you can control the number of them here
345+
dab_data['DAB_TASK_MAX_WORKERS'] = 4
346+
339347
return dab_data

ansible_base/lib/utils/db.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ def advisory_lock(*args, lock_session_timeout_milliseconds=0, **kwargs):
160160
dj_db_dict = dict[str, Union[str, int]]
161161

162162

163+
def psycopg_connection_from_django(**kwargs) -> psycopg.Connection:
164+
"Compatibility with dispatcher connection factory, just returns the Django connection"
165+
return connection.connection
166+
167+
163168
def psycopg_kwargs_from_settings_dict(settings_dict: dj_db_dict) -> dict:
164169
"""Return psycopg connection creation kwargs given Django db settings info
165170

ansible_base/task/config.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ def get_config() -> dict:
1111
"version": 2,
1212
"brokers": {
1313
"pg_notify": {
14-
"config": psycopg_params
15-
# TODO: hook in synchronous connection factory
14+
"config": psycopg_params, # used to create the service async connection
15+
"sync_connection_factory": "ansible_base.lib.utils.db.psycopg_connection_from_django",
16+
"channels": settings.DAB_TASK_LISTEN_QUEUES,
17+
# The default publish channel allows using .delay without more arguments
18+
# this may still have task-specific overrides
19+
"default_publish_channel": settings.DAB_TASK_ADMIN_QUEUE
1620
}
1721
},
1822
"producers": {
@@ -23,5 +27,5 @@ def get_config() -> dict:
2327
}
2428
},
2529
},
26-
"pool": {"max_workers": 4}, # TODO: to settings
30+
"pool": {"max_workers": settings.DAB_TASK_MAX_WORKERS},
2731
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Generated by Django 4.2.16 on 2025-02-25 21:32
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
('dab_tas', '0003_task_args_task_kwargs'),
10+
]
11+
12+
operations = [
13+
migrations.AddField(
14+
model_name='task',
15+
name='queue',
16+
field=models.TextField(default='', help_text='The work queue this task is submitted to, only nodes listening to this queue may process the task'),
17+
),
18+
migrations.AlterField(
19+
model_name='task',
20+
name='wrapper_uuid',
21+
field=models.CharField(blank=True, default='', editable=False, help_text='Adopted UUID of DAB task app wrapper, attached when background service starts task', max_length=40),
22+
),
23+
]

ansible_base/task/models.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,17 @@ class Task(models.Model):
2525
"""
2626

2727
uuid = models.UUIDField(primary_key=True, default=uuid4, editable=False, help_text=_('Original dispatcher UUID generated at time of task publishing'))
28-
wrapper_uuid = models.UUIDField(null=True, default=None, editable=False, help_text=_('Adopted UUID of DAB task app wrapper, attached when background service starts task'))
28+
# TODO: change back to UUIDField, workaround dispatcher using non-UUID uuids for now
29+
# null=True
30+
wrapper_uuid = models.CharField(blank=True, default='', max_length=40, editable=False, help_text=_('Adopted UUID of DAB task app wrapper, attached when background service starts task'))
2931
state = models.CharField(
3032
choices=[(s, s.title()) for s in sorted(vars(TASK_STATES).values())],
3133
default=TASK_STATES.WAITING,
3234
max_length=15,
3335
help_text=_('Choices of this field track with acknowledgement and completion of a task'),
3436
)
3537
name = models.TextField(help_text=_('Importable path for class or method'))
38+
queue = models.TextField(default='', help_text=_('The work queue this task is submitted to, only nodes listening to this queue may process the task'))
3639

3740
args = models.JSONField(
3841
default=dict, null=False, blank=True, help_text=_("The arguments for the task run.")

ansible_base/task/publish.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from dispatcher.utils import serialize_task
22
from django.db import transaction
3+
from django.conf import settings
34

45
from ansible_base.task.models import Task
56
from ansible_base.task.tasks import run_task_from_queue
@@ -8,38 +9,44 @@
89

910

1011
class TaskPublisher:
11-
def __init__(self, fn):
12+
def __init__(self, fn, queue=None):
1213
self.fn = fn
14+
self.queue = queue
1315

1416
@property
1517
def task_name(self):
1618
return serialize_task(self.fn)
1719

20+
def submit_wrapper_task(self):
21+
run_task_from_queue.apply_async(queue=self.queue)
22+
1823
def apply_async(self, args=None, kwargs=None):
1924
# this function may allow additional arguments in the future, but not now
20-
Task.objects.create(name=self.task_name, args=args, kwargs=kwargs)
25+
if self.queue is None:
26+
queue = settings.DAB_TASK_ADMIN_QUEUE
27+
else:
28+
queue = self.queue
29+
Task.objects.create(name=self.task_name, args=args, kwargs=kwargs, queue=queue)
2130

2231
# pg_notify message (probably) to wake up
23-
transaction.on_commit(run_task_from_queue.delay)
32+
transaction.on_commit(self.submit_wrapper_task)
2433

2534
def delay(self, *args, **kwargs):
2635
return self.apply_async(args=args, kwargs=kwargs)
2736

2837

2938
class TaskDecorator:
30-
def __init__(self, *args, **kwargs):
31-
# TODO: this will process a task timeout, just not yet set up
32-
self.args = args
33-
self.kwargs = kwargs
39+
def __init__(self, queue=None):
40+
self.queue = queue
3441

3542
def __call__(self, fn):
36-
publisher = TaskPublisher(fn)
43+
publisher = TaskPublisher(fn, queue=self.queue)
3744

3845
setattr(fn, 'apply_async', publisher.apply_async)
3946
setattr(fn, 'delay', publisher.delay)
4047

4148
return fn
4249

4350

44-
def durable_task():
45-
return TaskDecorator()
51+
def durable_task(queue=None):
52+
return TaskDecorator(queue=queue)

ansible_base/task/tasks.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@
22

33
from dispatcher.utils import resolve_callable
44
from dispatcher.publish import task
5-
from dispatcher.control import Control
5+
from dispatcher.factories import get_control_from_settings
66

77
from django.db import transaction
88
from django.conf import settings
99
from django.utils.timezone import now, timedelta
1010

11-
from ansible_base.lib.utils.db import get_pg_notify_params
12-
1311
from ansible_base.task.models import TASK_STATES, Task
1412

1513
logger = logging.getLogger(__name__)
@@ -18,11 +16,11 @@
1816
@task(queue=settings.DAB_TASK_ADMIN_QUEUE, bind=True)
1917
def run_task_from_queue(dispatcher):
2018
with transaction.atomic():
21-
task = Task.objects.filter(state=TASK_STATES.WAITING).select_for_update().first()
19+
task = Task.objects.filter(state=TASK_STATES.WAITING, queue__in=settings.DAB_TASK_LISTEN_QUEUES).select_for_update().first()
2220
if task:
2321
task.state = TASK_STATES.RUNNING
2422
task.started_at = now()
25-
task.wrapper_uuid = dispatcher.uuid
23+
task.wrapper_uuid = str(dispatcher.uuid)
2624
task.save(update_fields=['state', 'started_at', 'wrapper_uuid'])
2725
else:
2826
return
@@ -35,21 +33,16 @@ def run_task_from_queue(dispatcher):
3533
task_callable = resolve_callable(task.name)
3634
task_callable(*task.args, **task.kwargs)
3735
except Exception:
38-
logger.traceback(f'Failed to run and complete {task.name}')
36+
logger.exception(f'Failed to run and complete {task.name}')
3937

4038
task.delete()
4139

4240

4341
@task(queue=settings.DAB_TASK_ADMIN_QUEUE)
4442
def manage_lost_tasks(grace_period: int = 10):
4543
cutoff_time = now() - timedelta(minutes=grace_period)
46-
for task in Task.objects.filter(state=TASK_STATES.RUNNING, started_at__lt=cutoff_time).iterator():
47-
psycopg_params = get_pg_notify_params()
48-
psycopg_params.pop('autocommit') # dispatcher automatically adds this, causes error, TODO: need pre-check
49-
psycopg_params.pop('cursor_factory')
50-
psycopg_params.pop('context') # TODO: remove in inner method, makes non-async, not good
51-
52-
ctl = Control(settings.DAB_TASK_ADMIN_QUEUE, config=psycopg_params)
44+
for task in Task.objects.filter(state=TASK_STATES.RUNNING, started_at__lt=cutoff_time, queue__in=settings.DAB_TASK_LISTEN_QUEUES).iterator():
45+
ctl = get_control_from_settings(default_publish_channel=settings.DAB_TASK_ADMIN_QUEUE)
5346

5447
running_tasks = ctl.control_with_reply('running', data={'uuid': str(task.wrapper_uuid)})
5548

@@ -66,7 +59,7 @@ def manage_lost_tasks(grace_period: int = 10):
6659
# TODO: feature of retry policy
6760
try:
6861
with transaction.atomic():
69-
task = Task.objects.get(uuid=task.uuid).select_for_update()
62+
task = Task.objects.select_for_update().get(uuid=task.uuid)
7063
logger.warning(f'Could not find task {task.name} {task.wrapper_uuid}, deleting entry')
7164
task.delete()
7265
except Task.DoesNotExist:

test_app/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@
196196
ALLOW_SHARED_RESOURCE_CUSTOM_ROLES = True # Allow making custom roles with org change permission, for example
197197
ALLOW_LOCAL_ASSIGNING_JWT_ROLES = False
198198

199+
DAB_TASK_LISTEN_QUEUES = ['dab_broadcast', 'dab_task_tasks']
200+
199201
ANSIBLE_BASE_USER_VIEWSET = 'test_app.views.UserViewSet'
200202

201203
LOGIN_URL = "/login/login"

test_app/tasks.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ def hello_world():
1111
print('hello world')
1212

1313

14+
@durable_task(queue='dab_task_tasks')
15+
def hello_world_other_queue():
16+
print('hello world, sent from test_app_tasks queue')
17+
18+
1419
@durable_task()
1520
def create_uuid_entry(uuid: UUID):
1621
UUIDModel.objects.create(id=uuid)

0 commit comments

Comments
 (0)