11import datetime
2- import functools
32import importlib
43import logging
54from collections import defaultdict
6- from dataclasses import dataclass
75from typing import (
86 Any ,
97 Callable ,
108 Collection ,
119 Dict ,
12- Generic ,
1310 Iterable ,
1411 List ,
1512 Optional ,
16- ParamSpec ,
1713 Tuple ,
1814 Type ,
19- TypeVar ,
2015 Union ,
21- overload ,
2216)
2317
2418import click
9589STRING <prefix>:qlock:<queue> (Legacy queue locks that are no longer used)
9690"""
9791
98- P = ParamSpec ("P" )
99- R = TypeVar ("R" )
100-
101-
102- @dataclass
103- class TaskCallable (Generic [P , R ]):
104- _func : Callable [P , R ]
105- _tiger : "TaskTiger"
106-
107- _task_hard_timeout : float | None = None
108- _task_queue : str | None = None
109- _task_unique : bool | None = None
110- _task_unique_key : Collection [str ] | None = None
111- _task_lock : bool | None = None
112- _task_lock_key : Collection [str ] | None = None
113- _task_retry : int | None = None
114- _task_retry_on : Collection [type [BaseException ]] | None = None
115- _task_retry_method : (
116- Callable [[int ], float ] | Tuple [Callable [..., float ], Tuple ] | None
117- ) = None
118- _task_batch : bool | None = None
119- _task_schedule : Callable | None = None
120- _task_max_queue_size : int | None = None
121- _task_max_stored_executions : int | None = None
122- _task_runner_class : type | None = None
123-
124- def __post_init__ (self ) -> None :
125- functools .update_wrapper (self , self ._func )
126-
127- def __call__ (self , * args : P .args , ** kwargs : P .kwargs ) -> R :
128- return self ._func (* args , ** kwargs )
129-
130- def delay (self , * args : P .args , ** kwargs : P .kwargs ) -> "Task" :
131- return self ._tiger .delay (self , args = args , kwargs = kwargs )
132-
13392
13493class TaskTiger :
13594 log : BoundLogger
@@ -342,56 +301,9 @@ def _key(self, *parts: str) -> str:
342301 """
343302 return ":" .join ([self .config ["REDIS_PREFIX" ]] + list (parts ))
344303
345- @overload
346- def task (
347- self ,
348- _fn : Callable [P , R ],
349- * ,
350- queue : Optional [str ] = ...,
351- hard_timeout : Optional [float ] = ...,
352- unique : Optional [bool ] = ...,
353- unique_key : Optional [Collection [str ]] = ...,
354- lock : Optional [bool ] = ...,
355- lock_key : Optional [Collection [str ]] = ...,
356- retry : Optional [bool ] = ...,
357- retry_on : Optional [Collection [Type [BaseException ]]] = ...,
358- retry_method : Optional [
359- Union [Callable [[int ], float ], Tuple [Callable [..., float ], Tuple ]]
360- ] = ...,
361- schedule : Optional [Callable ] = ...,
362- batch : bool = ...,
363- max_queue_size : Optional [int ] = ...,
364- max_stored_executions : Optional [int ] = ...,
365- runner_class : Optional [Type ["BaseRunner" ]] = ...,
366- ) -> TaskCallable [P , R ]: ...
367-
368- @overload
369304 def task (
370305 self ,
371- _fn : None = None ,
372- * ,
373- queue : Optional [str ] = ...,
374- hard_timeout : Optional [float ] = ...,
375- unique : Optional [bool ] = ...,
376- unique_key : Optional [Collection [str ]] = ...,
377- lock : Optional [bool ] = ...,
378- lock_key : Optional [Collection [str ]] = ...,
379- retry : Optional [bool ] = ...,
380- retry_on : Optional [Collection [Type [BaseException ]]] = ...,
381- retry_method : Optional [
382- Union [Callable [[int ], float ], Tuple [Callable [..., float ], Tuple ]]
383- ] = ...,
384- schedule : Optional [Callable ] = ...,
385- batch : bool = ...,
386- max_queue_size : Optional [int ] = ...,
387- max_stored_executions : Optional [int ] = ...,
388- runner_class : Optional [Type ["BaseRunner" ]] = ...,
389- ) -> Callable [[Callable [P , R ]], TaskCallable [P , R ]]: ...
390-
391- def task (
392- self ,
393- _fn : Optional [Callable [P , R ]] = None ,
394- * ,
306+ _fn : Optional [Callable ] = None ,
395307 queue : Optional [str ] = None ,
396308 hard_timeout : Optional [float ] = None ,
397309 unique : Optional [bool ] = None ,
@@ -408,7 +320,7 @@ def task(
408320 max_queue_size : Optional [int ] = None ,
409321 max_stored_executions : Optional [int ] = None ,
410322 runner_class : Optional [Type ["BaseRunner" ]] = None ,
411- ) -> Callable [[ Callable [ P , R ]], TaskCallable [ P , R ]] | TaskCallable [ P , R ] :
323+ ) -> Callable :
412324 """
413325 Function decorator that defines the behavior of the function when it is
414326 used as a task. To use the default behavior, tasks don't need to be
@@ -417,40 +329,58 @@ def task(
417329 See README.rst for an explanation of the options.
418330 """
419331
332+ def _delay (func : Callable ) -> Callable :
333+ def _delay_inner (* args : Any , ** kwargs : Any ) -> Task :
334+ return self .delay (func , args = args , kwargs = kwargs )
335+
336+ return _delay_inner
337+
420338 # Periodic tasks are unique.
421339 if schedule is not None :
422340 unique = True
423341
424- def _wrap (func : Callable [P , R ]) -> TaskCallable [P , R ]:
425- tc = TaskCallable (
426- _func = func ,
427- _tiger = self ,
428- _task_hard_timeout = hard_timeout ,
429- _task_queue = queue ,
430- _task_unique = unique ,
431- _task_unique_key = unique_key ,
432- _task_lock = lock ,
433- _task_lock_key = lock_key ,
434- _task_retry = retry ,
435- _task_retry_on = retry_on ,
436- _task_retry_method = retry_method ,
437- _task_batch = batch ,
438- _task_schedule = schedule ,
439- _task_max_queue_size = max_queue_size ,
440- _task_max_stored_executions = max_stored_executions ,
441- _task_runner_class = runner_class ,
442- )
342+ def _wrap (func : Callable ) -> Callable :
343+ if hard_timeout is not None :
344+ func ._task_hard_timeout = hard_timeout # type: ignore[attr-defined]
345+ if queue is not None :
346+ func ._task_queue = queue # type: ignore[attr-defined]
347+ if unique is not None :
348+ func ._task_unique = unique # type: ignore[attr-defined]
349+ if unique_key is not None :
350+ func ._task_unique_key = unique_key # type: ignore[attr-defined]
351+ if lock is not None :
352+ func ._task_lock = lock # type: ignore[attr-defined]
353+ if lock_key is not None :
354+ func ._task_lock_key = lock_key # type: ignore[attr-defined]
355+ if retry is not None :
356+ func ._task_retry = retry # type: ignore[attr-defined]
357+ if retry_on is not None :
358+ func ._task_retry_on = retry_on # type: ignore[attr-defined]
359+ if retry_method is not None :
360+ func ._task_retry_method = retry_method # type: ignore[attr-defined]
361+ if batch is not None :
362+ func ._task_batch = batch # type: ignore[attr-defined]
363+ if schedule is not None :
364+ func ._task_schedule = schedule # type: ignore[attr-defined]
365+ if max_queue_size is not None :
366+ func ._task_max_queue_size = max_queue_size # type: ignore[attr-defined]
367+ if max_stored_executions is not None :
368+ func ._task_max_stored_executions = max_stored_executions # type: ignore[attr-defined]
369+ if runner_class is not None :
370+ func ._task_runner_class = runner_class # type: ignore[attr-defined]
371+
372+ func .delay = _delay (func ) # type: ignore[attr-defined]
443373
444374 if schedule is not None :
445375 serialized_func = serialize_func_name (func )
446- assert serialized_func not in self . periodic_task_funcs , (
447- "attempted duplicate registration of periodic task"
448- )
449- self .periodic_task_funcs [serialized_func ] = tc
376+ assert (
377+ serialized_func not in self . periodic_task_funcs
378+ ), "attempted duplicate registration of periodic task"
379+ self .periodic_task_funcs [serialized_func ] = func
450380
451- return tc
381+ return func
452382
453- return _wrap if _fn is None else _wrap (_fn )
383+ return _wrap if _fn is None else _wrap (_fn ) # type: ignore[return-value]
454384
455385 def run_worker_with_args (self , args : List [str ]) -> None :
456386 """
0 commit comments