11import datetime
2+ import functools
23import importlib
34import logging
45from collections import defaultdict
6+ from dataclasses import dataclass
57from typing import (
68 Any ,
79 Callable ,
810 Collection ,
911 Dict ,
12+ Generic ,
1013 Iterable ,
1114 List ,
1215 Optional ,
1316 Tuple ,
1417 Type ,
18+ TypeVar ,
1519 Union ,
20+ overload ,
1621)
1722
1823import click
1924import redis
2025import structlog
2126from structlog .stdlib import BoundLogger
27+ from typing_extensions import ParamSpec
2228
2329from ._internal import (
2430 ACTIVE ,
8995STRING <prefix>:qlock:<queue> (Legacy queue locks that are no longer used)
9096"""
9197
98+ P = ParamSpec ("P" )
99+ R = TypeVar ("R" )
100+
101+
102+ @dataclass
103+ class TaskCallable (Generic [P , R ]):
104+ _func : Callable [P , R ]
105+ _tiger : "TaskTiger"
106+
107+ _task_hard_timeout : float | None = None
108+ _task_queue : str | None = None
109+ _task_unique : bool | None = None
110+ _task_unique_key : Collection [str ] | None = None
111+ _task_lock : bool | None = None
112+ _task_lock_key : Collection [str ] | None = None
113+ _task_retry : int | None = None
114+ _task_retry_on : Collection [type [BaseException ]] | None = None
115+ _task_retry_method : (
116+ Callable [[int ], float ] | Tuple [Callable [..., float ], Tuple ] | None
117+ ) = None
118+ _task_batch : bool | None = None
119+ _task_schedule : Callable | None = None
120+ _task_max_queue_size : int | None = None
121+ _task_max_stored_executions : int | None = None
122+ _task_runner_class : type | None = None
123+
124+ def __post_init__ (self ) -> None :
125+ functools .update_wrapper (self , self ._func )
126+
127+ def __call__ (self , * args : P .args , ** kwargs : P .kwargs ) -> R :
128+ return self ._func (* args , ** kwargs )
129+
130+ def delay (self , * args : P .args , ** kwargs : P .kwargs ) -> "Task" :
131+ return self ._tiger .delay (self , args = args , kwargs = kwargs )
132+
92133
93134class TaskTiger :
94135 log : BoundLogger
@@ -301,9 +342,58 @@ def _key(self, *parts: str) -> str:
301342 """
302343 return ":" .join ([self .config ["REDIS_PREFIX" ]] + list (parts ))
303344
345+ @overload
346+ def task (
347+ self ,
348+ _fn : Callable [P , R ],
349+ * ,
350+ queue : Optional [str ] = ...,
351+ hard_timeout : Optional [float ] = ...,
352+ unique : Optional [bool ] = ...,
353+ unique_key : Optional [Collection [str ]] = ...,
354+ lock : Optional [bool ] = ...,
355+ lock_key : Optional [Collection [str ]] = ...,
356+ retry : Optional [bool ] = ...,
357+ retry_on : Optional [Collection [Type [BaseException ]]] = ...,
358+ retry_method : Optional [
359+ Union [Callable [[int ], float ], Tuple [Callable [..., float ], Tuple ]]
360+ ] = ...,
361+ schedule : Optional [Callable ] = ...,
362+ batch : bool = ...,
363+ max_queue_size : Optional [int ] = ...,
364+ max_stored_executions : Optional [int ] = ...,
365+ runner_class : Optional [Type ["BaseRunner" ]] = ...,
366+ ) -> TaskCallable [P , R ]:
367+ ...
368+
369+ @overload
304370 def task (
305371 self ,
306- _fn : Optional [Callable ] = None ,
372+ _fn : None = None ,
373+ * ,
374+ queue : Optional [str ] = ...,
375+ hard_timeout : Optional [float ] = ...,
376+ unique : Optional [bool ] = ...,
377+ unique_key : Optional [Collection [str ]] = ...,
378+ lock : Optional [bool ] = ...,
379+ lock_key : Optional [Collection [str ]] = ...,
380+ retry : Optional [bool ] = ...,
381+ retry_on : Optional [Collection [Type [BaseException ]]] = ...,
382+ retry_method : Optional [
383+ Union [Callable [[int ], float ], Tuple [Callable [..., float ], Tuple ]]
384+ ] = ...,
385+ schedule : Optional [Callable ] = ...,
386+ batch : bool = ...,
387+ max_queue_size : Optional [int ] = ...,
388+ max_stored_executions : Optional [int ] = ...,
389+ runner_class : Optional [Type ["BaseRunner" ]] = ...,
390+ ) -> Callable [[Callable [P , R ]], TaskCallable [P , R ]]:
391+ ...
392+
393+ def task (
394+ self ,
395+ _fn : Optional [Callable [P , R ]] = None ,
396+ * ,
307397 queue : Optional [str ] = None ,
308398 hard_timeout : Optional [float ] = None ,
309399 unique : Optional [bool ] = None ,
@@ -320,7 +410,7 @@ def task(
320410 max_queue_size : Optional [int ] = None ,
321411 max_stored_executions : Optional [int ] = None ,
322412 runner_class : Optional [Type ["BaseRunner" ]] = None ,
323- ) -> Callable :
413+ ) -> Callable [[ Callable [ P , R ]], TaskCallable [ P , R ]] | TaskCallable [ P , R ] :
324414 """
325415 Function decorator that defines the behavior of the function when it is
326416 used as a task. To use the default behavior, tasks don't need to be
@@ -329,58 +419,40 @@ def task(
329419 See README.rst for an explanation of the options.
330420 """
331421
332- def _delay (func : Callable ) -> Callable :
333- def _delay_inner (* args : Any , ** kwargs : Any ) -> Task :
334- return self .delay (func , args = args , kwargs = kwargs )
335-
336- return _delay_inner
337-
338422 # Periodic tasks are unique.
339423 if schedule is not None :
340424 unique = True
341425
342- def _wrap (func : Callable ) -> Callable :
343- if hard_timeout is not None :
344- func ._task_hard_timeout = hard_timeout # type: ignore[attr-defined]
345- if queue is not None :
346- func ._task_queue = queue # type: ignore[attr-defined]
347- if unique is not None :
348- func ._task_unique = unique # type: ignore[attr-defined]
349- if unique_key is not None :
350- func ._task_unique_key = unique_key # type: ignore[attr-defined]
351- if lock is not None :
352- func ._task_lock = lock # type: ignore[attr-defined]
353- if lock_key is not None :
354- func ._task_lock_key = lock_key # type: ignore[attr-defined]
355- if retry is not None :
356- func ._task_retry = retry # type: ignore[attr-defined]
357- if retry_on is not None :
358- func ._task_retry_on = retry_on # type: ignore[attr-defined]
359- if retry_method is not None :
360- func ._task_retry_method = retry_method # type: ignore[attr-defined]
361- if batch is not None :
362- func ._task_batch = batch # type: ignore[attr-defined]
363- if schedule is not None :
364- func ._task_schedule = schedule # type: ignore[attr-defined]
365- if max_queue_size is not None :
366- func ._task_max_queue_size = max_queue_size # type: ignore[attr-defined]
367- if max_stored_executions is not None :
368- func ._task_max_stored_executions = max_stored_executions # type: ignore[attr-defined]
369- if runner_class is not None :
370- func ._task_runner_class = runner_class # type: ignore[attr-defined]
371-
372- func .delay = _delay (func ) # type: ignore[attr-defined]
426+ def _wrap (func : Callable [P , R ]) -> TaskCallable [P , R ]:
427+ tc = TaskCallable (
428+ _func = func ,
429+ _tiger = self ,
430+ _task_hard_timeout = hard_timeout ,
431+ _task_queue = queue ,
432+ _task_unique = unique ,
433+ _task_unique_key = unique_key ,
434+ _task_lock = lock ,
435+ _task_lock_key = lock_key ,
436+ _task_retry = retry ,
437+ _task_retry_on = retry_on ,
438+ _task_retry_method = retry_method ,
439+ _task_batch = batch ,
440+ _task_schedule = schedule ,
441+ _task_max_queue_size = max_queue_size ,
442+ _task_max_stored_executions = max_stored_executions ,
443+ _task_runner_class = runner_class ,
444+ )
373445
374446 if schedule is not None :
375447 serialized_func = serialize_func_name (func )
376448 assert (
377449 serialized_func not in self .periodic_task_funcs
378450 ), "attempted duplicate registration of periodic task"
379- self .periodic_task_funcs [serialized_func ] = func
451+ self .periodic_task_funcs [serialized_func ] = tc
380452
381- return func
453+ return tc
382454
383- return _wrap if _fn is None else _wrap (_fn ) # type: ignore[return-value]
455+ return _wrap if _fn is None else _wrap (_fn )
384456
385457 def run_worker_with_args (self , args : List [str ]) -> None :
386458 """
0 commit comments