@@ -81,7 +81,7 @@ def process_messages(self, queues, worker, static_queues):
8181 messages = self .poll_messages (queue )
8282 logger .debug ('[django-eb-sqs] Polled {} messages' .format (len (messages )))
8383
84- self ._execute_user_code ( lambda : MESSAGES_RECEIVED . send ( sender = self . __class__ , messages = messages ) )
84+ self ._send_signal ( MESSAGES_RECEIVED , messages = messages )
8585
8686 msg_entries = []
8787 for msg in messages :
@@ -91,11 +91,11 @@ def process_messages(self, queues, worker, static_queues):
9191 'ReceiptHandle' : msg .receipt_handle
9292 })
9393
94- self ._execute_user_code ( lambda : MESSAGES_PROCESSED . send ( sender = self . __class__ , messages = messages ) )
94+ self ._send_signal ( MESSAGES_PROCESSED , messages = messages )
9595
9696 self .delete_messages (queue , msg_entries )
9797
98- self ._execute_user_code ( lambda : MESSAGES_DELETED . send ( sender = self . __class__ , messages = messages ) )
98+ self ._send_signal ( MESSAGES_DELETED , messages = messages )
9999 except ClientError as exc :
100100 error_code = exc .response .get ('Error' , {}).get ('Code' , None )
101101 if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues :
@@ -124,6 +124,11 @@ def poll_messages(self, queue):
124124 AttributeNames = [self ._RECEIVE_COUNT_ATTRIBUTE ]
125125 )
126126
127+ def _send_signal (self , signal , messages ):
128+ # type: (django.dispatch.Signal, list) -> None
129+ if signal .has_listeners (sender = self .__class__ ):
130+ self ._execute_user_code (lambda : signal .send (sender = self .__class__ , messages = messages ))
131+
127132 def _process_message (self , msg , worker ):
128133 # type: (Message, Worker) -> None
129134 logger .debug ('[django-eb-sqs] Read message {}' .format (msg .message_id ))
0 commit comments