Skip to content

Commit 2540ad8

Browse files
authored
Merge pull request #33 from cuda-networks/fix_requeued_log
print only relevant fields to log in case of SQS re-queueing
2 parents 2c5dff0 + 456dfb9 commit 2540ad8

File tree

4 files changed

+23
-12
lines changed

4 files changed

+23
-12
lines changed

eb_sqs/tests/worker/tests_worker.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,44 +12,54 @@
1212
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException
1313
from eb_sqs.worker.worker_factory import WorkerFactory
1414

15+
1516
class TestException(Exception):
1617
pass
1718

19+
1820
@task()
1921
def dummy_task(msg):
2022
return msg
2123

24+
2225
@task(max_retries=100)
2326
def retries_task(num_of_retries):
2427
if retries_task.retry_num < num_of_retries:
2528
retries_task.retry(execute_inline=True)
2629

30+
2731
@task(max_retries=5)
2832
def max_retries_task():
2933
max_retries_task.retry(execute_inline=True)
3034

35+
3136
@task(max_retries=100)
3237
def repeating_group_task(num_of_retries):
3338
if repeating_group_task.retry_num < num_of_retries:
3439
repeating_group_task.retry(execute_inline=True)
3540

41+
3642
@task()
3743
def exception_group_task():
3844
raise TestException()
3945

46+
4047
@task(max_retries=100)
4148
def exception_repeating_group_task(num_of_retries):
4249
if exception_repeating_group_task.retry_num == num_of_retries:
4350
raise TestException()
4451
else:
4552
exception_repeating_group_task.retry(execute_inline=True)
4653

54+
4755
@task(max_retries=5)
4856
def max_retries_group_task():
4957
max_retries_group_task.retry(execute_inline=True)
5058

59+
5160
global_group_mock = Mock()
5261

62+
5363
class WorkerTest(TestCase):
5464
def setUp(self):
5565
settings.DEAD_LETTER_MODE = False
@@ -65,14 +75,14 @@ def setUp(self):
6575

6676
def setUpGroupsHandling(self):
6777
self.group_set = set()
68-
self.group_mock.add.side_effect = lambda task: self.group_set.add('{}-{}'.format(task.id, task.retry_id))
69-
self.group_mock.remove.side_effect = lambda task: len(self.group_set) == 0 if self.group_set.discard(
70-
'{}-{}'.format(task.id, task.retry_id)) is None else False
78+
self.group_mock.add.side_effect = lambda tsk: self.group_set.add('{}-{}'.format(tsk.id, tsk.retry_id))
79+
self.group_mock.remove.side_effect = lambda tsk: len(self.group_set) == 0 if self.group_set.discard(
80+
'{}-{}'.format(tsk.id, tsk.retry_id)) is None else False
7181

7282
def test_worker_execution_no_group(self):
7383
msg = '{"id": "id-1", "retry": 0, "queue": "default", "maxRetries": 5, "args": [], "func": "eb_sqs.tests.worker.tests_worker.dummy_task", "kwargs": {"msg": "Hello World!"}}'
7484

75-
result = self.worker.execute(msg)
85+
result = self.worker.execute(msg, 2)
7686

7787
self.assertEqual(result, 'Hello World!')
7888
self.group_mock.remove.assert_not_called()

eb_sqs/worker/service.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,9 @@ def process_message(self, msg, worker):
122122
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
123123
try:
124124
receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE])
125-
if receive_count > 1:
126-
logger.warning('[django-eb-sqs] SQS re-queued message {} times: Msg Id: {} Body: {}'.format(
127-
receive_count, msg.message_id, msg.body
128-
))
129125

130126
with django_db_management():
131-
worker.execute(msg.body)
127+
worker.execute(msg.body, receive_count)
132128

133129
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
134130
except ExecutionFailedException as exc:

eb_sqs/worker/worker.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,15 @@ def __init__(self, queue_client, group_client):
2323
self.queue_client = queue_client
2424
self.group_client = group_client
2525

26-
def execute(self, msg):
27-
# type: (unicode) -> Any
26+
def execute(self, msg, receive_count=1):
27+
# type: (unicode, int) -> Any
2828
try:
2929
worker_task = WorkerTask.deserialize(msg)
30+
31+
if receive_count > 1:
32+
logger.warning('[django-eb-sqs] SQS re-queued message {} times - queue: {} func: {} retry: {}'.format(
33+
receive_count, worker_task.queue, worker_task.func, worker_task.retry
34+
))
3035
except Exception as ex:
3136
logger.exception(
3237
'Message %s is not a valid worker task: %s',

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='1.14',
9+
version='1.15',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),

0 commit comments

Comments
 (0)