Skip to content

Commit 9a6317b

Browse files
author
itaybleier
authored
Merge pull request #12 from cuda-networks/process-queue-command
Process queue Django Command
2 parents 48c16a6 + 2238370 commit 9a6317b

File tree

5 files changed

+97
-20
lines changed

5 files changed

+97
-20
lines changed

README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ django-eb-sqs is a simple task manager for the Elastic Beanstalk Worker Tier. It
44

55
### Installation
66

7-
Install the module with `pip install django-eb-sqs` or add it to your `requirements.txt`.
7+
Install the module with `pip install git+git://github.com/cuda-networks/django-eb-sqs.git` or add it to your `requirements.txt`.
88

99
Don't forget to add django-eb-sqs app to your Django `INSTALLED_APPS` settings:
1010
```python
@@ -94,6 +94,17 @@ For example:
9494
python manage.py run_eb_sqs_worker --url http://127.0.0.1:80/worker/process --queue default
9595
```
9696

97+
#### Executing Tasks without Elastic Beanstalk
98+
99+
Another way of executing tasks is to use the Django command `process_queue`.
100+
This command can work with one or more queues, reading from the queues infinitely and executing tasks as they come-in.
101+
102+
```bash
103+
python manage.py process_queue --queues <comma-delimited list of queue names>
104+
```
105+
106+
This is a good idea for someone who wants to execute tasks without an Elastic Beanstalk worker.
107+
97108

98109
#### Group Tasks
99110
Multiple tasks can be grouped by specifing the `group_id` argument when calling `delay` on a task.
@@ -119,6 +130,9 @@ def group_finished(group_id):
119130

120131
The following settings can be used to fine tune django-eb-sqs. Copy them into your Django `settings.py` file.
121132

133+
- EB_AWS_REGION (`us-east-1`): The AWS region to use when working with SQS.
134+
- EB_SQS_MAX_NUMBER_OF_MESSAGES (`10`): The maximum number of messages to read in a single call from SQS (<= 10).
135+
- EB_SQS_WAIT_TIME_S (`2`): The time to wait (seconds) when receiving messages from SQS.
122136
- EB_SQS_AUTO_ADD_QUEUE (`True`): If queues should be added automatically to AWS if they don't exist.
123137
- EB_SQS_DEAD_LETTER_MODE (`False`): Enable if this worker is handling the SQS dead letter queue. Tasks won't be executed but group callback is.
124138
- EB_SQS_DEFAULT_DELAY (`0`): Default task delay time in seconds.

development.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
boto3==1.3.1
2-
Django==1.9.6
1+
boto3==1.4.4
2+
Django==1.10.6
33
mock==2.0.0
44
moto==0.4.24
55
redis==2.10.5
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from __future__ import absolute_import, unicode_literals
2+
3+
import boto3
4+
import logging
5+
6+
from django.core.management import BaseCommand, CommandError
7+
8+
from eb_sqs import settings
9+
from eb_sqs.worker.worker import Worker
10+
from eb_sqs.worker.worker_factory import WorkerFactory
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class Command(BaseCommand):
16+
help = 'Command to process tasks from one or more SQS queues'
17+
18+
def add_arguments(self, parser):
19+
parser.add_argument('--queues', '-q',
20+
dest='queue_names',
21+
help='Name of queues to process, separated by commas')
22+
23+
def handle(self, *args, **options):
24+
if not options['queue_names']:
25+
raise CommandError('Queue names (--queues) not specified')
26+
27+
queue_names = [queue_name.rstrip() for queue_name in options['queue_names'].split(',')]
28+
29+
logger.debug('[django-eb-sqs] Connecting to SQS: {}'.format(', '.join(queue_names)))
30+
31+
sqs = boto3.resource('sqs', region_name=settings.AWS_REGION)
32+
queues = [sqs.get_queue_by_name(QueueName=queue_name) for queue_name in queue_names]
33+
34+
logger.debug('[django-eb-sqs] Connected to SQS: {}'.format(', '.join(queue_names)))
35+
36+
worker = WorkerFactory.default().create()
37+
38+
while True:
39+
for queue in queues:
40+
messages = queue.receive_messages(
41+
MaxNumberOfMessages=settings.MAX_NUMBER_OF_MESSAGES,
42+
WaitTimeSeconds=settings.WAIT_TIME_S,
43+
)
44+
45+
for msg in messages:
46+
self._process_message(msg, worker)
47+
48+
def _process_message(self, msg, worker):
49+
# type: (Any, Worker) -> None
50+
logger.debug('[django-eb-sqs] Read message {}'.format(msg.message_id))
51+
try:
52+
worker.execute(msg.body)
53+
logger.debug('[django-eb-sqs] Processed message {}'.format(msg.message_id))
54+
except Exception as exc:
55+
logger.error('[django-eb-sqs] Unhandled error: {}'.format(exc), exc_info=1)
56+
finally:
57+
msg.delete()
58+
logger.debug('[django-eb-sqs] Deleted message {}'.format(msg.message_id))

eb_sqs/settings.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,31 @@
22

33
from django.conf import settings
44

5-
AUTO_ADD_QUEUE = getattr(settings, 'EB_SQS_AUTO_ADD_QUEUE', True) # type: bool
6-
QUEUE_PREFIX = getattr(settings, 'EB_SQS_QUEUE_PREFIX', 'eb-sqs-') # type: unicode
7-
DEFAULT_QUEUE = getattr(settings, 'EB_SQS_DEFAULT_QUEUE', 'default') # type: unicode
5+
AWS_REGION = getattr(settings, 'EB_AWS_REGION', 'us-east-1') # type: unicode
86

9-
EXECUTE_INLINE = getattr(settings, 'EB_SQS_EXECUTE_INLINE', False) # type: bool
10-
FORCE_SERIALIZATION = getattr(settings, 'EB_SQS_FORCE_SERIALIZATION', False) # type: bool
7+
MAX_NUMBER_OF_MESSAGES = getattr(settings, 'EB_SQS_MAX_NUMBER_OF_MESSAGES', 10) # type: int
8+
WAIT_TIME_S = getattr(settings, 'EB_SQS_WAIT_TIME_S', 2) # type: int
119

12-
DEFAULT_DELAY = getattr(settings, 'EB_SQS_DEFAULT_DELAY', 0) # type: int
13-
DEFAULT_MAX_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_MAX_RETRIES', 0) # type: int
14-
DEFAULT_COUNT_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_COUNT_RETRIES', True) # type: bool
10+
AUTO_ADD_QUEUE = getattr(settings, 'EB_SQS_AUTO_ADD_QUEUE', True) # type: bool
11+
QUEUE_PREFIX = getattr(settings, 'EB_SQS_QUEUE_PREFIX', 'eb-sqs-') # type: unicode
12+
DEFAULT_QUEUE = getattr(settings, 'EB_SQS_DEFAULT_QUEUE', 'default') # type: unicode
1513

16-
USE_PICKLE = getattr(settings, 'EB_SQS_USE_PICKLE', False) # type: bool
14+
EXECUTE_INLINE = getattr(settings, 'EB_SQS_EXECUTE_INLINE', False) # type: bool
15+
FORCE_SERIALIZATION = getattr(settings, 'EB_SQS_FORCE_SERIALIZATION', False) # type: bool
1716

18-
GROUP_CALLBACK_TASK = getattr(settings, 'EB_SQS_GROUP_CALLBACK_TASK', None) # type: Any
17+
DEFAULT_DELAY = getattr(settings, 'EB_SQS_DEFAULT_DELAY', 0) # type: int
18+
DEFAULT_MAX_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_MAX_RETRIES', 0) # type: int
19+
DEFAULT_COUNT_RETRIES = getattr(settings, 'EB_SQS_DEFAULT_COUNT_RETRIES', True) # type: bool
1920

20-
REDIS_CLIENT = getattr(settings, 'EB_SQS_REDIS_CLIENT', None) # type: StrictRedis
21+
USE_PICKLE = getattr(settings, 'EB_SQS_USE_PICKLE', False) # type: bool
22+
23+
GROUP_CALLBACK_TASK = getattr(settings, 'EB_SQS_GROUP_CALLBACK_TASK', None) # type: Any
24+
25+
REDIS_CLIENT = getattr(settings, 'EB_SQS_REDIS_CLIENT', None) # type: StrictRedis
2126
# default: 7 days
22-
REDIS_EXPIRY = getattr(settings, 'EB_SQS_REDIS_EXPIRY', 3600*24*7) # type: int
23-
REDIS_KEY_PREFIX = getattr(settings, 'EB_SQS_REDIS_KEY_PREFIX', 'eb-sqs-') # type: string
27+
REDIS_EXPIRY = getattr(settings, 'EB_SQS_REDIS_EXPIRY', 3600 * 24 * 7) # type: int
28+
REDIS_KEY_PREFIX = getattr(settings, 'EB_SQS_REDIS_KEY_PREFIX', 'eb-sqs-') # type: string
2429

25-
WORKER_FACTORY = getattr(settings, 'EB_SQS_WORKER_FACTORY', None) # type: WorkerFactory
30+
WORKER_FACTORY = getattr(settings, 'EB_SQS_WORKER_FACTORY', None) # type: WorkerFactory
2631

27-
DEAD_LETTER_MODE = getattr(settings, 'EB_SQS_DEAD_LETTER_MODE', False) # type: bool
32+
DEAD_LETTER_MODE = getattr(settings, 'EB_SQS_DEAD_LETTER_MODE', False) # type: bool

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='0.96',
9+
version='0.97',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),
1313
description='A SQS worker implementation for Elastic Beanstalk',
1414
long_description=README,
15-
url='https://github.com/sookasa/django-eb-sqs',
15+
url='https://github.com/cuda-networks/django-eb-sqs',
1616
install_requires=[
1717
'boto3>=1.3.1',
1818
'Django>=1.7',

0 commit comments

Comments
 (0)