1616
1717
1818class WorkerService (object ):
19- PREFIX_STR = 'prefix:'
19+ _PREFIX_STR = 'prefix:'
20+
21+ _RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount'
2022
2123 def process_queues (self , queue_names ):
2224 # type: (list) -> None
@@ -28,10 +30,10 @@ def process_queues(self, queue_names):
2830 config = Config (retries = {'max_attempts' : settings .AWS_MAX_RETRIES })
2931 )
3032
31- prefixes = list (filter (lambda qn : qn .startswith (self .PREFIX_STR ), queue_names ))
33+ prefixes = list (filter (lambda qn : qn .startswith (self ._PREFIX_STR ), queue_names ))
3234 queues = self .get_queues_by_names (sqs , list (set (queue_names ) - set (prefixes )))
3335
34- queue_prefixes = [prefix .split (self .PREFIX_STR )[1 ] for prefix in prefixes ]
36+ queue_prefixes = [prefix .split (self ._PREFIX_STR )[1 ] for prefix in prefixes ]
3537 static_queues = queues
3638 last_update_time = timezone .make_aware (datetime .min )
3739
@@ -95,12 +97,19 @@ def poll_messages(self, queue):
9597 return queue .receive_messages (
9698 MaxNumberOfMessages = settings .MAX_NUMBER_OF_MESSAGES ,
9799 WaitTimeSeconds = settings .WAIT_TIME_S ,
100+ AttributeNames = [self ._RECEIVE_COUNT_ATTRIBUTE ]
98101 )
99102
100103 def process_message (self , msg , worker ):
101104 # type: (Message, Worker) -> None
102105 logger .debug ('[django-eb-sqs] Read message {}' .format (msg .message_id ))
103106 try :
107+ receive_count = int (msg .attributes [self ._RECEIVE_COUNT_ATTRIBUTE ])
108+ if receive_count > 1 :
109+ logger .warning ('[django-eb-sqs] SQS re-queued message {} times: Msg Id: {} Body: {}' .format (
110+ receive_count , msg .message_id , msg .body
111+ ))
112+
104113 worker .execute (msg .body )
105114 logger .debug ('[django-eb-sqs] Processed message {}' .format (msg .message_id ))
106115 except Exception as exc :
0 commit comments