Skip to content

Commit 99c822e

Browse files
committed
Add Dramatiq support
1 parent 5b19baa commit 99c822e

23 files changed

+306
-135
lines changed

.github/workflows/ci.yml

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,31 @@ jobs:
77
docs:
88
runs-on: ubuntu-latest
99
steps:
10-
- uses: actions/checkout@v1
1110
- name: Set up Python
12-
uses: actions/setup-python@v1
11+
uses: actions/setup-python@v2
12+
- uses: actions/checkout@v2
1313
- name: Install binary dependencies
1414
run: sudo apt-get install -y python3-enchant graphviz
1515
- name: Install Python dependencies
1616
run: |
1717
python -m pip install --upgrade pip setuptools
1818
pip install sphinxcontrib-spelling
19-
python setup.py develop
19+
pip install -e '.[docs]'
2020
- name: Build documentation
2121
run: python setup.py build_sphinx -W -b spelling
2222

23-
tests:
23+
dist:
24+
runs-on: ubuntu-latest
25+
steps:
26+
- uses: actions/setup-python@v2
27+
- run: python -m pip install --upgrade pip setuptools wheel twine readme-renderer
28+
- uses: actions/checkout@v2
29+
- run: python setup.py sdist bdist_wheel
30+
- run: python -m twine check dist/*
31+
32+
pytest:
33+
needs:
34+
- dist
2435
runs-on: ubuntu-latest
2536
strategy:
2637
matrix:
@@ -31,26 +42,49 @@ jobs:
3142
django-version:
3243
- 2.2.*
3344
- 3.0.*
34-
extras:
35-
- test
36-
- test,reversion
3745
steps:
38-
- uses: actions/checkout@v1
3946
- name: Set up Python ${{ matrix.python-version }}
40-
uses: actions/setup-python@v1
47+
uses: actions/setup-python@v2
4148
with:
4249
python-version: ${{ matrix.python-version }}
50+
- uses: actions/checkout@v2
4351
- name: Install binary dependencies
4452
run: |
4553
sudo apt-get update
4654
sudo apt-get install -y graphviz redis-server
4755
- name: Install Python dependencies
4856
run: |
4957
python -m pip install --upgrade pip setuptools wheel codecov
50-
python -m pip install -e .[${{ matrix.extras }}]
58+
python -m pip install -e .[dramatiq]
5159
python -m pip install django==${{ matrix.django-version }}
5260
- name: Run tests
53-
run: PATH=$PATH:$(pwd)/bin py.test
61+
run: python setup.py test
5462
- run: codecov
5563
env:
5664
CODECOV_TOKEN: ${{secrets.CODECOV_TOKEN}}
65+
66+
extras:
67+
runs-on: ubuntu-latest
68+
strategy:
69+
matrix:
70+
extras:
71+
- dramatiq
72+
- celery
73+
- dramatiq,reversion
74+
steps:
75+
- name: Set up Python ${{ matrix.python-version }}
76+
uses: actions/setup-python@v2
77+
- uses: actions/checkout@v2
78+
- name: Install binary dependencies
79+
run: |
80+
sudo apt-get update
81+
sudo apt-get install -y graphviz redis-server
82+
- name: Install Python dependencies
83+
run: |
84+
python -m pip install --upgrade pip setuptools wheel codecov
85+
python -m pip install -e .[${{ matrix.extras }}]
86+
- name: Run tests
87+
run: python setup.py test
88+
- run: codecov
89+
env:
90+
CODECOV_TOKEN: ${{secrets.CODECOV_TOKEN}}

docs/tutorial/index.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ the PyPi package…
1212

1313
.. code:: shell
1414
15-
pip install joeflow[reversion]
15+
python3 -m pip install "joeflow[reversion,dramatiq,celery]"
1616
1717
…and add ``joeflow`` to the ``INSTALLED_APP`` setting. You will also need to have
1818
celery setup.

docs/urls.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ pattern consisting of the process name (lowercase) and task name, e.g.:
99

1010
.. code-block:: python
1111
12-
>>> reverse(process_name:task_name, args=[task.pk])
12+
>>> from django.urls import reverse
13+
>>> reverse("process_name:task_name", args=[task.pk])
1314
'/url/to/process/task/1'
1415
1516
All task URLs need the `.Task` primary key as an argument. There are some

joeflow/celery/__init__.py

Whitespace-only changes.

joeflow/conf.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,22 @@ class JoeflowAppConfig(AppConf):
2626
process state.
2727
"""
2828

29-
JOEFLOW_CELERY_QUEUE_NAME = 'celery'
29+
JOEFLOW_TASK_RUNNER = 'joeflow.runner.dramatiq.task_runner'
30+
"""
31+
Task runner is used to execute machine tasks.
32+
33+
JoeFlow supports two different asynchronous task runners – Dramatiq_ and Celery_.
34+
35+
To use either of the task runners change this setting to:
36+
37+
* ``joeflow.runner.dramatiq.task_runner``
38+
* ``joeflow.runner.celery.task_runner``
39+
40+
.. _Dramatiq: https://dramatiq.io/
41+
.. _Celery: http://www.celeryproject.org/
42+
"""
43+
44+
JOEFLOW_CELERY_QUEUE_NAME = 'joeflow'
3045
"""
3146
Queue name in which all machine tasks will be queued.
3247
"""

joeflow/locking.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def _lock(process_pk):
2323
The lock is not blocking to free CPU time for other tasks on celery
2424
workers.
2525
"""
26-
connection = redis.StrictRedis.from_url(settings.JOEFLOW_REDIS_LOCK_URL)
26+
connection = redis.Redis.from_url(settings.JOEFLOW_REDIS_LOCK_URL)
2727
__lock = connection.lock('joeflow_process_{}'.format(process_pk), timeout=settings.JOEFLOW_REDIS_LOCK_TIMEOUT)
2828
successful = __lock.acquire(blocking=False)
2929
try:
@@ -33,6 +33,7 @@ def _lock(process_pk):
3333
finally:
3434
if successful:
3535
__lock.release()
36+
connection.close()
3637

3738

3839
# little hack for testing purposes

joeflow/models.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
from django.db.models.functions import Now
99
from django.urls import NoReverseMatch, path, reverse
1010
from django.utils import timezone
11+
from django.utils.module_loading import import_string
1112
from django.utils.safestring import SafeString
1213
from django.utils.translation import ugettext_lazy as t
1314
from django.views.generic.edit import BaseCreateView
1415

15-
from joeflow import celery, tasks, utils, views
16-
16+
from . import tasks, utils, views
1717
from .conf import settings
1818

1919
logger = logging.getLogger(__name__)
@@ -527,11 +527,12 @@ def enqueue(self, countdown=None, eta=None):
527527
'exception',
528528
'stacktrace',
529529
])
530-
transaction.on_commit(lambda: celery.task_wrapper.apply_async(
531-
args=(self.pk, self._process_id),
530+
task_runner = import_string(settings.JOEFLOW_TASK_RUNNER)
531+
transaction.on_commit(lambda: task_runner(
532+
task_pk=self.pk,
533+
process_pk=self._process_id,
532534
countdown=countdown,
533535
eta=eta,
534-
queue=settings.JOEFLOW_CELERY_QUEUE_NAME,
535536
))
536537

537538
@transaction.atomic()

joeflow/runner/__init__.py

Whitespace-only changes.

joeflow/celery.py renamed to joeflow/runner/celery.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,24 @@
11
import logging
2-
import random
32

43
from celery import shared_task
54
from django.apps import apps
65
from django.db import transaction
76

7+
from joeflow.conf import settings
88
from joeflow.contrib.reversion import with_reversion
99

10-
from . import locking
10+
from .. import locking, utils
1111

12-
logger = logging.getLogger('joeflow')
12+
logger = logging.getLogger(__name__)
1313

1414

15-
def jitter():
16-
"""Return a random number between 0 and 1."""
17-
return random.randrange(2) # nosec
18-
19-
20-
def backoff(retries):
21-
"""Return an exponentially growing number limited to 600 plus a random jitter."""
22-
return min(600, 2 ** retries) + jitter()
15+
__all__ = ['task_runner']
2316

2417

2518
@shared_task(bind=True, ignore_results=True, max_retries=None)
26-
def task_wrapper(self, task_pk, process_pk):
19+
def _celery_task_runner(self, task_pk, process_pk):
2720
with locking.lock(process_pk) as lock_result:
28-
countdown = backoff(self.request.retries)
21+
countdown = utils.backoff(self.request.retries)
2922
if not lock_result:
3023
logger.info("Process is locked, retrying in %s seconds", countdown)
3124
self.retry(countdown=countdown)
@@ -55,3 +48,13 @@ def task_wrapper(self, task_pk, process_pk):
5548
logger.info("Task completed successful, starting next tasks: %s", result)
5649
task.start_next_tasks(next_nodes=result)
5750
task.finish()
51+
52+
53+
def task_runner(*, task_pk, process_pk, countdown, eta):
54+
"""Schedule asynchronous machine task using celery."""
55+
_celery_task_runner.apply_async(
56+
args=(task_pk, process_pk),
57+
countdown=countdown,
58+
eta=eta,
59+
queue=settings.JOEFLOW_CELERY_QUEUE_NAME,
60+
)

joeflow/runner/dramatiq.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import logging
2+
3+
import dramatiq
4+
from django.apps import apps
5+
from django.db import OperationalError, transaction
6+
7+
from .. import locking, utils
8+
from ..conf import settings
9+
from ..contrib.reversion import with_reversion
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
def task_runner(*, task_pk, process_pk, countdown=None, eta=None, retries=0):
15+
"""Schedule asynchronous machine task using celery."""
16+
_dramatiq_task_runner.send_with_options(
17+
args=(task_pk, process_pk),
18+
delay=countdown,
19+
retries=retries,
20+
)
21+
22+
23+
@dramatiq.actor(queue_name=settings.JOEFLOW_CELERY_QUEUE_NAME, retry_when=lambda a, b: isinstance(b, OperationalError))
24+
def _dramatiq_task_runner(task_pk, process_pk, retries=0):
25+
with locking.lock(process_pk) as lock_result:
26+
countdown = utils.backoff(retries)
27+
if not lock_result:
28+
logger.info("Process is locked, retrying in %s seconds", countdown)
29+
task_runner(task_pk=task_pk, process_pk=process_pk, countdown=countdown, retries=retries + 1)
30+
return
31+
Task = apps.get_model('joeflow', 'Task')
32+
task = Task.objects.get(pk=task_pk, completed=None)
33+
process = task.process
34+
35+
try:
36+
logger.info("Executing %r", task)
37+
node = getattr(type(process), task.name)
38+
with_task = getattr(node, 'with_task', False)
39+
kwargs = {}
40+
if with_task:
41+
kwargs['task'] = task
42+
with with_reversion(task):
43+
result = node(process, **kwargs)
44+
except: # NoQA
45+
task.fail()
46+
logger.exception("Execution of %r failed", task)
47+
else:
48+
if result is False:
49+
logger.info("Task returned False, retrying in %s seconds", countdown)
50+
transaction.on_commit(lambda: task_runner(
51+
task_pk=task_pk,
52+
process_pk=process_pk,
53+
countdown=countdown,
54+
retries=retries + 1,
55+
))
56+
return
57+
elif result is True:
58+
result = None
59+
logger.info("Task completed successful, starting next tasks: %s", result)
60+
task.start_next_tasks(next_nodes=result)
61+
task.finish()

0 commit comments

Comments
 (0)