Skip to content

Commit 028ce33

Browse files
committed
wip
1 parent 30d41a2 commit 028ce33

File tree

6 files changed

+80
-28
lines changed

6 files changed

+80
-28
lines changed

galahad/celery.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@
1212

1313
def jitter():
1414
"""Return a random number between 0 and 1."""
15-
return random.randrange(1)
15+
return random.randrange(2)
1616

1717

1818
def backoff(retries):
1919
"""Return an exponentially growing number limited to 600 plus a random jitter."""
2020
return min(600, 2 ** retries) + jitter()
2121

2222

23-
@shared_task(bind=True, ignore_results=True)
24-
def task_wrapper(self, task_pk, process_pk, retries=0):
23+
@shared_task(bind=True, ignore_results=True, max_retries=None)
24+
def task_wrapper(self, task_pk, process_pk):
2525
with locking.lock(process_pk) as lock_result:
26+
countdown = backoff(self.request.retries)
2627
if not lock_result:
27-
countdown = backoff(self.request.retries)
2828
logger.info("Process is locked, retrying in %s seconds", countdown)
2929
self.retry(countdown=countdown)
3030
Task = apps.get_model('galahad', 'Task')
@@ -43,9 +43,8 @@ def task_wrapper(self, task_pk, process_pk, retries=0):
4343
logger.exception("Execution of %r failed", task)
4444
else:
4545
if result is False:
46-
countdown = backoff(retries)
4746
logger.info("Task returned False, retrying in %s seconds", countdown)
48-
transaction.on_commit(lambda: task.enqueue(countdown=countdown, retires=retries+1))
47+
transaction.on_commit(lambda: self.retry(countdown=countdown))
4948
return
5049
elif result is True:
5150
result = None

galahad/conf.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,3 @@ class GalahadAppConfig(AppConf):
3030
"""
3131
Queue name in which all machine tasks will be queued.
3232
"""
33-
34-
GALAHAD_CELERY_QUEUE_NAME = 'celery'
35-
"""
36-
Queue name in which all machine tasks will be queued.
37-
"""

galahad/migrations/0001_initial.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ class Migration(migrations.Migration):
2323
('started', models.DateTimeField(auto_now_add=True, db_index=True)),
2424
('completed', models.DateTimeField(blank=True, db_index=True, editable=False, null=True)),
2525
],
26-
options={
27-
'abstract': False,
28-
},
2926
),
3027
migrations.CreateModel(
3128
name='Task',

galahad/models.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,7 @@ def __new__(cls, name, bases, attrs, **kwargs):
5959
return klass
6060

6161

62-
class AbstractProcessState(models.Model, metaclass=BaseProcess):
63-
id = models.BigAutoField(primary_key=True, editable=False)
64-
started = models.DateTimeField(auto_now_add=True, db_index=True)
65-
completed = models.DateTimeField(blank=True, null=True, editable=False, db_index=True)
66-
67-
task_set = GenericRelation('galahad.Task', object_id_field='_process_id')
68-
69-
class Meta:
70-
abstract = True
71-
72-
73-
class Process(AbstractProcessState):
62+
class Process(models.Model, metaclass=BaseProcess):
7463
"""
7564
The `Process` object holds the state of a workflow instances.
7665
@@ -80,6 +69,11 @@ class Process(AbstractProcessState):
8069
Processes are also the vehicle for the other two components tasks and
8170
:attr:`.edges`.
8271
"""
72+
id = models.BigAutoField(primary_key=True, editable=False)
73+
started = models.DateTimeField(auto_now_add=True, db_index=True)
74+
completed = models.DateTimeField(blank=True, null=True, editable=False, db_index=True)
75+
76+
task_set = GenericRelation('galahad.Task', object_id_field='_process_id')
8377

8478
edges = None
8579
"""
@@ -363,7 +357,7 @@ def fail(self):
363357
self.stacktrace = "".join(tb)
364358
self.save(update_fields=['failed', 'exception', 'stacktrace'])
365359

366-
def enqueue(self, countdown=None, eta=None, retries=0):
360+
def enqueue(self, countdown=None, eta=None):
367361
"""
368362
Schedule the tasks for execution.
369363
@@ -379,7 +373,7 @@ def enqueue(self, countdown=None, eta=None, retries=0):
379373
380374
"""
381375
return celery.task_wrapper.apply_async(
382-
args=(self.pk, self._process_id, retries),
376+
args=(self.pk, self._process_id),
383377
countdown=countdown,
384378
eta=eta,
385379
queue=settings.GALAHAD_CELERY_QUEUE_NAME,
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Generated by Django 2.1.3 on 2018-11-15 10:51
2+
3+
from django.conf import settings
4+
from django.db import migrations, models
5+
import django.db.models.deletion
6+
7+
8+
class Migration(migrations.Migration):
9+
10+
initial = True
11+
12+
dependencies = [
13+
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
14+
('galahad', '0001_initial'),
15+
]
16+
17+
operations = [
18+
migrations.CreateModel(
19+
name='GatewayProcess',
20+
fields=[
21+
('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='galahad.Process')),
22+
],
23+
bases=('galahad.process',),
24+
),
25+
migrations.CreateModel(
26+
name='LoopProcess',
27+
fields=[
28+
('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='galahad.Process')),
29+
('counter', models.PositiveIntegerField(default=0)),
30+
],
31+
bases=('galahad.process',),
32+
),
33+
migrations.CreateModel(
34+
name='SimpleProcess',
35+
fields=[
36+
('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='galahad.Process')),
37+
],
38+
bases=('galahad.process',),
39+
),
40+
migrations.CreateModel(
41+
name='SplitJoinProcess',
42+
fields=[
43+
('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='galahad.Process')),
44+
('parallel_task_value', models.PositiveIntegerField(default=0)),
45+
],
46+
bases=('galahad.process',),
47+
),
48+
migrations.CreateModel(
49+
name='WaitProcess',
50+
fields=[
51+
('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='galahad.Process')),
52+
('parallel_task_value', models.PositiveIntegerField(default=0)),
53+
],
54+
bases=('galahad.process',),
55+
),
56+
migrations.CreateModel(
57+
name='WelcomeProcess',
58+
fields=[
59+
('process_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='galahad.Process')),
60+
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
61+
],
62+
options={
63+
'abstract': False,
64+
},
65+
bases=('galahad.process', models.Model),
66+
),
67+
]

tests/testapp/migrations/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)