Skip to content

Commit 07f2940

Browse files
authored
Merge pull request #36 from cuda-networks/auto_tasks
Auto tasks
2 parents 77ab42c + a60ff8f commit 07f2940

File tree

8 files changed

+207
-1
lines changed

8 files changed

+207
-1
lines changed

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,32 @@ def group_finished(group_id):
135135
pass
136136
```
137137

138+
#### Auto Tasks
139+
140+
This is a helper tool for the case you wish to define one of your class method as a task, and make it seamless to all callers.
141+
This makes the code much simpler, and allows using classes to invoke your method directly without considering whether it's invoked async or not.
142+
143+
This is how you would define your class:
144+
```python
145+
class MyService:
146+
def __init__(self, p1=default1, ..., pN=defaultN, auto_task_service=None):
147+
self._auto_task_service = auto_task_service or AutoTaskService()
148+
149+
self._auto_task_service.register_task(self.my_task_method)
150+
151+
def my_task_method(self, *args, **kwargs):
152+
...
153+
154+
```
155+
156+
Notice the following:
157+
1. Your class needs to have defaults for all parameters in the c'tor
158+
2. The c'tor must have a parameter named `auto_task_service`
159+
3. The method shouldn't have any return value (as it's invoked async)
160+
161+
In case you want your method to retry certain cases, you need to raise `RetryableTaskException`.
162+
You can provide on optional `delay` time for the retry, set `count_retries=False` in case you don't want to limit retries, or use `max_retries_func` to specify a function which will be invoked when the defined maximum number of retries is exhausted.
163+
138164
#### Settings
139165

140166
The following settings can be used to fine tune django-eb-sqs. Copy them into your Django `settings.py` file.

eb_sqs/auto_tasks/__init__.py

Whitespace-only changes.

eb_sqs/auto_tasks/base_service.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from abc import ABCMeta, abstractmethod
2+
3+
4+
class BaseAutoTaskService:
5+
__metaclass__ = ABCMeta
6+
7+
@abstractmethod
8+
def register_task(self, method, queue_name=None, max_retries=None):
9+
# type: (Any, str, int) -> None
10+
pass
11+
12+
13+
class NoopTaskService(BaseAutoTaskService):
14+
def __init__(self):
15+
# type: () -> None
16+
self._registered_func_names = []
17+
18+
def register_task(self, method, queue_name=None, max_retries=None):
19+
# type: (Any, str, int) -> None
20+
self._registered_func_names.append(method.__name__)
21+
22+
def is_func_name_registered(self, func_name):
23+
# type: (str) -> bool
24+
return func_name in self._registered_func_names

eb_sqs/auto_tasks/exceptions.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
class RetryableTaskException(Exception):
2+
def __init__(self, inner, delay=None, count_retries=None, max_retries_func=None):
3+
# type: (Exception, int, bool, Any) -> None
4+
self._inner = inner
5+
6+
self.delay = delay
7+
self.count_retries = count_retries
8+
self.max_retries_func = max_retries_func
9+
10+
def __repr__(self):
11+
# type: () -> str
12+
return repr(self._inner)

eb_sqs/auto_tasks/service.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import importlib
2+
import logging
3+
4+
from eb_sqs.auto_tasks.base_service import BaseAutoTaskService, NoopTaskService
5+
from eb_sqs.auto_tasks.exceptions import RetryableTaskException
6+
from eb_sqs.decorators import task
7+
from eb_sqs.worker.worker_exceptions import MaxRetriesReachedException
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
@task()
13+
def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
14+
try:
15+
logger.debug(
16+
'Invoke _auto_task_wrapper with module: %s class: %s func: %s args: %s and kwargs: %s',
17+
module_name,
18+
class_name,
19+
func_name,
20+
args,
21+
kwargs
22+
)
23+
24+
module = importlib.import_module(module_name) # import module
25+
class_ = getattr(module, class_name) # find class
26+
27+
noop_task_service = NoopTaskService()
28+
instance = class_(auto_task_service=noop_task_service) # instantiate class using NoopTaskService
29+
30+
if noop_task_service.is_func_name_registered(func_name):
31+
getattr(instance, func_name)(*args, **kwargs) # invoke method on instance
32+
else:
33+
logger.error(
34+
'Trying to invoke _auto_task_wrapper for unregistered task with module: %s class: %s func: %s args: %s and kwargs: %s',
35+
module_name,
36+
class_name,
37+
func_name,
38+
args,
39+
kwargs
40+
)
41+
except RetryableTaskException as exc:
42+
try:
43+
retry_kwargs = {}
44+
45+
if exc.delay is not None:
46+
retry_kwargs['delay'] = exc.delay
47+
48+
if exc.count_retries is not None:
49+
retry_kwargs['count_retries'] = exc.count_retries
50+
51+
_auto_task_wrapper.retry(**retry_kwargs)
52+
except MaxRetriesReachedException:
53+
if exc.max_retries_func:
54+
exc.max_retries_func()
55+
else:
56+
# by default log an error
57+
logger.error('Reached max retries in auto task {}.{}.{} with error: {}'.format(module_name, class_name, func_name, repr(exc)))
58+
59+
60+
class AutoTaskService(BaseAutoTaskService):
61+
def register_task(self, method, queue_name=None, max_retries=None):
62+
# type: (Any, str, int) -> None
63+
instance = method.__self__
64+
class_ = instance.__class__
65+
func_name = method.__name__
66+
67+
def _auto_task_wrapper_invoker(*args, **kwargs):
68+
if queue_name is not None:
69+
kwargs['queue_name'] = queue_name
70+
71+
if max_retries is not None:
72+
kwargs['max_retries'] = max_retries
73+
74+
_auto_task_wrapper.delay(
75+
class_.__module__,
76+
class_.__name__,
77+
func_name,
78+
*args, **kwargs
79+
)
80+
81+
setattr(instance, func_name, _auto_task_wrapper_invoker)

eb_sqs/tests/auto_tasks/__init__.py

Whitespace-only changes.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from unittest import TestCase
2+
3+
from mock import Mock, call
4+
5+
from eb_sqs import settings
6+
from eb_sqs.auto_tasks.exceptions import RetryableTaskException
7+
from eb_sqs.auto_tasks.service import AutoTaskService, _auto_task_wrapper
8+
9+
10+
class TestService:
11+
_TEST_MOCK = Mock()
12+
_MAX_RETRY_NUM = 5
13+
14+
def __init__(self, auto_task_service=None):
15+
self._auto_task_service = auto_task_service or AutoTaskService()
16+
self._auto_task_service.register_task(self.task_method)
17+
self._auto_task_service.register_task(self.task_retry_method, max_retries=self._MAX_RETRY_NUM)
18+
19+
def task_method(self, *args, **kwargs):
20+
self._TEST_MOCK.task_method(*args, **kwargs)
21+
22+
def task_retry_method(self, *args, **kwargs):
23+
self._TEST_MOCK.task_retry_method(*args, **kwargs)
24+
25+
def max_retry_fun():
26+
self._TEST_MOCK.task_max_retry_method(*args, **kwargs)
27+
28+
raise RetryableTaskException(Exception('Test'), max_retries_func=max_retry_fun)
29+
30+
def non_task_method(self):
31+
self._TEST_MOCK.non_task_method()
32+
33+
34+
class AutoTasksTest(TestCase):
35+
def setUp(self):
36+
self._test_service = TestService()
37+
38+
self._args = [5, '6']
39+
self._kwargs = {'p1': 'bla', 'p2': 130}
40+
41+
settings.EXECUTE_INLINE = True
42+
43+
def test_task_method(self):
44+
self._test_service.task_method(*self._args, **self._kwargs)
45+
46+
TestService._TEST_MOCK.task_method.assert_called_once_with(*self._args, **self._kwargs)
47+
48+
def test_task_retry_method(self):
49+
self._test_service.task_retry_method(*self._args, **self._kwargs)
50+
51+
TestService._TEST_MOCK.task_retry_method.assert_has_calls([call(*self._args, **self._kwargs)] * TestService._MAX_RETRY_NUM)
52+
53+
TestService._TEST_MOCK.task_max_retry_method.assert_called_once_with(*self._args, **self._kwargs)
54+
55+
def test_non_task_method(self):
56+
_auto_task_wrapper.delay(
57+
self._test_service.__class__.__module__,
58+
self._test_service.__class__.__name__,
59+
TestService.non_task_method.__name__,
60+
execute_inline=True
61+
)
62+
63+
TestService._TEST_MOCK.non_task_method.assert_not_called()

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setup(
88
name='django-eb-sqs',
9-
version='1.20',
9+
version='1.30',
1010
package_dir={'eb_sqs': 'eb_sqs'},
1111
include_package_data=True,
1212
packages=find_packages(),

0 commit comments

Comments
 (0)