Skip to content

Commit f395b48

Browse files
authored
Merge pull request #39 from cuda-networks/handle_signals_better
error handling and db management for signals
2 parents 6c45b64 + e5187a9 commit f395b48

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

eb_sqs/worker/service.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,21 +81,21 @@ def process_messages(self, queues, worker, static_queues):
8181
messages = self.poll_messages(queue)
8282
logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages)))
8383

84-
MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages)
84+
self._send_signal(MESSAGES_RECEIVED, messages=messages)
8585

8686
msg_entries = []
8787
for msg in messages:
88-
self.process_message(msg, worker)
88+
self._execute_user_code(lambda: self._process_message(msg, worker))
8989
msg_entries.append({
9090
'Id': msg.message_id,
9191
'ReceiptHandle': msg.receipt_handle
9292
})
9393

94-
MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages)
94+
self._send_signal(MESSAGES_PROCESSED, messages=messages)
9595

9696
self.delete_messages(queue, msg_entries)
9797

98-
MESSAGES_DELETED.send(sender=self.__class__, messages=messages)
98+
self._send_signal(MESSAGES_DELETED, messages=messages)
9999
except ClientError as exc:
100100
error_code = exc.response.get('Error', {}).get('Code', None)
101101
if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues:
@@ -124,18 +124,29 @@ def poll_messages(self, queue):
124124
AttributeNames=[self._RECEIVE_COUNT_ATTRIBUTE]
125125
)
126126

127-
def process_message(self, msg, worker):
127+
def _send_signal(self, signal, messages):
128+
# type: (django.dispatch.Signal, list) -> None
129+
if signal.has_listeners(sender=self.__class__):
130+
self._execute_user_code(lambda: signal.send(sender=self.__class__, messages=messages))
131+
132+
def _process_message(self, msg, worker):
128133
# type: (Message, Worker) -> None
129134
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
130135
try:
131136
receive_count = int(msg.attributes[self._RECEIVE_COUNT_ATTRIBUTE])
132137

133-
with django_db_management():
134-
worker.execute(msg.body, receive_count)
138+
worker.execute(msg.body, receive_count)
135139

136140
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
137141
except ExecutionFailedException as exc:
138142
logger.warning('[django-eb-sqs] Handling message {} got error: {}'.format(msg.message_id, repr(exc)))
143+
144+
@staticmethod
145+
def _execute_user_code(function):
146+
# type: (Any) -> None
147+
try:
148+
with django_db_management():
149+
function()
139150
except Exception as exc:
140151
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
141152

@@ -153,5 +164,6 @@ def get_queues_by_prefixes(self, sqs, prefixes):
153164
return queues
154165

155166
def write_healthcheck_file(self):
167+
# type: () -> None
156168
with open(settings.HEALTHCHECK_FILE_NAME, 'w') as file:
157169
file.write(timezone.now().isoformat())

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='1.32',
9+
version='1.33',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),

0 commit comments

Comments
 (0)