Skip to content

Commit 8eac2ac

Browse files
Viatrakauvipy
authored andcommitted
Adds support for message headers for periodic tasks (#98)
* Add headers field support to execution options * Add migration for headers on periodictask model * Fix json loading from model field * Dumps headers * Fix migrations for new model field * Fix for 'priority' field not showing up in django admin execution options * Fix flake8 errors and future imports on migration file * Add tests for saving and checking priority field and headers field in options * Update tests * Flake 8 fixes * Fix migrations & dependency tree for PR * Add python2 compatibility imports and flake8 fixes into migration file
1 parent 75491bf commit 8eac2ac

File tree

5 files changed

+40
-3
lines changed

5 files changed

+40
-3
lines changed

django_celery_beat/admin.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ class PeriodicTaskAdmin(admin.ModelAdmin):
137137
'classes': ('extrapretty', 'wide', 'collapse', 'in'),
138138
}),
139139
('Execution Options', {
140-
'fields': ('expires', 'queue', 'exchange', 'routing_key'),
140+
'fields': ('expires', 'queue', 'exchange', 'routing_key',
141+
'priority', 'headers'),
141142
'classes': ('extrapretty', 'wide', 'collapse', 'in'),
142143
}),
143144
)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Generated by Django 2.1.5 on 2019-02-09 19:33
2+
from __future__ import absolute_import, unicode_literals
3+
4+
from django.db import migrations, models
5+
6+
7+
class Migration(migrations.Migration):
8+
9+
dependencies = [
10+
('django_celery_beat', '0006_periodictask_priority'),
11+
('django_celery_beat', '0008_auto_20180914_1922'),
12+
]
13+
14+
operations = [
15+
migrations.AddField(
16+
model_name='periodictask',
17+
name='headers',
18+
field=models.TextField(
19+
blank=True,
20+
default='{}',
21+
help_text='JSON encoded message headers',
22+
verbose_name='Message headers'
23+
),
24+
),
25+
]

django_celery_beat/models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,10 @@ class PeriodicTask(models.Model):
293293
routing_key = models.CharField(
294294
_('routing key'), max_length=200, blank=True, null=True, default=None,
295295
)
296+
headers = models.TextField(
297+
_('Message headers'), blank=True, default='{}',
298+
help_text=_('JSON encoded message headers'),
299+
)
296300
priority = models.PositiveIntegerField(
297301
_('priority'), default=None, validators=[MaxValueValidator(255)],
298302
blank=True, null=True
@@ -353,6 +357,7 @@ def save(self, *args, **kwargs):
353357
self.exchange = self.exchange or None
354358
self.routing_key = self.routing_key or None
355359
self.queue = self.queue or None
360+
self.headers = self.headers or None
356361
if not self.enabled:
357362
self.last_run_at = None
358363
super(PeriodicTask, self).save(*args, **kwargs)

django_celery_beat/schedulers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def __init__(self, model, app=None):
8686
if value is None:
8787
continue
8888
self.options[option] = value
89+
self.options['headers'] = loads(model.headers or '{}')
8990

9091
self.total_run_count = model.total_run_count
9192
self.model = model
@@ -197,12 +198,13 @@ def _unpack_fields(cls, schedule,
197198

198199
@classmethod
199200
def _unpack_options(cls, queue=None, exchange=None, routing_key=None,
200-
priority=None, **kwargs):
201+
priority=None, headers=None, **kwargs):
201202
return {
202203
'queue': queue,
203204
'exchange': exchange,
204205
'routing_key': routing_key,
205-
'priority': priority
206+
'priority': priority,
207+
'headers': dumps(headers or {}),
206208
}
207209

208210
def __repr__(self):

t/unit/test_schedulers.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ def create_model(self, Model=PeriodicTask, **kwargs):
9797
kwargs='{"callback": "foo"}',
9898
queue='xaz',
9999
routing_key='cpu',
100+
priority=1,
101+
headers='{"_schema_name": "foobar"}',
100102
exchange='foo',
101103
)
102104
return Model(**dict(entry, **kwargs))
@@ -118,6 +120,8 @@ def test_entry(self):
118120
assert e.options['queue'] == 'xaz'
119121
assert e.options['exchange'] == 'foo'
120122
assert e.options['routing_key'] == 'cpu'
123+
assert e.options['priority'] == 1
124+
assert e.options['headers'] == {'_schema_name': 'foobar'}
121125

122126
right_now = self.app.now()
123127
m2 = self.create_model_interval(

0 commit comments

Comments
 (0)