Skip to content

Commit e19fa45

Browse files
authored
Merge pull request #17 from cuda-networks/refactor_command
refactor logic out of command to support instrumentation
2 parents 80853d2 + 8dae7a3 commit e19fa45

File tree

3 files changed

+123
-80
lines changed

3 files changed

+123
-80
lines changed
Lines changed: 2 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,13 @@
11
from __future__ import absolute_import, unicode_literals
22

3-
from datetime import timedelta, datetime
4-
5-
import boto3
6-
import logging
7-
8-
from botocore.config import Config
93
from django.core.management import BaseCommand, CommandError
10-
from django.utils import timezone
11-
12-
from eb_sqs import settings
13-
from eb_sqs.worker.worker import Worker
14-
from eb_sqs.worker.worker_factory import WorkerFactory
154

16-
logger = logging.getLogger(__name__)
5+
from eb_sqs.worker.service import WorkerService
176

187

198
class Command(BaseCommand):
209
help = 'Command to process tasks from one or more SQS queues'
2110

22-
PREFIX_STR = 'prefix:'
23-
2411
def add_arguments(self, parser):
2512
parser.add_argument('--queues', '-q',
2613
dest='queue_names',
@@ -32,68 +19,4 @@ def handle(self, *args, **options):
3219

3320
queue_names = [queue_name.rstrip() for queue_name in options['queue_names'].split(',')]
3421

35-
logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))
36-
37-
sqs = boto3.resource(
38-
'sqs',
39-
region_name=settings.AWS_REGION,
40-
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
41-
)
42-
43-
prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
44-
queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))
45-
46-
queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
47-
static_queues = queues
48-
last_update_time = timezone.make_aware(datetime.min)
49-
50-
logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))
51-
52-
worker = WorkerFactory.default().create()
53-
54-
while True:
55-
if len(queue_prefixes) > 0 and \
56-
timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
57-
queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes)
58-
last_update_time = timezone.now()
59-
logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
60-
', '.join([queue.url for queue in queues])
61-
))
62-
63-
for queue in queues:
64-
try:
65-
messages = queue.receive_messages(
66-
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
67-
WaitTimeSeconds=settings.WAIT_TIME_S,
68-
)
69-
70-
for msg in messages:
71-
self._process_message(msg, worker)
72-
except Exception as exc:
73-
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)
74-
75-
@staticmethod
76-
def _process_message(msg, worker):
77-
# type: (Any, Worker) -> None
78-
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
79-
try:
80-
worker.execute(msg.body)
81-
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
82-
except Exception as exc:
83-
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
84-
finally:
85-
msg.delete()
86-
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))
87-
88-
@staticmethod
89-
def _get_queues_by_names(sqs, queue_names):
90-
return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
91-
92-
@staticmethod
93-
def _get_queues_by_prefixes(sqs, prefixes):
94-
queues = []
95-
96-
for prefix in prefixes:
97-
queues += sqs.queues.filter(QueueNamePrefix=prefix)
98-
99-
return queues
22+
WorkerService().process_queues(queue_names)

eb_sqs/worker/service.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
from __future__ import absolute_import, unicode_literals
2+
3+
from datetime import timedelta, datetime
4+
5+
import boto3
6+
import logging
7+
8+
from botocore.config import Config
9+
from django.utils import timezone
10+
11+
from eb_sqs import settings
12+
from eb_sqs.worker.worker import Worker
13+
from eb_sqs.worker.worker_factory import WorkerFactory
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class WorkerService(object):
19+
PREFIX_STR = 'prefix:'
20+
21+
def process_queues(self, queue_names):
22+
# type: (list) -> None
23+
logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))
24+
25+
sqs = boto3.resource(
26+
'sqs',
27+
region_name=settings.AWS_REGION,
28+
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
29+
)
30+
31+
prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
32+
queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))
33+
34+
queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
35+
static_queues = queues
36+
last_update_time = timezone.make_aware(datetime.min)
37+
38+
logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))
39+
40+
worker = WorkerFactory.default().create()
41+
42+
logger.info('[django-eb-sqs] WAIT_TIME_S = {}'.format(settings.WAIT_TIME_S))
43+
logger.info('[django-eb-sqs] MAX_NUMBER_OF_MESSAGES = {}'.format(settings.MAX_NUMBER_OF_MESSAGES))
44+
logger.info('[django-eb-sqs] AUTO_ADD_QUEUE = {}'.format(settings.AUTO_ADD_QUEUE))
45+
logger.info('[django-eb-sqs] QUEUE_PREFIX = {}'.format(settings.QUEUE_PREFIX))
46+
logger.info('[django-eb-sqs] DEFAULT_QUEUE = {}'.format(settings.DEFAULT_QUEUE))
47+
logger.info('[django-eb-sqs] DEFAULT_MAX_RETRIES = {}'.format(settings.DEFAULT_MAX_RETRIES))
48+
logger.info('[django-eb-sqs] REFRESH_PREFIX_QUEUES_S = {}'.format(settings.REFRESH_PREFIX_QUEUES_S))
49+
50+
while True:
51+
if len(queue_prefixes) > 0 and \
52+
timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
53+
queues = static_queues + self.get_queues_by_prefixes(sqs, queue_prefixes)
54+
last_update_time = timezone.now()
55+
logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
56+
', '.join([queue.url for queue in queues])
57+
))
58+
59+
logger.debug('[django-eb-sqs] Processing {} queues'.format(len(queues)))
60+
self.process_messages(queues, worker)
61+
62+
def process_messages(self, queues, worker):
63+
# type: (list, Worker) -> None
64+
for queue in queues:
65+
try:
66+
messages = self.poll_messages(queue)
67+
logger.debug('[django-eb-sqs] Polled {} messages'.format(len(messages)))
68+
69+
msg_entries = []
70+
71+
for msg in messages:
72+
self.process_message(msg, worker)
73+
msg_entries.append({
74+
'Id': msg.message_id,
75+
'ReceiptHandle': msg.receipt_handle
76+
})
77+
78+
self.delete_messages(queue, msg_entries)
79+
except Exception as exc:
80+
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)
81+
82+
def delete_messages(self, queue, msg_entries):
83+
# type: (Queue, list) -> None
84+
if len(msg_entries) > 0:
85+
response = queue.delete_messages(Entries=msg_entries)
86+
logger.debug('[django-eb-sqs] Deleted {} messages successfully'.format(
87+
len(response.get('Successful', []))
88+
))
89+
logger.debug('[django-eb-sqs] Failed deleting {} messages'.format(
90+
len(response.get('Failed', []))
91+
))
92+
93+
def poll_messages(self, queue):
94+
# type: (Queue) -> list
95+
return queue.receive_messages(
96+
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
97+
WaitTimeSeconds=settings.WAIT_TIME_S,
98+
)
99+
100+
def process_message(self, msg, worker):
101+
# type: (Message, Worker) -> None
102+
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
103+
try:
104+
worker.execute(msg.body)
105+
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
106+
except Exception as exc:
107+
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
108+
109+
def get_queues_by_names(self, sqs, queue_names):
110+
# type: (ServiceResource, list) -> list
111+
return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
112+
113+
def get_queues_by_prefixes(self, sqs, prefixes):
114+
# type: (ServiceResource, list) -> list
115+
queues = []
116+
117+
for prefix in prefixes:
118+
queues += sqs.queues.filter(QueueNamePrefix=prefix)
119+
120+
return queues

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='1.0',
9+
version='1.01',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),

0 commit comments

Comments
 (0)