Skip to content

Commit 0f74649

Browse files
committed
Update model structure
1 parent 028ce33 commit 0f74649

File tree

10 files changed

+157
-170
lines changed

10 files changed

+157
-170
lines changed

galahad/admin.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def rerun(modeladmin, request, queryset):
1414
if succeeded:
1515
messages.warning(request, "Only failed tasks can be retried. %s tasks have been skipped" % succeeded)
1616
counter = 0
17-
for obj in queryset.filter(~queryset.is_completed).iterator():
17+
for obj in queryset.not_succeeded().iterator():
1818
obj.enqueue()
1919
counter += 1
2020
messages.success(request, "%s tasks have been successfully queued" % counter)
@@ -37,32 +37,48 @@ def child_tasks(self, obj):
3737
actions = ('rerun',)
3838

3939
list_display = (
40-
'content_type',
4140
'node_name',
41+
'status',
42+
'node_type',
43+
'content_type',
4244
'completed',
43-
'failed',
45+
'modified',
46+
'created',
4447
)
4548

4649
readonly_fields = (
4750
'process',
4851
'node_name',
52+
'node_type',
4953
'parent_task_set',
5054
'child_tasks',
5155
'completed',
52-
'failed',
56+
'created',
57+
'modified',
5358
'exception',
5459
'pretty_stacktrace',
5560
)
5661

5762
list_filter = (
58-
'completed',
59-
'failed',
63+
'status',
64+
'node_type',
6065
'content_type',
66+
'completed',
67+
'created',
6168
)
6269

6370
fieldsets = (
6471
(None, {
65-
'fields': ('process', 'node_name', 'parent_task_set', 'child_tasks', 'completed', 'failed', 'exception')
72+
'fields': (
73+
'process',
74+
'node_name',
75+
'parent_task_set',
76+
'child_tasks',
77+
'completed',
78+
'modified',
79+
'created',
80+
'exception'
81+
),
6682
}),
6783
(t('Traceback'), {
6884
'classes': ('collapse',),

galahad/migrations/0001_initial.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

galahad/migrations/__init__.py

Whitespace-only changes.

galahad/models.py

Lines changed: 109 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import logging
2-
import random
32
import sys
43
import traceback
54

@@ -70,8 +69,8 @@ class Process(models.Model, metaclass=BaseProcess):
7069
:attr:`.edges`.
7170
"""
7271
id = models.BigAutoField(primary_key=True, editable=False)
73-
started = models.DateTimeField(auto_now_add=True, db_index=True)
74-
completed = models.DateTimeField(blank=True, null=True, editable=False, db_index=True)
72+
created = models.DateTimeField(auto_now_add=True, db_index=True)
73+
modified = models.DateTimeField(auto_now=True, db_index=True)
7574

7675
task_set = GenericRelation('galahad.Task', object_id_field='_process_id')
7776

@@ -267,60 +266,112 @@ def get_instance_graph_svg(self, output_format='svg'):
267266
graph.format = output_format
268267
return SafeString(graph.pipe().decode('utf-8'))
269268

269+
def save(self, **kwargs):
270+
if self.pk:
271+
try:
272+
update_fields = kwargs['update_fields']
273+
except KeyError:
274+
pass
275+
else:
276+
update_fields.append('modified')
277+
super().save(**kwargs)
278+
270279

271280
def process_subclasses():
272281
from django.apps import apps
273282

274283
apps.check_models_ready()
275284
query = models.Q()
276-
for app_models in apps.all_models.values():
277-
for model in app_models.values():
278-
if issubclass(model, Process) and model is not Process:
279-
opts = model._meta
280-
query |= models.Q(app_label=opts.app_label, model=opts.model_name)
285+
for model in apps.get_models():
286+
if issubclass(model, Process) and model is not Process:
287+
opts = model._meta
288+
query |= models.Q(app_label=opts.app_label, model=opts.model_name)
281289
return query
282290

283291

284292
class TasksQuerySet(models.query.QuerySet):
285-
is_completed = models.Q(completed__isnull=False)
286-
is_failed = models.Q(failed__isnull=False)
287293

288294
def scheduled(self):
289-
return self.filter(~(self.is_failed | self.is_completed))
295+
return self.filter(status=self.model.SCHEDULED)
290296

291297
def succeeded(self):
292-
return self.filter(self.is_completed)
298+
return self.filter(status=self.model.SUCCEEDED)
299+
300+
def not_succeeded(self):
301+
return self.exclude(status=self.model.SUCCEEDED)
293302

294303
def failed(self):
295-
return self.filter(self.is_failed)
304+
return self.filter(status=self.model.FAILED)
296305

297306

298307
class Task(models.Model):
299308
id = models.BigAutoField(primary_key=True, editable=False)
300-
_process = models.ForeignKey('galahad.Process', on_delete=models.CASCADE, db_column='process_id', editable=False)
309+
_process = models.ForeignKey(
310+
'galahad.Process',
311+
on_delete=models.CASCADE,
312+
db_column='process_id'
313+
, editable=False,
314+
)
301315
content_type = models.ForeignKey(
302-
'contenttypes.ContentType', on_delete=models.CASCADE,
303-
editable=False, limit_choices_to=process_subclasses,
316+
'contenttypes.ContentType',
317+
on_delete=models.CASCADE,
318+
editable=False,
319+
limit_choices_to=process_subclasses,
304320
related_name='galahad_task_set',
305321
)
306322
process = GenericForeignKey('content_type', '_process_id')
323+
307324
node_name = models.TextField(db_index=True, editable=False)
308-
parent_task_set = models.ManyToManyField('self', related_name='child_task_set', editable=False, symmetrical=False)
309325

310-
assignees = models.ManyToManyField(settings.AUTH_USER_MODEL, verbose_name=t('assignees'), related_name='galahad_task_set')
326+
HUMAN = 'human'
327+
MACHINE = 'machine'
328+
_node_type_choices = (
329+
(HUMAN, t(HUMAN)),
330+
(MACHINE, t(MACHINE)),
331+
)
332+
node_type = models.TextField(
333+
choices=_node_type_choices,
334+
editable=False,
335+
db_index=True,
336+
)
337+
338+
parent_task_set = models.ManyToManyField(
339+
'self',
340+
related_name='child_task_set',
341+
editable=False,
342+
symmetrical=False,
343+
)
344+
345+
FAILED = 'failed'
346+
SUCCEEDED = 'succeeded'
347+
SCHEDULED = 'scheduled'
348+
_status_choices = (
349+
(FAILED, t(FAILED)),
350+
(SUCCEEDED, t(SUCCEEDED)),
351+
(SCHEDULED, t(SCHEDULED)),
352+
)
353+
status = models.TextField(
354+
choices=_status_choices,
355+
default=SCHEDULED,
356+
editable=False,
357+
db_index=True,
358+
)
359+
360+
assignees = models.ManyToManyField(
361+
settings.AUTH_USER_MODEL,
362+
verbose_name=t('assignees'),
363+
related_name='galahad_task_set',
364+
)
311365

312366
created = models.DateTimeField(auto_now_add=True, db_index=True)
367+
modified = models.DateTimeField(auto_now=True, db_index=True)
313368
completed = models.DateTimeField(blank=True, null=True, editable=False, db_index=True)
314-
failed = models.DateTimeField(blank=True, null=True, editable=False, db_index=True)
315369

316370
exception = models.TextField(blank=True)
317371
stacktrace = models.TextField(blank=True)
318372

319373
objects = TasksQuerySet.as_manager()
320374

321-
def __str__(self):
322-
return '%s (%s)' % (self.node_name, self.pk)
323-
324375
class Meta:
325376
ordering = ('-completed', '-created')
326377
get_latest_by = ('created',)
@@ -330,6 +381,21 @@ class Meta:
330381
)
331382
default_manager_name = 'objects'
332383

384+
def __str__(self):
385+
return '%s (%s)' % (self.node_name, self.pk)
386+
387+
def save(self, **kwargs):
388+
if self.pk:
389+
try:
390+
update_fields = kwargs['update_fields']
391+
except KeyError as e:
392+
raise ValueError(
393+
"You need to provide explicit 'update_fields' to avoid race conditions."
394+
) from e
395+
else:
396+
update_fields.append('modified')
397+
super().save(**kwargs)
398+
333399
def get_absolute_url(self):
334400
if self.completed:
335401
return
@@ -345,17 +411,19 @@ def node(self):
345411

346412
def finish(self):
347413
self.completed = timezone.now()
414+
self.status = self.SUCCEEDED
348415
if self.pk:
349-
self.save(update_fields=['completed'])
416+
self.save(update_fields=['status', 'completed'])
350417
else:
351418
self.save()
352419

353420
def fail(self):
354-
self.failed = timezone.now()
421+
self.completed = timezone.now()
422+
self.status = self.FAILED
355423
tb = traceback.format_exception(*sys.exc_info())
356424
self.exception = tb[-1].strip()
357425
self.stacktrace = "".join(tb)
358-
self.save(update_fields=['failed', 'exception', 'stacktrace'])
426+
self.save(update_fields=['status', 'exception', 'stacktrace'])
359427

360428
def enqueue(self, countdown=None, eta=None):
361429
"""
@@ -372,12 +440,22 @@ def enqueue(self, countdown=None, eta=None):
372440
celery.result.AsyncResult: Celery task result.
373441
374442
"""
375-
return celery.task_wrapper.apply_async(
443+
self.status = self.SCHEDULED
444+
self.completed = None
445+
self.exception = ''
446+
self.stacktrace = ''
447+
self.save(update_fields=[
448+
'status',
449+
'completed',
450+
'exception',
451+
'stacktrace',
452+
])
453+
transaction.on_commit(lambda: celery.task_wrapper.apply_async(
376454
args=(self.pk, self._process_id),
377455
countdown=countdown,
378456
eta=eta,
379457
queue=settings.GALAHAD_CELERY_QUEUE_NAME,
380-
)
458+
))
381459

382460
@transaction.atomic()
383461
def start_next_tasks(self, next_nodes: list = None):
@@ -400,11 +478,12 @@ def start_next_tasks(self, next_nodes: list = None):
400478
# Some nodes – like Join – implement their own method to create new tasks.
401479
task = node.create_task(self.process)
402480
except AttributeError:
403-
task = self.process.task_set.create(node_name=node.node_name, completed=None, failed=None)
481+
task = self.process.task_set.create(
482+
node_name=node.node_name,
483+
node_type=node.node_type,
484+
)
404485
task.parent_task_set.add(self)
405486
if callable(node):
406487
transaction.on_commit(task.enqueue)
407488
tasks.append(task)
408-
else:
409-
self.process.finish()
410489
return tasks

galahad/tasks/machine.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ def __call__(self, process, task):
101101
return set(task.parent_task_set.values_list('node_name', flat=True)) == self.parents
102102

103103
def create_task(self, process):
104-
return process.task_set.get_or_create(node_name=self.node_name, completed=None, failed=None)[0]
104+
return process.task_set.get_or_create(
105+
node_name=self.node_name,
106+
node_type=self.node_type,
107+
completed=None,
108+
)[0]
105109

106110

107111
class NonBlockingJoin(Join):
@@ -111,7 +115,10 @@ def __call__(self, process, task):
111115
return tuple()
112116

113117
def create_task(self, process):
114-
return process.task_set.get_or_create(node_name=self.node_name, failed=None)[0]
118+
return process.task_set.get_or_create(
119+
node_name=self.node_name,
120+
node_type=self.node_type,
121+
)[0]
115122

116123

117124
class Wait:

tests/tasks/test_machine.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def test_create_task(self, db):
3030
proc = models.SimpleProcess.start_method()
3131
node = tasks.Join()
3232
node.node_name = 'test'
33+
node.node_type = 'machine'
3334
obj = node.create_task(proc)
3435
obj2 = node.create_task(proc)
3536
assert obj == obj2

tests/test_integration.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""High level integration tests."""
22
import sys
33
import time
4+
from unittest.mock import patch
45

56
import pytest
67
from django.urls import reverse
@@ -61,7 +62,8 @@ def test_start_method(self, db):
6162

6263
class TestSplitJoinProcess:
6364

64-
def test_join(self, db, client):
65+
@patch('galahad.celery.task_wrapper.retry')
66+
def test_join(self, retry, db, client):
6567
url = reverse('splitjoinprocess:start')
6668
response = client.post(url)
6769
assert response.status_code == 302

0 commit comments

Comments
 (0)