Skip to content

Commit 75fa40b

Browse files
author
Alexey Tsitkin
committed
refactor logic out of command to support instrumentation
1 parent 80853d2 commit 75fa40b

File tree

2 files changed

+99
-79
lines changed

2 files changed

+99
-79
lines changed
Lines changed: 2 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,13 @@
11
from __future__ import absolute_import, unicode_literals
22

3-
from datetime import timedelta, datetime
4-
5-
import boto3
6-
import logging
7-
8-
from botocore.config import Config
93
from django.core.management import BaseCommand, CommandError
10-
from django.utils import timezone
11-
12-
from eb_sqs import settings
13-
from eb_sqs.worker.worker import Worker
14-
from eb_sqs.worker.worker_factory import WorkerFactory
154

16-
logger = logging.getLogger(__name__)
5+
from eb_sqs.worker.service import WorkerService
176

187

198
class Command(BaseCommand):
209
help = 'Command to process tasks from one or more SQS queues'
2110

22-
PREFIX_STR = 'prefix:'
23-
2411
def add_arguments(self, parser):
2512
parser.add_argument('--queues', '-q',
2613
dest='queue_names',
@@ -32,68 +19,4 @@ def handle(self, *args, **options):
3219

3320
queue_names = [queue_name.rstrip() for queue_name in options['queue_names'].split(',')]
3421

35-
logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))
36-
37-
sqs = boto3.resource(
38-
'sqs',
39-
region_name=settings.AWS_REGION,
40-
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
41-
)
42-
43-
prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
44-
queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))
45-
46-
queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
47-
static_queues = queues
48-
last_update_time = timezone.make_aware(datetime.min)
49-
50-
logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))
51-
52-
worker = WorkerFactory.default().create()
53-
54-
while True:
55-
if len(queue_prefixes) > 0 and \
56-
timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
57-
queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes)
58-
last_update_time = timezone.now()
59-
logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
60-
', '.join([queue.url for queue in queues])
61-
))
62-
63-
for queue in queues:
64-
try:
65-
messages = queue.receive_messages(
66-
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
67-
WaitTimeSeconds=settings.WAIT_TIME_S,
68-
)
69-
70-
for msg in messages:
71-
self._process_message(msg, worker)
72-
except Exception as exc:
73-
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)
74-
75-
@staticmethod
76-
def _process_message(msg, worker):
77-
# type: (Any, Worker) -> None
78-
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
79-
try:
80-
worker.execute(msg.body)
81-
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
82-
except Exception as exc:
83-
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
84-
finally:
85-
msg.delete()
86-
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))
87-
88-
@staticmethod
89-
def _get_queues_by_names(sqs, queue_names):
90-
return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
91-
92-
@staticmethod
93-
def _get_queues_by_prefixes(sqs, prefixes):
94-
queues = []
95-
96-
for prefix in prefixes:
97-
queues += sqs.queues.filter(QueueNamePrefix=prefix)
98-
99-
return queues
22+
WorkerService().process_queues(queue_names)

eb_sqs/worker/service.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from __future__ import absolute_import, unicode_literals
2+
3+
from datetime import timedelta, datetime
4+
5+
import boto3
6+
import logging
7+
8+
from botocore.config import Config
9+
from django.utils import timezone
10+
11+
from eb_sqs import settings
12+
from eb_sqs.worker.worker import Worker
13+
from eb_sqs.worker.worker_factory import WorkerFactory
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class WorkerService(object):
19+
PREFIX_STR = 'prefix:'
20+
21+
def process_queues(self, queue_names):
22+
logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))
23+
24+
sqs = boto3.resource(
25+
'sqs',
26+
region_name=settings.AWS_REGION,
27+
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
28+
)
29+
30+
prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
31+
queues = self.get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))
32+
33+
queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
34+
static_queues = queues
35+
last_update_time = timezone.make_aware(datetime.min)
36+
37+
logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))
38+
39+
worker = WorkerFactory.default().create()
40+
41+
logger.info('[django-eb-sqs] WAIT_TIME_S = {}'.format(settings.WAIT_TIME_S))
42+
logger.info('[django-eb-sqs] MAX_NUMBER_OF_MESSAGES = {}'.format(settings.MAX_NUMBER_OF_MESSAGES))
43+
logger.info('[django-eb-sqs] AUTO_ADD_QUEUE = {}'.format(settings.AUTO_ADD_QUEUE))
44+
logger.info('[django-eb-sqs] QUEUE_PREFIX = {}'.format(settings.QUEUE_PREFIX))
45+
logger.info('[django-eb-sqs] DEFAULT_QUEUE = {}'.format(settings.DEFAULT_QUEUE))
46+
logger.info('[django-eb-sqs] DEFAULT_MAX_RETRIES = {}'.format(settings.DEFAULT_MAX_RETRIES))
47+
logger.info('[django-eb-sqs] REFRESH_PREFIX_QUEUES_S = {}'.format(settings.REFRESH_PREFIX_QUEUES_S))
48+
49+
while True:
50+
if len(queue_prefixes) > 0 and \
51+
timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
52+
queues = static_queues + self.get_queues_by_prefixes(sqs, queue_prefixes)
53+
last_update_time = timezone.now()
54+
logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
55+
', '.join([queue.url for queue in queues])
56+
))
57+
58+
self.process_messages(queues, worker)
59+
60+
def process_messages(self, queues, worker):
61+
for queue in queues:
62+
try:
63+
messages = self.poll_messages(queue)
64+
65+
for msg in messages:
66+
self.process_message(msg, worker)
67+
except Exception as exc:
68+
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)
69+
70+
def poll_messages(self, queue):
71+
return queue.receive_messages(
72+
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
73+
WaitTimeSeconds=settings.WAIT_TIME_S,
74+
)
75+
76+
def process_message(self, msg, worker):
77+
# type: (Any, Worker) -> None
78+
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
79+
try:
80+
worker.execute(msg.body)
81+
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
82+
except Exception as exc:
83+
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
84+
finally:
85+
msg.delete()
86+
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))
87+
88+
def get_queues_by_names(self, sqs, queue_names):
89+
return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
90+
91+
def get_queues_by_prefixes(self, sqs, prefixes):
92+
queues = []
93+
94+
for prefix in prefixes:
95+
queues += sqs.queues.filter(QueueNamePrefix=prefix)
96+
97+
return queues

0 commit comments

Comments
 (0)