11import importlib
22import logging
33
4- from eb_sqs .auto_tasks .base_service import BaseAutoTaskService , NoopTaskService
54from eb_sqs .auto_tasks .exceptions import RetryableTaskException
65from eb_sqs .decorators import task
76from eb_sqs .worker .worker_exceptions import MaxRetriesReachedException
@@ -24,11 +23,12 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
2423 module = importlib .import_module (module_name ) # import module
2524 class_ = getattr (module , class_name ) # find class
2625
27- noop_task_service = NoopTaskService ( )
28- instance = class_ (auto_task_service = noop_task_service ) # instantiate class using NoopTaskService
26+ auto_task_executor_service = _AutoTaskExecutorService ( func_name )
27+ instance = class_ (auto_task_service = auto_task_executor_service ) # instantiate class using _AutoTaskExecutorService
2928
30- if noop_task_service .is_func_name_registered (func_name ):
31- getattr (instance , func_name )(* args , ** kwargs ) # invoke method on instance
29+ executor_func_name = auto_task_executor_service .get_executor_func_name ()
30+ if executor_func_name :
31+ getattr (instance , executor_func_name )(* args , ** kwargs ) # invoke method on instance
3232 else :
3333 logger .error (
3434 'Trying to invoke _auto_task_wrapper for unregistered task with module: %s class: %s func: %s args: %s and kwargs: %s' ,
@@ -57,7 +57,7 @@ def _auto_task_wrapper(module_name, class_name, func_name, *args, **kwargs):
5757 logger .error ('Reached max retries in auto task {}.{}.{} with error: {}' .format (module_name , class_name , func_name , repr (exc )))
5858
5959
60- class AutoTaskService (BaseAutoTaskService ):
60+ class AutoTaskService (object ):
6161 def register_task (self , method , queue_name = None , max_retries = None ):
6262 # type: (Any, str, int) -> None
6363 instance = method .__self__
@@ -79,3 +79,26 @@ def _auto_task_wrapper_invoker(*args, **kwargs):
7979 )
8080
8181 setattr (instance , func_name , _auto_task_wrapper_invoker )
82+
83+
84+ class _AutoTaskExecutorService (AutoTaskService ):
85+ def __init__ (self , func_name ):
86+ # type: (str) -> None
87+ self ._func_name = func_name
88+
89+ self ._executor_func_name = None
90+
91+ def register_task (self , method , queue_name = None , max_retries = None ):
92+ # type: (Any, str, int) -> None
93+ if self ._func_name == method .__name__ :
94+ # circuit breaker to allow actually executing the method once
95+ instance = method .__self__
96+
97+ self ._executor_func_name = self ._func_name + '__auto_task_executor__'
98+ setattr (instance , self ._executor_func_name , getattr (instance , self ._func_name ))
99+
100+ super (_AutoTaskExecutorService , self ).register_task (method , queue_name , max_retries )
101+
102+ def get_executor_func_name (self ):
103+ # type: () -> str
104+ return self ._executor_func_name
0 commit comments