Skip to content

Commit 743b7c3

Browse files
Fix Default Periodic Task Unique ID (#147)
* add (failing) tests * fix default args/kwargs * improve error message for queue length assertions * recalculate id between periodic executions
1 parent c995461 commit 743b7c3

File tree

3 files changed

+159
-5
lines changed

3 files changed

+159
-5
lines changed

tasktiger/task.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import datetime
23
import json
34
import redis
@@ -75,6 +76,9 @@ def __init__(
7576
if max_queue_size is None:
7677
max_queue_size = getattr(func, '_task_max_queue_size', None)
7778

79+
# normalize falsy args/kwargs to empty structures
80+
args = args or []
81+
kwargs = kwargs or {}
7882
if unique:
7983
task_id = gen_unique_id(serialized_name, args, kwargs)
8084
else:
@@ -518,6 +522,18 @@ def delete(self):
518522
"""
519523
self._move(from_state=ERROR)
520524

525+
def clone(self):
526+
"""Returns a clone of the this task"""
527+
return type(self)(
528+
tiger=self.tiger,
529+
func=self.func,
530+
queue=self.queue,
531+
_state=self._state,
532+
_ts=self._ts,
533+
_executions=copy.copy(self._executions),
534+
_data=copy.copy(self._data),
535+
)
536+
521537
def _queue_for_next_period(self):
522538
now = datetime.datetime.utcnow()
523539
schedule = self.func._task_schedule
@@ -528,5 +544,11 @@ def _queue_for_next_period(self):
528544
schedule_func, schedule_args = schedule
529545
when = schedule_func(now, *schedule_args)
530546
if when:
531-
self.delay(when=when)
547+
# recalculate the unique id so that malformed ids don't persist
548+
# between executions
549+
task = self.clone()
550+
task._data['id'] = gen_unique_id(
551+
task.serialized_func, task.args, task.kwargs
552+
)
553+
task.delay(when=when)
532554
return when

tests/test_base.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,21 @@ def teardown_method(self, method):
5656
def _ensure_queues(
5757
self, queued=None, active=None, error=None, scheduled=None
5858
):
59+
60+
expected_queues = {
61+
'queued': {name for name, n in (queued or {}).items() if n},
62+
'active': {name for name, n in (active or {}).items() if n},
63+
'error': {name for name, n in (error or {}).items() if n},
64+
'scheduled': {name for name, n in (scheduled or {}).items() if n},
65+
}
66+
actual_queues = {
67+
i: self.conn.smembers('t:{}'.format(i))
68+
for i in ('queued', 'active', 'error', 'scheduled')
69+
}
70+
assert expected_queues == actual_queues
71+
5972
def _ensure_queue(typ, data):
6073
data = data or {}
61-
data_names = set(name for name, n in data.items() if n)
62-
assert self.conn.smembers('t:%s' % typ) == data_names
6374
ret = {}
6475
for name, n in data.items():
6576
task_ids = self.conn.zrange('t:%s:%s' % (typ, name), 0, -1)

tests/test_periodic.py

Lines changed: 123 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,15 @@
44
import time
55

66
from tasktiger import Worker, periodic
7-
8-
from .tasks_periodic import tiger
7+
from tasktiger import (
8+
Task,
9+
gen_unique_id,
10+
serialize_func_name,
11+
QUEUED,
12+
SCHEDULED,
13+
)
14+
15+
from .tasks_periodic import tiger, periodic_task
916
from .test_base import BaseTestCase
1017

1118

@@ -107,3 +114,117 @@ def ensure_run(n):
107114
time.sleep(1)
108115

109116
ensure_run(2)
117+
118+
def test_periodic_execution_unique_ids(self):
119+
"""
120+
Test that periodic tasks generate the same unique ids
121+
122+
When a periodic task is scheduled initially as part of worker startup
123+
vs re-scheduled from within python the unique id generated should be
124+
the same. If they aren't it could result in duplicate tasks.
125+
"""
126+
# Sleep until the next second
127+
now = datetime.datetime.utcnow()
128+
time.sleep(1 - now.microsecond / 10.0 ** 6)
129+
130+
# After the first worker run, the periodic task will be queued.
131+
# Note that since periodic tasks register with the Tiger instance, it
132+
# must be the same instance that was used to decorate the task. We
133+
# therefore use `tiger` from the tasks module instead of `self.tiger`.
134+
self._ensure_queues()
135+
Worker(tiger).run(once=True)
136+
self._ensure_queues(scheduled={'periodic': 1})
137+
time.sleep(1)
138+
Worker(tiger).run(once=True)
139+
self._ensure_queues(queued={'periodic': 1})
140+
141+
# generate the expected unique id
142+
expected_unique_id = gen_unique_id(
143+
serialize_func_name(periodic_task), [], {}
144+
)
145+
146+
# pull task out of the queue by id. If found, then the id is correct
147+
task = Task.from_id(tiger, 'periodic', QUEUED, expected_unique_id)
148+
assert task is not None
149+
150+
# execute and reschedule the task
151+
self._ensure_queues(queued={'periodic': 1})
152+
Worker(tiger).run(once=True)
153+
self._ensure_queues(scheduled={'periodic': 1})
154+
155+
# wait for the task to need to be queued
156+
time.sleep(1)
157+
Worker(tiger).run(once=True)
158+
self._ensure_queues(queued={'periodic': 1})
159+
160+
# The unique id shouldn't change between executions. Try finding the
161+
# task by id again
162+
task = Task.from_id(tiger, 'periodic', QUEUED, expected_unique_id)
163+
assert task is not None
164+
165+
def test_periodic_execution_unique_ids_manual_scheduling(self):
166+
"""
167+
Periodic tasks should have the same unique ids when manually scheduled
168+
169+
When a periodic task is scheduled initially as part of worker startup
170+
vs ``.delay``'d manually, the unique id generated should be the same.
171+
If they aren't it could result in duplicate tasks.
172+
"""
173+
# Sleep until the next second
174+
now = datetime.datetime.utcnow()
175+
time.sleep(1 - now.microsecond / 10.0 ** 6)
176+
177+
# After the first worker run, the periodic task will be queued.
178+
# Note that since periodic tasks register with the Tiger instance, it
179+
# must be the same instance that was used to decorate the task. We
180+
# therefore use `tiger` from the tasks module instead of `self.tiger`.
181+
self._ensure_queues()
182+
Worker(tiger).run(once=True)
183+
self._ensure_queues(scheduled={'periodic': 1})
184+
time.sleep(1)
185+
Worker(tiger).run(once=True)
186+
self._ensure_queues(queued={'periodic': 1})
187+
188+
# schedule the task manually
189+
periodic_task.delay()
190+
191+
# make sure a duplicate wasn't scheduled
192+
self._ensure_queues(queued={'periodic': 1})
193+
194+
def test_periodic_execution_unique_ids_self_correct(self):
195+
"""
196+
Test that periodic tasks will self-correct unique ids
197+
"""
198+
# Sleep until the next second
199+
now = datetime.datetime.utcnow()
200+
time.sleep(1 - now.microsecond / 10.0 ** 6)
201+
202+
# generate the ids
203+
correct_unique_id = gen_unique_id(
204+
serialize_func_name(periodic_task), [], {}
205+
)
206+
malformed_unique_id = gen_unique_id(
207+
serialize_func_name(periodic_task), None, None
208+
)
209+
210+
task = Task(tiger, func=periodic_task)
211+
212+
# patch the id to something slightly wrong
213+
assert task.id == correct_unique_id
214+
task._data['id'] = malformed_unique_id
215+
assert task.id == malformed_unique_id
216+
217+
# schedule the task
218+
task.delay()
219+
self._ensure_queues(queued={'periodic': 1})
220+
221+
# pull task out of the queue by the malformed id
222+
task = Task.from_id(tiger, 'periodic', QUEUED, malformed_unique_id)
223+
assert task is not None
224+
225+
Worker(tiger).run(once=True)
226+
self._ensure_queues(scheduled={'periodic': 1})
227+
228+
# pull task out of the queue by the self-corrected id
229+
task = Task.from_id(tiger, 'periodic', SCHEDULED, correct_unique_id)
230+
assert task is not None

0 commit comments

Comments
 (0)