Skip to content

Commit 80853d2

Browse files
authored
Merge pull request #16 from cuda-networks/support_queue_prefixes
Support queue prefixes
2 parents a872415 + 6341220 commit 80853d2

File tree

6 files changed

+76
-19
lines changed

6 files changed

+76
-19
lines changed

README.md

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ from eb_sqs.decorators import task
2525

2626
@task(queue_name='test')
2727
def echo(message):
28-
print message
28+
print(message)
2929

3030
echo.delay(message='Hello World!')
3131
```
@@ -56,10 +56,10 @@ from eb_sqs.decorators import task
5656

5757
@task(queue_name='test', max_retries=5)
5858
def upload_file(message):
59-
print '# of retries: {}'.format(upload_file.retry_num)
59+
print('# of retries: {}'.format(upload_file.retry_num))
6060
try:
6161
# upload ...
62-
expect ConnectionException:
62+
except ConnectionException:
6363
upload_file.retry()
6464
```
6565

@@ -69,7 +69,7 @@ The retry call supports the `delay` and `execute_inline` arguments in order to d
6969

7070
#### Executing Tasks
7171

72-
The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifing the url mapping in your `urls.py` file.
72+
The Elastic Beanstalk Worker Tier sends all tasks to a API endpoint. django-eb-sqs has already such an endpoint which can be used by specifying the url mapping in your `urls.py` file.
7373

7474
```python
7575
urlpatterns = [
@@ -105,9 +105,16 @@ python manage.py process_queue --queues <comma-delimited list of queue names>
105105

106106
This is a good idea for someone who wants to execute tasks without an Elastic Beanstalk worker.
107107

108+
You can either use full queue names, or queue prefix using `prefix:*my_example_prefix*` notation.
109+
110+
Examples:
111+
```bash
112+
python manage.py process_queue --queues queue1,queue2 # process queue1 and queue2
113+
python manage.py process_queue --queues queue1,prefix:pr1-,queue2 # process queue1, queue2 and any queue whose name starts with 'pr1-'
114+
```
108115

109116
#### Group Tasks
110-
Multiple tasks can be grouped by specifing the `group_id` argument when calling `delay` on a task.
117+
Multiple tasks can be grouped by specifying the `group_id` argument when calling `delay` on a task.
111118
If all tasks of a specific group are executed then the group callback task specified by `EB_SQS_GROUP_CALLBACK_TASK` is executed.
112119

113120
Example calls:
@@ -134,6 +141,8 @@ The following settings can be used to fine tune django-eb-sqs. Copy them into yo
134141
- EB_SQS_MAX_NUMBER_OF_MESSAGES (`10`): The maximum number of messages to read in a single call from SQS (<= 10).
135142
- EB_SQS_WAIT_TIME_S (`2`): The time to wait (seconds) when receiving messages from SQS.
136143
- EB_SQS_AUTO_ADD_QUEUE (`False`): If queues should be added automatically to AWS if they don't exist.
144+
- EB_SQS_QUEUE_MESSAGE_RETENTION (`1209600`): The value (in seconds) to be passed to MessageRetentionPeriod parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True).
145+
- EB_SQS_QUEUE_VISIBILITY_TIMEOUT (`300`): The value (in seconds) to be passed to VisibilityTimeout parameter, when creating a queue (only relevant in case EB_SQS_AUTO_ADD_QUEUE is set to True).
137146
- EB_SQS_DEAD_LETTER_MODE (`False`): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed but group callback is.
138147
- EB_SQS_DEFAULT_DELAY (`0`): Default task delay time in seconds.
139148
- EB_SQS_DEFAULT_MAX_RETRIES (`0`): Default retry limit for all tasks.
@@ -148,6 +157,7 @@ The following settings can be used to fine tune django-eb-sqs. Copy them into yo
148157
- EB_SQS_REDIS_KEY_PREFIX (`eb-sqs-`): Prefix used for all Redis keys
149158
- EB_SQS_USE_PICKLE (`False`): Enable to use `pickle` to serialize task parameters. Uses `json` as default.
150159
- EB_SQS_AWS_MAX_RETRIES (`30`): Default retry limit on a boto3 call to AWS SQS.
160+
- EB_SQS_REFRESH_PREFIX_QUEUES_S (`10`): Minimal number of seconds to wait between refreshing queue list, in case prefix is used
151161

152162

153163
### Development

development.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
boto3==1.4.7
1+
boto3==1.4.8
22
Django==1.10.6
33
mock==2.0.0
44
moto==0.4.24

eb_sqs/aws/sqs_queue_client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,13 @@ def _get_sqs_queue(self, queue_name):
4242
def _add_sqs_queue(self, queue_name):
4343
# type: (unicode) -> Any
4444
if settings.AUTO_ADD_QUEUE:
45-
queue = self.sqs.create_queue(QueueName=queue_name)
45+
queue = self.sqs.create_queue(
46+
QueueName=queue_name,
47+
Attributes={
48+
'MessageRetentionPeriod': settings.QUEUE_MESSAGE_RETENTION,
49+
'VisibilityTimeout': settings.QUEUE_VISIBILITY_TIMEOUT
50+
}
51+
)
4652
self.queue_cache[queue_name] = queue
4753
return queue
4854
else:

eb_sqs/management/commands/process_queue.py

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
from __future__ import absolute_import, unicode_literals
22

3+
from datetime import timedelta, datetime
4+
35
import boto3
46
import logging
57

68
from botocore.config import Config
79
from django.core.management import BaseCommand, CommandError
10+
from django.utils import timezone
811

912
from eb_sqs import settings
1013
from eb_sqs.worker.worker import Worker
@@ -16,6 +19,8 @@
1619
class Command(BaseCommand):
1720
help = 'Command to process tasks from one or more SQS queues'
1821

22+
PREFIX_STR = 'prefix:'
23+
1924
def add_arguments(self, parser):
2025
parser.add_argument('--queues', '-q',
2126
dest='queue_names',
@@ -34,23 +39,41 @@ def handle(self, *args, **options):
3439
region_name=settings.AWS_REGION,
3540
config=Config(retries={'max_attempts': settings.AWS_MAX_RETRIES})
3641
)
37-
queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
42+
43+
prefixes = list(filter(lambda qn: qn.startswith(self.PREFIX_STR), queue_names))
44+
queues = self._get_queues_by_names(sqs, list(set(queue_names) - set(prefixes)))
45+
46+
queue_prefixes = [prefix.split(self.PREFIX_STR)[1] for prefix in prefixes]
47+
static_queues = queues
48+
last_update_time = timezone.make_aware(datetime.min)
3849

3950
logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))
4051

4152
worker = WorkerFactory.default().create()
4253

4354
while True:
44-
for queue in queues:
45-
messages = queue.receive_messages(
46-
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
47-
WaitTimeSeconds=settings.WAIT_TIME_S,
48-
)
49-
50-
for msg in messages:
51-
self._process_message(msg, worker)
55+
if len(queue_prefixes) > 0 and \
56+
timezone.now() - timedelta(seconds=settings.REFRESH_PREFIX_QUEUES_S) > last_update_time:
57+
queues = static_queues + self._get_queues_by_prefixes(sqs, queue_prefixes)
58+
last_update_time = timezone.now()
59+
logger.info('[django-eb-sqs] Updated SQS queues: {}'.format(
60+
', '.join([queue.url for queue in queues])
61+
))
5262

53-
def _process_message(self, msg, worker):
63+
for queue in queues:
64+
try:
65+
messages = queue.receive_messages(
66+
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
67+
WaitTimeSeconds=settings.WAIT_TIME_S,
68+
)
69+
70+
for msg in messages:
71+
self._process_message(msg, worker)
72+
except Exception as exc:
73+
logger.warning('[django-eb-sqs] Error polling queue {}: {}'.format(queue.url, exc), exc_info=1)
74+
75+
@staticmethod
76+
def _process_message(msg, worker):
5477
# type: (Any, Worker) -> None
5578
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
5679
try:
@@ -61,3 +84,16 @@ def _process_message(self, msg, worker):
6184
finally:
6285
msg.delete()
6386
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))
87+
88+
@staticmethod
89+
def _get_queues_by_names(sqs, queue_names):
90+
return [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
91+
92+
@staticmethod
93+
def _get_queues_by_prefixes(sqs, prefixes):
94+
queues = []
95+
96+
for prefix in prefixes:
97+
queues += sqs.queues.filter(QueueNamePrefix=prefix)
98+
99+
return queues

eb_sqs/settings.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,8 @@
3232
DEAD_LETTER_MODE = getattr(settings, 'EB_SQS_DEAD_LETTER_MODE', False) # type: bool
3333

3434
AWS_MAX_RETRIES = getattr(settings, 'EB_SQS_AWS_MAX_RETRIES', 30) # type: int
35+
36+
REFRESH_PREFIX_QUEUES_S = getattr(settings, 'EB_SQS_REFRESH_PREFIX_QUEUES_S', 10) # type: int
37+
38+
QUEUE_MESSAGE_RETENTION = getattr(settings, 'EB_SQS_QUEUE_MESSAGE_RETENTION', '1209600') # type: str
39+
QUEUE_VISIBILITY_TIMEOUT = getattr(settings, 'EB_SQS_QUEUE_VISIBILITY_TIMEOUT', '300') # type: str

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='0.99',
9+
version='1.0',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),
1313
description='A SQS worker implementation for Elastic Beanstalk',
1414
long_description=README,
1515
url='https://github.com/cuda-networks/django-eb-sqs',
1616
install_requires=[
17-
'boto3>=1.4.7',
17+
'boto3>=1.4.8',
1818
'Django>=1.7',
1919
'redis>=2.10',
2020
'requests>=2',

0 commit comments

Comments
 (0)