Skip to content

Commit f3a8621

Browse files
author
Itay Bleier
committed
Fixes to the process_queue command
1 parent 23ff900 commit f3a8621

File tree

2 files changed

+20
-14
lines changed

2 files changed

+20
-14
lines changed

eb_sqs/management/commands/process_queue.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from django.core.management import BaseCommand, CommandError
77

88
from eb_sqs import settings
9+
from eb_sqs.worker.worker import Worker
910
from eb_sqs.worker.worker_factory import WorkerFactory
1011

1112
logger = logging.getLogger(__name__)
@@ -23,14 +24,16 @@ def handle(self, *args, **options):
2324
if not options['queue_names']:
2425
raise CommandError('Queue names (--queues) not specified')
2526

26-
queue_names = options['queue_names'].split(',')
27+
queue_names = [queue_name.trim() for queue_name in options['queue_names'].split(',')]
2728

28-
logger.debug('Connecting to SQS: {}'.format(', '.join(queue_names)))
29+
logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))
2930

3031
sqs = boto3.resource('sqs', region_name=settings.AWS_REGION)
3132
queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
3233

33-
logger.debug('Connected to SQS: {}'.format(', '.join(queue_names)))
34+
logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))
35+
36+
worker = WorkerFactory.default().create()
3437

3538
while True:
3639
for queue in queues:
@@ -40,13 +43,16 @@ def handle(self, *args, **options):
4043
)
4144

4245
for msg in messages:
43-
logger.debug('Read message {}'.format(msg.message_id))
44-
self._process_message(msg)
45-
logger.debug('Processed message {}'.format(msg.message_id))
46-
msg.delete()
47-
logger.debug('Deleted message {}'.format(msg.message_id))
48-
49-
def _process_message(self, message):
50-
# type: (Message) -> None
51-
worker = WorkerFactory.default().create()
52-
worker.execute(message.body)
46+
self._process_message(msg, worker)
47+
48+
def _process_message(self, msg, worker):
49+
# type: (Any, Worker) -> None
50+
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
51+
try:
52+
worker.execute(msg.body)
53+
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
54+
except Exception as exc:
55+
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
56+
finally:
57+
msg.delete()
58+
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))

eb_sqs/settings.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from django.conf import settings
44

5-
AWS_REGION = getattr(settings, 'AWS_REGION', 'us-east-1') # type: unicode
5+
AWS_REGION = getattr(settings, 'EB_AWS_REGION', 'us-east-1') # type: unicode
66

77
MAX_NUMBER_OF_MESSAGES = getattr(settings, 'EB_SQS_MAX_NUMBER_OF_MESSAGES', 10) # type: int
88
WAIT_TIME_S = getattr(settings, 'EB_SQS_WAIT_TIME_S', 2) # type: int

0 commit comments

Comments
 (0)