Skip to content

Commit 54f8227

Browse files
committed
Django Signals to expose the WorkerService progress
1 parent cad568f commit 54f8227

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ python manage.py process_queue --queues queue1,queue2 # process queue1 and queue
113113
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'
114114
```
115115

116+
Use the signals `MESSAGES_RECEIVED`, `MESSAGES_PROCESSED`, `MESSAGES_DELETED` of the `WorkerService` to get informed about the current SQS batch being processed by the management command.
117+
116118
#### Group Tasks
117119
Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task.
118120
If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed.

eb_sqs/worker/service.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import boto3
77
from botocore.config import Config
88
from botocore.exceptions import ClientError
9+
import django.dispatch
910
from django.utils import timezone
1011

1112
from eb_sqs import settings
@@ -16,10 +17,13 @@
1617

1718
logger = logging.getLogger(__name__)
1819

20+
MESSAGES_RECEIVED = django.dispatch.Signal(providing_args=['messages'])
21+
MESSAGES_PROCESSED = django.dispatch.Signal(providing_args=['messages'])
22+
MESSAGES_DELETED = django.dispatch.Signal(providing_args=['messages'])
23+
1924

2025
class WorkerService(object):
2126
_PREFIX_STR = 'prefix:'
22-
2327
_RECEIVE_COUNT_ATTRIBUTE = 'ApproximateReceiveCount'
2428

2529
def process_queues(self, queue_names):
@@ -70,16 +74,21 @@ def process_messages(self, queues, worker, static_queues):
7074
messages = self.poll_messages(queue)
7175
logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages)))
7276

73-
msg_entries = []
77+
MESSAGES_RECEIVED.send(sender=self.__class__, messages=messages)
7478

79+
msg_entries = []
7580
for msg in messages:
7681
self.process_message(msg, worker)
7782
msg_entries.append({
7883
'Id': msg.message_id,
7984
'ReceiptHandle': msg.receipt_handle
8085
})
8186

87+
MESSAGES_PROCESSED.send(sender=self.__class__, messages=messages)
88+
8289
self.delete_messages(queue, msg_entries)
90+
91+
MESSAGES_DELETED.send(sender=self.__class__, messages=messages)
8392
except ClientError as exc:
8493
error_code = exc.response.get('Error', {}).get('Code', None)
8594
if error_code == 'AWS.SimpleQueueService.NonExistentQueue' and queue not in static_queues:

0 commit comments

Comments
 (0)