Skip to content

Commit 8d7de7f

Browse files
committed
Improve resiliency and logging of task groups
1 parent a02f997 commit 8d7de7f

File tree

3 files changed

+43
-21
lines changed

3 files changed

+43
-21
lines changed

eb_sqs/aws/sqs_queue_client.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from botocore.exceptions import ClientError
55

66
from eb_sqs import settings
7-
from eb_sqs.worker.queue_client import QueueClient, QueueDoesNotExistException
7+
from eb_sqs.worker.queue_client import QueueClient, QueueDoesNotExistException, QueueClientException
88

99

1010
class SqsQueueClient(QueueClient):
@@ -50,8 +50,11 @@ def _add_sqs_queue(self, queue_name):
5050

5151
def add_message(self, queue_name, msg, delay):
5252
# type: (unicode, unicode, int) -> None
53-
queue = self._get_queue(queue_name)
54-
queue.send_message(
55-
MessageBody=msg,
56-
DelaySeconds=delay
57-
)
53+
try:
54+
queue = self._get_queue(queue_name)
55+
queue.send_message(
56+
MessageBody=msg,
57+
DelaySeconds=delay
58+
)
59+
except Exception as ex:
60+
raise QueueClientException(ex)

eb_sqs/worker/worker.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,38 +74,44 @@ def delay(self, group_id, queue_name, func, args, kwargs, max_retries, use_pickl
7474
# type: (unicode, unicode, Any, tuple, dict, int, bool, int, bool) -> Any
7575
id = unicode(uuid.uuid4())
7676
worker_task = WorkerTask(id, group_id, queue_name, func, args, kwargs, max_retries, 0, use_pickle)
77-
return self._enqueue_task(worker_task, delay, execute_inline, False)
77+
return self._enqueue_task(worker_task, delay, execute_inline, False, True)
7878

7979
def retry(self, worker_task, delay, execute_inline, count_retries):
8080
# type: (WorkerTask, int, bool, bool) -> Any
81-
return self._enqueue_task(worker_task, delay, execute_inline, count_retries)
81+
return self._enqueue_task(worker_task, delay, execute_inline, True, count_retries)
8282

83-
def _enqueue_task(self, worker_task, delay, execute_inline, is_retry):
84-
# type: (WorkerTask, int, bool, bool) -> Any
83+
def _enqueue_task(self, worker_task, delay, execute_inline, is_retry, count_retries):
84+
# type: (WorkerTask, int, bool, bool, bool) -> Any
8585
try:
86-
if is_retry:
86+
if is_retry and count_retries:
8787
worker_task.retry += 1
8888
if worker_task.retry > worker_task.max_retries:
8989
self._group_callback(worker_task)
9090
raise MaxRetriesReachedException(worker_task.retry)
9191

9292
if worker_task.group_id:
93+
logger.info(
94+
'Add task %s (%s) to group %s',
95+
worker_task.abs_func_name,
96+
worker_task.id,
97+
worker_task.group_id,
98+
)
9399
self.group_client.add(worker_task)
94100

101+
logger.info('%s task %s (%s): %s, %s (%s%s)',
102+
'Retrying' if is_retry else 'Delaying',
103+
worker_task.abs_func_name,
104+
worker_task.id,
105+
worker_task.args,
106+
worker_task.kwargs,
107+
worker_task.queue,
108+
', inline' if execute_inline else '')
95109
if execute_inline:
96110
if settings.FORCE_SERIALIZATION:
97111
return self._execute_task(WorkerTask.deserialize(worker_task.serialize()))
98112
else:
99113
return self._execute_task(worker_task)
100114
else:
101-
logger.info('%s task %s (%s): %s, %s (%s)',
102-
'Retrying' if is_retry else 'Delaying',
103-
worker_task.abs_func_name,
104-
worker_task.id,
105-
worker_task.args,
106-
worker_task.kwargs,
107-
worker_task.queue)
108-
109115
self.queue_client.add_message(worker_task.queue, worker_task.serialize(), delay)
110116
return None
111117
except QueueDoesNotExistException as ex:
@@ -121,6 +127,9 @@ def _enqueue_task(self, worker_task, delay, execute_inline, is_retry):
121127
ex)
122128

123129
raise QueueException()
130+
except Exception:
131+
self._group_callback(worker_task)
132+
raise
124133

125134
def _execute_task(self, worker_task):
126135
# type: (WorkerTask) -> Any
@@ -130,7 +139,17 @@ def _execute_task(self, worker_task):
130139

131140
def _group_callback(self, worker_task):
132141
# type: (WorkerTask) -> None
133-
if worker_task.group_id and self.group_client.remove(worker_task) and settings.GROUP_CALLBACK_TASK:
142+
if not worker_task.group_id:
143+
return
144+
145+
logger.info(
146+
'Remove task %s (%s) from group %s',
147+
worker_task.abs_func_name,
148+
worker_task.id,
149+
worker_task.group_id,
150+
)
151+
152+
if self.group_client.remove(worker_task) and settings.GROUP_CALLBACK_TASK:
134153
callback = settings.GROUP_CALLBACK_TASK
135154

136155
if isinstance(callback, basestring):

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='0.9',
9+
version='0.91',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),

0 commit comments

Comments
 (0)