Skip to content

Commit 5dc88d0

Browse files
committed
Task is removed from group during retry
1 parent 8d7de7f commit 5dc88d0

File tree

4 files changed

+48
-31
lines changed

4 files changed

+48
-31
lines changed

eb_sqs/tests/worker/tests_worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ def repeating_group_task(count):
2626
@task(max_retries=5)
2727
def max_retries_group_task():
2828
repeating_group_task.delay(3, group_id='group-id', execute_inline=True)
29-
max_retries_group_task.retry()
29+
max_retries_group_task.retry(execute_inline=True)
3030

3131
global_group_mock = Mock()
3232

3333
class WorkerTest(TestCase):
3434
def setUp(self):
35+
settings.FORCE_SERIALIZATION = True
3536
settings.DEAD_LETTER_MODE = False
3637

3738
self.queue_mock = Mock(autospec=QueueClient)
@@ -129,7 +130,8 @@ def test_group_match_retries_reached(self):
129130
self.group_mock.remove.side_effect = lambda task: len(group_set) == 0 if group_set.discard(
130131
task.id) is None else False
131132

132-
max_retries_group_task.delay(group_id='group-id', execute_inline=True)
133+
with self.assertRaises(MaxRetriesReachedException):
134+
max_retries_group_task.delay(group_id='group-id', execute_inline=True)
133135

134136
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
135137

eb_sqs/worker/worker.py

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def execute(self, msg):
4343
worker_task.id,
4444
)
4545

46-
self._group_callback(worker_task)
46+
self._remove_from_group(worker_task)
4747
else:
4848
logger.info(
4949
'Execute task %s (%s) with args: %s and kwargs: %s',
@@ -83,20 +83,15 @@ def retry(self, worker_task, delay, execute_inline, count_retries):
8383
def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retries):
8484
# type: (WorkerTask, int, bool, bool, bool) -> Any
8585
try:
86-
if is_retry and count_retries:
87-
worker_task.retry += 1
88-
if worker_task.retry > worker_task.max_retries:
89-
self._group_callback(worker_task)
90-
raise MaxRetriesReachedException(worker_task.retry)
86+
if is_retry:
87+
if count_retries:
88+
worker_task.retry += 1
89+
if worker_task.retry > worker_task.max_retries:
90+
self._remove_from_group(worker_task)
91+
raise MaxRetriesReachedException(worker_task.retry)
92+
worker_task.retry_scheduled = True
9193

92-
if worker_task.group_id:
93-
logger.info(
94-
'Add task %s (%s) to group %s',
95-
worker_task.abs_func_name,
96-
worker_task.id,
97-
worker_task.group_id,
98-
)
99-
self.group_client.add(worker_task)
94+
self._add_to_group(worker_task)
10095

10196
logger.info('%s task %s (%s): %s, %s (%s%s)',
10297
'Retrying' if is_retry else 'Delaying',
@@ -106,6 +101,7 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
106101
worker_task.kwargs,
107102
worker_task.queue,
108103
', inline' if execute_inline else '')
104+
109105
if execute_inline:
110106
if settings.FORCE_SERIALIZATION:
111107
return self._execute_task(WorkerTask.deserialize(worker_task.serialize()))
@@ -114,11 +110,13 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
114110
else:
115111
self.queue_client.add_message(worker_task.queue, worker_task.serialize(), delay)
116112
return None
113+
except MaxRetriesReachedException:
114+
raise
117115
except QueueDoesNotExistException as ex:
118-
self._group_callback(worker_task)
116+
self._remove_from_group(worker_task)
119117
raise InvalidQueueException(ex.queue_name)
120118
except QueueClientException as ex:
121-
self._group_callback(worker_task)
119+
self._remove_from_group(worker_task)
122120

123121
logger.exception('Task %s (%s) failed to enqueue to %s: %s',
124122
worker_task.abs_func_name,
@@ -128,28 +126,44 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
128126

129127
raise QueueException()
130128
except Exception:
131-
self._group_callback(worker_task)
129+
self._remove_from_group(worker_task)
132130
raise
133131

134132
def _execute_task(self, worker_task):
135133
# type: (WorkerTask) -> Any
134+
worker_task.retry_scheduled = False
136135
result = worker_task.execute()
137-
self._group_callback(worker_task)
136+
self._remove_from_group(worker_task)
138137
return result
139138

140-
def _group_callback(self, worker_task):
139+
def _add_to_group(self, worker_task):
141140
# type: (WorkerTask) -> None
142-
if not worker_task.group_id:
143-
return
141+
if worker_task.group_id and not worker_task.retry_scheduled:
142+
logger.info(
143+
'Add task %s (%s) to group %s',
144+
worker_task.abs_func_name,
145+
worker_task.id,
146+
worker_task.group_id,
147+
)
148+
149+
self.group_client.add(worker_task)
150+
151+
def _remove_from_group(self, worker_task):
152+
# type: (WorkerTask) -> None
153+
if worker_task.group_id and not worker_task.retry_scheduled:
154+
logger.info(
155+
'Remove task %s (%s) from group %s',
156+
worker_task.abs_func_name,
157+
worker_task.id,
158+
worker_task.group_id,
159+
)
144160

145-
logger.info(
146-
'Remove task %s (%s) from group %s',
147-
worker_task.abs_func_name,
148-
worker_task.id,
149-
worker_task.group_id,
150-
)
161+
if self.group_client.remove(worker_task):
162+
self._execute_group_callback(worker_task)
151163

152-
if self.group_client.remove(worker_task) and settings.GROUP_CALLBACK_TASK:
164+
def _execute_group_callback(self, worker_task):
165+
# type: (WorkerTask) -> None
166+
if settings.GROUP_CALLBACK_TASK:
153167
callback = settings.GROUP_CALLBACK_TASK
154168

155169
if isinstance(callback, basestring):

eb_sqs/worker/worker_task.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry,
2525
self.use_pickle = use_pickle
2626

2727
self.abs_func_name = '{}.{}'.format(self.func.__module__, self.func.func_name)
28+
self.retry_scheduled = False
2829

2930
def execute(self):
3031
# type: () -> Any

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='0.91',
9+
version='0.92',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),

0 commit comments

Comments
 (0)