Skip to content

Commit 0504a44

Browse files
committed
Fix failing task group removal
1 parent eae13c2 commit 0504a44

File tree

3 files changed

+110
-89
lines changed

3 files changed

+110
-89
lines changed

eb_sqs/tests/worker/tests_worker.py

Lines changed: 78 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,47 @@
1111
from eb_sqs.worker.worker import Worker
1212
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException
1313
from eb_sqs.worker.worker_factory import WorkerFactory
14-
from eb_sqs.worker.worker_task import WorkerTask
1514

15+
class TestException(Exception):
16+
pass
1617

1718
@task()
1819
def dummy_task(msg):
1920
return msg
2021

22+
@task(max_retries=100)
23+
def retries_task(num_of_retries):
24+
if retries_task.retry_num < num_of_retries:
25+
retries_task.retry(execute_inline=True)
26+
27+
@task(max_retries=5)
28+
def max_retries_task():
29+
max_retries_task.retry(execute_inline=True)
30+
31+
@task(max_retries=100)
32+
def repeating_group_task(num_of_retries):
33+
if repeating_group_task.retry_num < num_of_retries:
34+
repeating_group_task.retry(execute_inline=True)
35+
2136
@task()
22-
def repeating_group_task(count):
23-
if count > 0:
24-
repeating_group_task.delay(count - 1, group_id='group-id', execute_inline=True)
37+
def exception_group_task():
38+
raise TestException()
39+
40+
@task(max_retries=100)
41+
def exception_repeating_group_task(num_of_retries):
42+
if exception_repeating_group_task.retry_num == num_of_retries:
43+
raise TestException()
44+
else:
45+
exception_repeating_group_task.retry(execute_inline=True)
2546

2647
@task(max_retries=5)
2748
def max_retries_group_task():
28-
repeating_group_task.delay(3, group_id='group-id', execute_inline=True)
2949
max_retries_group_task.retry(execute_inline=True)
3050

3151
global_group_mock = Mock()
3252

3353
class WorkerTest(TestCase):
3454
def setUp(self):
35-
settings.FORCE_SERIALIZATION = True
3655
settings.DEAD_LETTER_MODE = False
3756

3857
self.queue_mock = Mock(autospec=QueueClient)
@@ -44,6 +63,12 @@ def setUp(self):
4463
factory_mock.create.return_value = self.worker
4564
settings.WORKER_FACTORY = factory_mock
4665

66+
def setUpGroupsHandling(self):
67+
self.group_set = set()
68+
self.group_mock.add.side_effect = lambda task: self.group_set.add('{}-{}'.format(task.id, task.retry_id))
69+
self.group_mock.remove.side_effect = lambda task: len(self.group_set) == 0 if self.group_set.discard(
70+
'{}-{}'.format(task.id, task.retry_id)) is None else False
71+
4772
def test_worker_execution_no_group(self):
4873
msg = '{"id": "id-1", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}'
4974

@@ -81,6 +106,7 @@ def test_delay(self):
81106
def test_delay_inline(self):
82107
result = self.worker.delay(None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True)
83108

109+
self.group_mock.add.assert_not_called()
84110
self.queue_mock.add_message.assert_not_called()
85111
self.assertEqual(result, 'Hello World!')
86112

@@ -89,94 +115,82 @@ def test_delay_with_group(self):
89115

90116
self.group_mock.add.assert_called_once()
91117

92-
def test_group_callback(self):
118+
def test_retry_max_reached_execution(self):
119+
with self.assertRaises(MaxRetriesReachedException):
120+
max_retries_task.delay(execute_inline=True)
121+
122+
def test_retry_no_limit(self):
123+
retries_task.delay(10, execute_inline=True)
124+
125+
self.assertEqual(retries_task.retry_num, 10)
126+
127+
def test_group(self):
93128
settings.GROUP_CALLBACK_TASK = Mock()
94129

95130
self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True)
96131

97-
self.group_mock.remove.assert_called_once()
98132
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
99-
100133
settings.GROUP_CALLBACK_TASK = None
101134

102-
def test_group_callback_string(self):
103-
settings.GROUP_CALLBACK_TASK = 'eb_sqs.tests.worker.tests_worker.global_group_mock'
135+
def test_group_with_exception(self):
136+
settings.GROUP_CALLBACK_TASK = Mock()
137+
self.setUpGroupsHandling()
104138

105-
self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True)
139+
with self.assertRaises(TestException):
140+
exception_group_task.delay(group_id='group-id', execute_inline=True)
106141

107-
self.group_mock.remove.assert_called_once()
108-
global_group_mock.delay.assert_called_once()
142+
self.assertEqual(len(self.group_set), 0)
143+
self.assertEqual(self.group_mock.add.call_count, 1)
144+
self.assertEqual(self.group_mock.remove.call_count, 1)
109145

146+
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
110147
settings.GROUP_CALLBACK_TASK = None
111148

112-
def test_group(self):
149+
def test_group_retries(self):
113150
settings.GROUP_CALLBACK_TASK = Mock()
114-
115-
group_set = set()
116-
self.group_mock.add.side_effect = lambda task: group_set.add('{}-{}'.format(task.id, task.retry_id))
117-
self.group_mock.remove.side_effect = lambda task: len(group_set) == 0 if group_set.discard(
118-
'{}-{}'.format(task.id, task.retry_id)) is None else False
151+
self.setUpGroupsHandling()
119152

120153
repeating_group_task.delay(3, group_id='group-id', execute_inline=True)
121154

122-
self.assertEqual(len(group_set), 0)
123-
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
155+
self.assertEqual(len(self.group_set), 0)
156+
self.assertEqual(self.group_mock.add.call_count, 4)
157+
self.assertEqual(self.group_mock.remove.call_count, 4)
124158

159+
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
125160
settings.GROUP_CALLBACK_TASK = None
126161

127-
def test_group_match_retries_reached(self):
162+
def test_group_exception_in_retries(self):
128163
settings.GROUP_CALLBACK_TASK = Mock()
164+
self.setUpGroupsHandling()
129165

130-
group_set = set()
131-
self.group_mock.add.side_effect = lambda task: group_set.add('{}-{}'.format(task.id, task.retry_id))
132-
self.group_mock.remove.side_effect = lambda task: len(group_set) == 0 if group_set.discard(
133-
'{}-{}'.format(task.id, task.retry_id)) is None else False
166+
with self.assertRaises(TestException):
167+
exception_repeating_group_task.delay(2, group_id='group-id', execute_inline=True)
134168

135-
with self.assertRaises(MaxRetriesReachedException):
136-
max_retries_group_task.delay(group_id='group-id', execute_inline=True)
169+
self.assertEqual(len(self.group_set), 0)
170+
self.assertEqual(self.group_mock.add.call_count, 3)
171+
self.assertEqual(self.group_mock.remove.call_count, 3)
137172

138-
self.assertEqual(len(group_set), 0)
139173
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
140-
141174
settings.GROUP_CALLBACK_TASK = None
142175

143-
def test_retry_execution(self):
144-
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, 0, None, False)
145-
self.assertEqual(dummy_task.retry_num, 0)
146-
147-
self.worker.retry(task, 0, False, True)
148-
149-
self.queue_mock.add_message.assert_called_once()
150-
self.assertEqual(task.id, 'id')
151-
self.assertEqual(task.retry, 1)
152-
self.assertIsNotNone(task.retry_id)
153-
154-
def test_retry_max_reached_execution(self):
155-
dummy_task.retry_num = 0
176+
def test_group_match_retries_reached(self):
177+
settings.GROUP_CALLBACK_TASK = Mock()
178+
self.setUpGroupsHandling()
156179

157180
with self.assertRaises(MaxRetriesReachedException):
158-
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 2, 0, None, False)
159-
self.assertEqual(dummy_task.retry_num, 0)
160-
161-
self.worker.retry(task, 0, True, True)
162-
self.assertEqual(dummy_task.retry_num, 1)
163-
164-
self.worker.retry(task, 0, True, True)
165-
self.assertEqual(dummy_task.retry_num, 2)
166-
167-
self.worker.retry(task, 0, True, True)
181+
max_retries_group_task.delay(group_id='group-id', execute_inline=True)
168182

169-
def test_retry_no_limit(self):
170-
dummy_task.retry_num = 0
183+
self.assertEqual(len(self.group_set), 0)
184+
self.assertEqual(self.group_mock.add.call_count, 5)
185+
self.assertEqual(self.group_mock.remove.call_count, 5)
171186

172-
task = WorkerTask('id', None, 'queue', dummy_task, [], {'msg': 'Hello World!'}, 2, 0, None, False)
173-
self.assertEqual(dummy_task.retry_num, 0)
187+
settings.GROUP_CALLBACK_TASK.delay.assert_called_once()
188+
settings.GROUP_CALLBACK_TASK = None
174189

175-
self.worker.retry(task, 0, True, False)
176-
self.assertEqual(dummy_task.retry_num, 0)
190+
def test_group_callback_string(self):
191+
settings.GROUP_CALLBACK_TASK = 'eb_sqs.tests.worker.tests_worker.global_group_mock'
177192

178-
self.worker.retry(task, 0, True, False)
179-
self.assertEqual(dummy_task.retry_num, 0)
193+
self.worker.delay('group-id', 'queue', dummy_task, [], {'msg': 'Hello World!'}, 5, False, 0, True)
180194

181-
self.worker.retry(task, 0, True, False)
182-
self.assertEqual(dummy_task.retry_num, 0)
195+
global_group_mock.delay.assert_called_once()
196+
settings.GROUP_CALLBACK_TASK = None

eb_sqs/worker/worker.py

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,17 @@ def delay(self, group_id, queue_name, func, args, kwargs, max_retries, use_pickl
8181

8282
def retry(self, worker_task, delay, execute_inline, count_retries):
8383
# type: (WorkerTask, int, bool, bool) -> Any
84+
worker_task = worker_task.copy(settings.FORCE_SERIALIZATION)
85+
worker_task.retry_id = unicode(uuid.uuid4())
8486
return self._enqueue_task(worker_task, delay, execute_inline, True, count_retries)
8587

8688
def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retries):
8789
# type: (WorkerTask, int, bool, bool, bool) -> Any
8890
try:
89-
if is_retry:
90-
if count_retries:
91-
worker_task.retry += 1
92-
if worker_task.retry > worker_task.max_retries:
93-
raise MaxRetriesReachedException(worker_task.retry)
94-
worker_task.retry_scheduled = True
95-
worker_task.retry_id = unicode(uuid.uuid4())
91+
if is_retry and count_retries:
92+
worker_task.retry += 1
93+
if worker_task.retry >= worker_task.max_retries:
94+
raise MaxRetriesReachedException(worker_task.retry)
9695

9796
self._add_to_group(worker_task)
9897

@@ -107,22 +106,15 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
107106
', inline' if execute_inline else '')
108107

109108
if execute_inline:
110-
if settings.FORCE_SERIALIZATION:
111-
return self._execute_task(WorkerTask.deserialize(worker_task.serialize()))
112-
else:
113-
return self._execute_task(worker_task)
109+
return self._execute_task(worker_task)
114110
else:
115111
self.queue_client.add_message(worker_task.queue, worker_task.serialize(), delay)
116112
return None
117-
except MaxRetriesReachedException:
118-
self._remove_from_group(worker_task)
119-
raise
120113
except QueueDoesNotExistException as ex:
121114
self._remove_from_group(worker_task)
122115
raise InvalidQueueException(ex.queue_name)
123116
except QueueClientException as ex:
124117
self._remove_from_group(worker_task)
125-
126118
logger.exception('Task %s (%s, retry-id: %s) failed to enqueue to %s: %s',
127119
worker_task.abs_func_name,
128120
worker_task.id,
@@ -131,20 +123,18 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retr
131123
ex)
132124

133125
raise QueueException()
134-
except Exception:
135-
self._remove_from_group(worker_task)
136-
raise
137126

138127
def _execute_task(self, worker_task):
139128
# type: (WorkerTask) -> Any
140-
worker_task.retry_scheduled = False
141-
result = worker_task.execute()
142-
self._remove_from_group(worker_task)
143-
return result
129+
try:
130+
result = worker_task.execute()
131+
return result
132+
finally:
133+
self._remove_from_group(worker_task)
144134

145135
def _add_to_group(self, worker_task):
146136
# type: (WorkerTask) -> None
147-
if worker_task.group_id and not worker_task.retry_scheduled:
137+
if worker_task.group_id:
148138
logger.debug(
149139
'Add task %s (%s, retry-id: %s) to group %s',
150140
worker_task.abs_func_name,
@@ -157,7 +147,7 @@ def _add_to_group(self, worker_task):
157147

158148
def _remove_from_group(self, worker_task):
159149
# type: (WorkerTask) -> None
160-
if worker_task.group_id and not worker_task.retry_scheduled:
150+
if worker_task.group_id:
161151
logger.debug(
162152
'Remove task %s (%s, retry-id: %s) from group %s',
163153
worker_task.abs_func_name,

eb_sqs/worker/worker_task.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ def __init__(self, id, group_id, queue, func, args, kwargs, max_retries, retry,
2626
self.use_pickle = use_pickle
2727

2828
self.abs_func_name = '{}.{}'.format(self.func.__module__, self.func.func_name)
29-
self.retry_scheduled = False
3029

3130
def execute(self):
3231
# type: () -> Any
@@ -55,6 +54,24 @@ def serialize(self):
5554

5655
return json.dumps(task)
5756

57+
def copy(self, use_serialization):
58+
# type: (bool) -> WorkerTask
59+
if use_serialization:
60+
return WorkerTask.deserialize(self.serialize())
61+
else:
62+
return WorkerTask(
63+
self.id,
64+
self.group_id,
65+
self.queue,
66+
self.func,
67+
self.args,
68+
self.kwargs,
69+
self.max_retries,
70+
self.retry,
71+
self.retry_id,
72+
self.use_pickle,
73+
)
74+
5875
@staticmethod
5976
def _pickle_args(args):
6077
# type: (dict) -> unicode

0 commit comments

Comments
 (0)