Skip to content

Commit de7ad1b

Browse files
committed
Initialize task app
1 parent 26c318a commit de7ad1b

File tree

13 files changed

+257
-1
lines changed

13 files changed

+257
-1
lines changed

ansible_base/task/__init__.py

Whitespace-only changes.

ansible_base/task/admin.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from django.contrib import admin
2+
3+
# Register your models here.

ansible_base/task/apps.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from django.apps import AppConfig
2+
3+
4+
class TaskConfig(AppConfig):
    """Django app configuration for the DAB tasking system."""

    default_auto_field = 'django.db.models.BigAutoField'
    name = 'ansible_base.task'
    # The label is the short app name used by Django; the usage docs refer
    # to this app as "dab_task".
    label = 'dab_task'
    verbose_name = 'DAB tasking system'
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from dispatcher.main import DispatcherMain
2+
3+
from django.core.management.base import BaseCommand
4+
5+
6+
class Command(BaseCommand):
    """Management command that configures the dispatcher for the DAB task service."""

    help = "Runs the dispatcher service that executes DAB background tasks"

    def handle(self, *args, **options):
        """
        Build the dispatcher configuration from Django settings and hand it to DispatcherMain.

        Reads ``settings.PG_NOTIFY_DSN_SERVER`` for the pg_notify connection info.
        """
        # Local import: ``settings`` is not brought into scope by the
        # module-level imports of this file.
        from django.conf import settings

        dispatcher_config = {
            "producers": {
                "brokers": {
                    "pg_notify": {"conninfo": settings.PG_NOTIFY_DSN_SERVER},
                    # TODO: sanitize or escape channel names on dispatcher side
                    "channels": [
                        "dab_broadcast"
                    ],
                },
                # NOTE: I would prefer to move the activation monitoring
                # from worker to activation, but that is more work
                "scheduled": {},
            },
            "pool": {"max_workers": 4},
        }

        # NOTE(review): the dispatcher is only constructed here; nothing in this
        # command starts its main loop. Confirm against the dispatcher API how
        # the service is meant to be run -- TODO.
        DispatcherMain(dispatcher_config)

ansible_base/task/migrations/__init__.py

Whitespace-only changes.

ansible_base/task/models.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from uuid import uuid4
2+
from types import SimpleNamespace
3+
4+
from django.db import models
5+
from django.utils.translation import gettext_lazy as _
6+
from django.utils.timezone import now
7+
8+
# NOTE: tasks are not registered in the database, but in a dispatcher registry
9+
10+
# Lifecycle states of a task call, exposed as attributes (TASK_STATES.WAITING, ...).
# Not enabled (yet): SKIPPED="skipped", CANCELED="canceled", CANCELING="canceling"
TASK_STATES = SimpleNamespace(
    **{
        "WAITING": "waiting",
        "RUNNING": "running",
        "COMPLETED": "completed",
        "FAILED": "failed",
    }
)
19+
20+
21+
class Task(models.Model):
    """
    Corresponds to a call of a task, as a higher-level abstraction around the dispatcher.
    Loosely modeled after pulpcore.Task

    State changes go through mark_ack, mark_completed and mark_failed; each
    one sets the new state plus its timestamp and saves just those columns.
    """

    uuid = models.UUIDField(
        primary_key=True, default=uuid4, editable=False,
        help_text=_('UUID that corresponds to the dispatcher task uuid')
    )
    # NOTE(review): no max_length is given for this CharField -- confirm the
    # supported Django version / database backend allows that.
    state = models.CharField(
        choices=[(s, s.title()) for s in sorted(vars(TASK_STATES).values())],
        default=TASK_STATES.WAITING,
        help_text=_('Choices of this field track with acknowledgement and completion of a task')
    )
    name = models.TextField(
        help_text=_('Importable path for class or method')
    )

    created = models.DateTimeField(
        auto_now_add=True,
        help_text=_('Time the publisher (submitter) of this task call created it, approximately the time of submission as well')
    )
    # pulp has unblocking logic, like unblocked_at, we have no plans for that here
    started_at = models.DateTimeField(
        null=True,
        help_text=_('Time of acknowledgement, also approximately the time the task starts')
    )
    finished_at = models.DateTimeField(
        null=True,
        help_text=_('Time task is cleared (whether failed or succeeded), may be unused if set to auto-delete')
    )

    def mark_ack(self):
        """Record that the task was picked up: state RUNNING, started_at set to now, saved."""
        self.state = TASK_STATES.RUNNING
        self.started_at = now()
        self.save(update_fields=['state', 'started_at'])

    def _mark_finished(self, state):
        """Set a terminal state and finished_at, then save those two columns."""
        self.state = state
        self.finished_at = now()
        # Without this save the transition would only exist on the in-memory instance.
        self.save(update_fields=['state', 'finished_at'])

    def mark_completed(self):
        """Record that the task finished successfully: state COMPLETED, finished_at set to now, saved."""
        self._mark_finished(TASK_STATES.COMPLETED)

    def mark_failed(self):
        """Record that the task failed: state FAILED, finished_at set to now, saved."""
        self._mark_finished(TASK_STATES.FAILED)

ansible_base/task/publish.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from django.db import transaction
2+
3+
from dispatcher.utils import serialize_task
4+
5+
from ansible_base.task.tasks import run_task_from_queue
6+
from ansible_base.task.models import Task
7+
8+
# decorator structure is taken from dispatcher.publish
9+
10+
class TaskPublisher:
    """
    Publishes calls of a wrapped callable as ``Task`` rows instead of running it inline.

    Exposes ``apply_async`` and ``delay`` for submitting a call.
    """

    def __init__(self, fn):
        """Remember the callable ``fn`` that submitted tasks refer to."""
        self.fn = fn

    @property
    def task_name(self):
        """Name stored on the Task row, produced by ``serialize_task`` for the wrapped callable."""
        return serialize_task(self.fn)

    def apply_async(self, args=None, kwargs=None):
        """
        Queue one call of the wrapped callable.

        Creates a Task row named after the callable and, when the current
        transaction commits, triggers ``run_task_from_queue``.

        ``args`` and ``kwargs`` exist for interface compatibility only: the
        Task row stores just the name, so arguments cannot be carried to the
        worker. Non-empty values raise NotImplementedError instead of being
        silently dropped.

        Returns the created Task.
        """
        # this function may allow additional arguments in the future, but not now
        if args or kwargs:
            raise NotImplementedError(
                f'Task {self.task_name} was submitted with arguments, but arguments are not supported yet'
            )

        task = Task.objects.create(name=self.task_name)

        # Deferred until commit so the worker can see the new row when it runs.
        # NOTE(review): this needs run_task_from_queue to expose ``delay`` --
        # confirm it is registered with the dispatcher.
        transaction.on_commit(run_task_from_queue.delay)
        return task

    def delay(self, *args, **kwargs):
        """Shorthand for ``apply_async`` taking the call arguments directly."""
        return self.apply_async(args=args, kwargs=kwargs)
26+
27+
28+
class TaskDecorator:
    """
    Decorator that attaches the ``apply_async`` and ``delay`` publishing methods to a function.

    Arguments given at decoration time are kept on the instance as ``args``
    and ``kwargs``; ``__call__`` does not read them.
    """

    def __init__(self, *args, **kwargs):
        self.args, self.kwargs = args, kwargs

    def __call__(self, fn):
        """Bind a TaskPublisher to ``fn``, expose its methods on ``fn``, and return ``fn`` itself."""
        bound = TaskPublisher(fn)
        for method_name in ('apply_async', 'delay'):
            setattr(fn, method_name, getattr(bound, method_name))
        return fn

ansible_base/task/tasks.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import logging
2+
3+
from ansible_base.task.models import Task, TASK_STATES
4+
5+
from dispatcher.utils import resolve_callable
6+
7+
from django.db import transaction
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
def run_task_from_queue():
    """
    Claim one waiting Task, run the callable it names, then delete the Task row.

    Returns without doing anything when no Task is waiting. A failure of the
    callable is logged with its traceback and does not propagate; the Task
    row is deleted either way.
    """
    with transaction.atomic():
        # Oldest first, so the table is consumed in submission order rather
        # than in primary-key (random UUID) order. The row is locked while it
        # is moved to the running state.
        task = (
            Task.objects.filter(state=TASK_STATES.WAITING)
            .order_by('created')
            .select_for_update()
            .first()
        )
        if task is None:
            return
        task.mark_ack()

    # for responsiveness with bursts of tasks
    if Task.objects.filter(state=TASK_STATES.WAITING).exists():
        # NOTE(review): needs this function to expose ``delay`` -- confirm it
        # is registered with the dispatcher.
        run_task_from_queue.delay()

    try:
        task_callable = resolve_callable(task.name)
        task_callable()
    except Exception:
        # logger.exception logs at ERROR level and appends the active traceback
        logger.exception(f'Failed to run and complete {task.name}')

    task.delete()

docs/apps/task/design.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
## DAB Task System Design
2+
3+
The closest analog to the goals of this project may be the pulpcore task system
4+
5+
https://github.com/pulp/pulpcore/tree/main/pulpcore/tasking
6+
7+
The main loop is in the `worker` module under that. Like AWX, they use `select.select`,
8+
combined with `add_notify_handler`.
9+
10+
This accomplishes the goal of getting timely "wakeups" when a task is submitted.
11+
12+
https://www.psycopg.org/psycopg3/docs/advanced/async.html
13+
14+
> Alternatively, you can use add_notify_handler() to register a callback function, which will be invoked whenever a notification is received, during the normal query processing; you will be then able to use the connection normally. Please note that in this case notifications will not be received immediately, but only during a connection operation, such as a query.
15+
16+
After control is returned, `unblock_tasks` loops over `Task` objects in order of created time.
17+
Based on the `Task` `status` field, it will take an action.
18+
19+
Since uuid primary keys are standard in pulp, the default `pulp_id` field is a valid uuid for the task.
20+
21+
While doing this, postgres advisory locks are used significantly to avoid errors from multiple simultaneous processes doing something.

docs/apps/task/usage.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
## DAB Task
2+
3+
This app "dab_task" provides a tasking system (it can run tasks in the background).
4+
This is an overlay on top of the dispatcher library:
5+
6+
https://github.com/ansible/dispatcher
7+
8+
The key differences of using dab_task as opposed to the dispatcher directly are:
9+
10+
- Tasks are submitted to a true queue backed by postgres
11+
- Formal message ACK de-duplicates submissions based on its UUID
12+
- Rollbacks for database changes are possible, allowing for a number of retries
13+
14+
The dispatcher library, itself, is intended to be lower-level,
15+
to accommodate cases where an app implements these things on its own.
16+
17+
Obtaining these things comes at a cost, mainly that almost all interactions
18+
require an additional database interaction.
19+
20+
### Configuration
21+
22+
Decorate tasks to allow them to be run as background tasks.
23+
24+
```python
25+
from ansible_base.task.publish import TaskDecorator as task
26+
27+
@task()
28+
def hello_world():
29+
print('hello world')
30+
```
31+
32+
### Service
33+
34+
To run the service, use the management command
35+
36+
```
37+
python manage.py run_dispatcher
38+
```
39+
40+
### Publisher
41+
42+
Submit a task by importing it and using the interface from celery.
43+
44+
```python
45+
from test_app.tasks import hello_world
46+
47+
hello_world.delay()
48+
```
49+
50+
This will print "hello world" in the dispatcher service.

0 commit comments

Comments
 (0)