Skip to content

Commit bce1dc0

Browse files
committed
Fix: Task is removed from group on retry
1 parent 228afd7 commit bce1dc0

File tree

5 files changed

+43
-19
lines changed

5 files changed

+43
-19
lines changed

eb_sqs/redis/redis_group_client.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@ def _key_name(self, group_id):
1717
# type: (unicode) -> None
1818
return '{}{}'.format(settings.REDIS_KEY_PREFIX, group_id)
1919

20+
def _task_identifier(self, worker_task):
21+
# type: (WorkerTask) -> unicode
22+
if worker_task.retry_id:
23+
return '{}-{}'.format(worker_task.id, worker_task.retry_id)
24+
else:
25+
return worker_task.id
26+
2027
def add(self, worker_task):
2128
# type: (WorkerTask) -> None
2229
name = self._key_name(worker_task.group_id)
30+
value = self._task_identifier(worker_task)
31+
2332
pipe = self._redis_client.pipeline()
24-
pipe.sadd(name, worker_task.id)\
33+
pipe.sadd(name, value)\
2534
.expire(name, settings.REDIS_EXPIRY)\
2635
.execute()
2736

@@ -31,7 +40,9 @@ def remove(self, worker_task):
3140
:return: True if last task in group
3241
"""
3342
name = self._key_name(worker_task.group_id)
34-
if self._redis_client.srem(name, worker_task.id) > 0:
43+
value = self._task_identifier(worker_task)
44+
45+
if self._redis_client.srem(name, value) > 0:
3546
return self._redis_client.scard(name) == 0
3647
else:
3748
return False

eb_sqs/tests/worker/tests_worker.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,13 @@ def test_group(self):
113113
settings.GROUP_CALLBACK_TASK = Mock()
114114

115115
group_set = set()
116-
self.group_mock.add.side_effect = lambda task: group_set.add(task.id)
117-
self.group_mock.remove.side_effect = lambda task: len(group_set) == 0 if group_set.discard(task.id) is None else False
116+
self.group_mock.add.side_effect = lambda task: group_set.add('{}-{}'.format(task.id, task.retry_id))
117+
self.group_mock.remove.side_effect = lambda task: len(group_set) == 0 if group_set.discard(
118+
'{}-{}'.format(task.id, task.retry_id)) is None else False
118119

119120
repeating_group_task.delay(3, group_id='group-id', execute_inline=True)
120121

122+
self.assertEqual(len(group_set), 0)
121123
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
122124

123125
settings.GROUP_CALLBACK_TASK = None
@@ -126,30 +128,34 @@ def test_group_match_retries_reached(self):
126128
settings.GROUP_CALLBACK_TASK = Mock()
127129

128130
group_set = set()
129-
self.group_mock.add.side_effect = lambda task: group_set.add(task.id)
131+
self.group_mock.add.side_effect = lambda task: group_set.add('{}-{}'.format(task.id, task.retry_id))
130132
self.group_mock.remove.side_effect = lambda task: len(group_set) == 0 if group_set.discard(
131-
task.id) is None else False
133+
'{}-{}'.format(task.id, task.retry_id)) is None else False
132134

133135
with self.assertRaises(MaxRetriesReachedException):
134136
max_retries_group_task.delay(group_id='group-id', execute_inline=True)
135137

138+
self.assertEqual(len(group_set), 0)
136139
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
137140

138141
settings.GROUP_CALLBACK_TASK = None
139142

140143
def test_retry_execution(self):
141-
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, 0, False)
144+
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, 0, None, False)
142145
self.assertEqual(dummy_task.retry_num, 0)
143146

144147
self.worker.retry(task, 0, False, True)
145148

146149
self.queue_mock.add_message.assert_called_once()
150+
self.assertEqual(task.id, 'id')
151+
self.assertEqual(task.retry, 1)
152+
self.assertIsNotNone(task.retry_id)
147153

148154
def test_retry_max_reached_execution(self):
149155
dummy_task.retry_num = 0
150156

151157
with self.assertRaises(MaxRetriesReachedException):
152-
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 2, 0, False)
158+
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 2, 0, None, False)
153159
self.assertEqual(dummy_task.retry_num, 0)
154160

155161
self.worker.retry(task, 0, True, True)
@@ -163,7 +169,7 @@ def test_retry_max_reached_execution(self):
163169
def test_retry_no_limit(self):
164170
dummy_task.retry_num = 0
165171

166-
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 2, 0, False)
172+
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 2, 0, None, False)
167173
self.assertEqual(dummy_task.retry_num, 0)
168174

169175
self.worker.retry(task, 0, True, False)

eb_sqs/tests/worker/tests_worker_task.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ def dummy_function():
1818

1919
class WorkerTaskTest(TestCase):
2020
def setUp(self):
21-
self.dummy_msg = '{"queue": "default", "retry": 0, "func": "eb_sqs.tests.worker.tests_worker_task.dummy_function", "kwargs": {}, "maxRetries": 5, "args": [], "pickle": false, "id": "id-1", "groupId": "group-5"}'
21+
self.dummy_msg = '{"queue": "default", "retryId": "retry-uuid", "retry": 0, "func": "eb_sqs.tests.worker.tests_worker_task.dummy_function", "kwargs": {}, "maxRetries": 5, "args": [], "pickle": false, "id": "id-1", "groupId": "group-5"}'
2222

2323
def test_serialize_worker_task(self):
24-
worker_task = WorkerTask('id-1', 'group-5', 'default', dummy_function, [], {}, 5, 0, False)
24+
worker_task = WorkerTask('id-1', 'group-5', 'default', dummy_function, [], {}, 5, 0, 'retry-uuid', False)
2525
msg = worker_task.serialize()
2626

2727
self.assertEqual(msg, self.dummy_msg)
@@ -37,9 +37,10 @@ def test_deserialize_worker_task(self):
3737
self.assertEqual(worker_task.kwargs, {})
3838
self.assertEqual(worker_task.max_retries, 5)
3939
self.assertEqual(worker_task.retry, 0)
40+
self.assertEqual(worker_task.retry_id, 'retry-uuid')
4041

4142
def test_serialize_pickle(self):
42-
worker_task1 = WorkerTask('id-1', None, 'default', dummy_function, [], {'object': TestObject()}, 5, 0, True)
43+
worker_task1 = WorkerTask('id-1', None, 'default', dummy_function, [], {'object': TestObject()}, 5, 0, None, True)
4344
msg = worker_task1.serialize()
4445

4546
worker_task2 = WorkerTask.deserialize(msg)

eb_sqs/worker/worker.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def execute(self, msg):
7373
def delay(self, group_id, queue_name, func, args, kwargs, max_retries, use_pickle, delay, execute_inline):
7474
# type: (unicode, unicode, Any, tuple, dict, int, bool, int, bool) -> Any
7575
id = unicode(uuid.uuid4())
76-
worker_task = WorkerTask(id, group_id, queue_name, func, args, kwargs, max_retries, 0, use_pickle)
76+
worker_task = WorkerTask(id, group_id, queue_name, func, args, kwargs, max_retries, 0, None, use_pickle)
7777
return self._enqueue_task(worker_task, delay, execute_inline, False, True)
7878

7979
def retry(self, worker_task, delay, execute_inline, count_retries):
@@ -87,9 +87,9 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
8787
if count_retries:
8888
worker_task.retry += 1
8989
if worker_task.retry > worker_task.max_retries:
90-
self._remove_from_group(worker_task)
9190
raise MaxRetriesReachedException(worker_task.retry)
9291
worker_task.retry_scheduled = True
92+
worker_task.retry_id = unicode(uuid.uuid4())
9393

9494
self._add_to_group(worker_task)
9595

@@ -111,6 +111,7 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
111111
self.queue_client.add_message(worker_task.queue, worker_task.serialize(), delay)
112112
return None
113113
except MaxRetriesReachedException:
114+
self._remove_from_group(worker_task)
114115
raise
115116
except QueueDoesNotExistException as ex:
116117
self._remove_from_group(worker_task)
@@ -140,9 +141,10 @@ def _add_to_group(self, worker_task):
140141
# type: (WorkerTask) -> None
141142
if worker_task.group_id and not worker_task.retry_scheduled:
142143
logger.debug(
143-
'Add task %s (%s) to group %s',
144+
'Add task %s (%s, retry-id: %s) to group %s',
144145
worker_task.abs_func_name,
145146
worker_task.id,
147+
worker_task.retry_id,
146148
worker_task.group_id,
147149
)
148150

@@ -152,9 +154,10 @@ def _remove_from_group(self, worker_task):
152154
# type: (WorkerTask) -> None
153155
if worker_task.group_id and not worker_task.retry_scheduled:
154156
logger.debug(
155-
'Remove task %s (%s) from group %s',
157+
'Remove task %s (%s, retry-id: %s) from group %s',
156158
worker_task.abs_func_name,
157159
worker_task.id,
160+
worker_task.retry_id,
158161
worker_task.group_id,
159162
)
160163

eb_sqs/worker/worker_task.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212

1313
class WorkerTask(object):
14-
def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry, use_pickle):
15-
# type: (unicode, unicode, unicode, Any, tuple, dict, int, int, bool) -> None
14+
def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry, retry_id, use_pickle):
15+
# type: (unicode, unicode, unicode, Any, tuple, dict, int, int, unicode, bool) -> None
1616
super(WorkerTask, self).__init__()
1717
self.id = id
1818
self.group_id = group_id
@@ -22,6 +22,7 @@ def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry,
2222
self.kwargs = kwargs
2323
self.max_retries = max_retries
2424
self.retry = retry
25+
self.retry_id = retry_id
2526
self.use_pickle = use_pickle
2627

2728
self.abs_func_name = '{}.{}'.format(self.func.__module__, self.func.func_name)
@@ -48,6 +49,7 @@ def serialize(self):
4849
'kwargs': kwargs,
4950
'maxRetries': self.max_retries,
5051
'retry': self.retry,
52+
'retryId': self.retry_id,
5153
'pickle': self.use_pickle,
5254
}
5355

@@ -79,8 +81,9 @@ def deserialize(msg):
7981
kwargs = WorkerTask._unpickle_args(task['kwargs']) if use_pickle else task['kwargs']
8082
max_retries = task['maxRetries']
8183
retry = task['retry']
84+
retry_id = task.get('retryId')
8285

83-
return WorkerTask(id, group_id, queue, func, args, kwargs, max_retries, retry, use_pickle)
86+
return WorkerTask(id, group_id, queue, func, args, kwargs, max_retries, retry, retry_id, use_pickle)
8487

8588
@staticmethod
8689
def _unpickle_args(args):

0 commit comments

Comments
 (0)