Skip to content

Commit 87132fe

Browse files
author
Alexey Tsitkin
committed
error handling and db management for signals
1 parent 6c45b64 commit 87132fe

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

eb_sqs/worker/service.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,21 @@ def process_messages(self, queues, worker, static_queues):
8181
messages = self.poll_messages(queue)
8282
logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages)))
8383

84-
MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages)
84+
self._execute_user_code(lambda: MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages))
8585

8686
msg_entries = []
8787
for msg in messages:
88-
self.process_message(msg, worker)
88+
self._execute_user_code(lambda: self._process_message(msg, worker))
8989
msg_entries.append({
9090
'Id': msg.message_id,
9191
'ReceiptHandle': msg.receipt_handle
9292
})
9393

94-
MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages)
94+
self._execute_user_code(lambda: MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages))
9595

9696
self.delete_messages(queue, msg_entries)
9797

98-
MESSAGES_DELETED.send(sender=self.__class__, messages=messages)
98+
self._execute_user_code(lambda: MESSAGES_DELETED.send(sender=self.__class__, messages=messages))
9999
except ClientError as exc:
100100
error_code = exc.response.get('Error', {}).get('Code', None)
101101
if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues:
@@ -124,18 +124,24 @@ def poll_messages(self, queue):
124124
AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE]
125125
)
126126

127-
def process_message(self, msg, worker):
127+
def _process_message(self, msg, worker):
128128
# type: (Message, Worker) -> None
129129
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
130130
try:
131131
receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE])
132132

133-
with django_db_management():
134-
worker.execute(msg.body, receive_count)
133+
worker.execute(msg.body, receive_count)
135134

136135
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
137136
except ExecutionFailedException as exc:
138137
logger.warning('[django-eb-sqs] Handling message {} got error: {}'.format(msg.message_id, repr(exc)))
138+
139+
@staticmethod
140+
def _execute_user_code(function):
141+
# type: (Any) -> None
142+
try:
143+
with django_db_management():
144+
function()
139145
except Exception as exc:
140146
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
141147

0 commit comments

Comments
 (0)