1
1
from abc import ABC , abstractmethod
2
- from dataclasses import asdict , is_dataclass
3
2
from functools import wraps
4
3
from logging import getLogger
5
- from typing import ( # noqa: WPS235
6
- Any ,
7
- AsyncGenerator ,
8
- Callable ,
9
- Coroutine ,
10
- Dict ,
11
- Generic ,
12
- Optional ,
13
- Set ,
14
- TypeVar ,
15
- Union ,
16
- overload ,
17
- )
18
- from uuid import uuid4
19
-
20
- from pydantic import BaseModel
21
- from typing_extensions import ParamSpec
22
-
23
- from taskiq .abc .result_backend import AsyncResultBackend , AsyncTaskiqTask
4
+ from typing import Any , AsyncGenerator , Callable , Dict , Optional , Union , overload
5
+
6
+ from taskiq .abc .result_backend import AsyncResultBackend
7
+ from taskiq .decor import AsyncTaskiqDecoratedTask
24
8
from taskiq .message import TaskiqMessage
25
9
from taskiq .result_backends .dummy import DummyResultBackend
10
+ from taskiq .types_helpers import T_ , FuncParams_ , ReturnType_
26
11
27
12
logger = getLogger ("taskiq" )
28
13
29
- _T = TypeVar ("_T" ) # noqa: WPS111
30
- _FuncParams = ParamSpec ("_FuncParams" )
31
- _ReturnType = TypeVar ("_ReturnType" )
32
-
33
-
34
- class AsyncKicker (Generic [_FuncParams , _ReturnType ]):
35
- """Class that used to modify data before sending it to broker."""
36
-
37
- def __init__ (
38
- self ,
39
- task_name : str ,
40
- broker : "AsyncBroker" ,
41
- labels : Dict [str , Any ],
42
- ) -> None :
43
- self .task_name = task_name
44
- self .broker = broker
45
- self .labels = labels
46
-
47
- def with_label (
48
- self ,
49
- label_name : str ,
50
- value : Any ,
51
- ) -> "AsyncKicker[_FuncParams, _ReturnType]" :
52
- """
53
- Update one single label.
54
-
55
- This method is used to update
56
- task's labels before sending.
57
-
58
- :param label_name: name of the label to update.
59
- :param value: label's value.
60
- :return: kicker object with new labels.
61
- """
62
- self .labels [label_name ] = value
63
- return self
64
-
65
- def with_labels (
66
- self ,
67
- labels : Dict [str , Any ],
68
- ) -> "AsyncKicker[_FuncParams, _ReturnType]" :
69
- """
70
- Update function's labels before sending.
71
-
72
- :param labels: dict with new labels.
73
- :return: kicker with new labels.
74
- """
75
- self .labels .update (labels )
76
- return self
77
-
78
- def with_broker (
79
- self ,
80
- broker : "AsyncBroker" ,
81
- ) -> "AsyncKicker[_FuncParams, _ReturnType]" :
82
- """
83
- Replace broker for the function.
84
-
85
- This method can be used with
86
- shared tasks.
87
-
88
- :param broker: new broker instance.
89
- :return: Kicker with new broker.
90
- """
91
- self .broker = broker
92
- return self
93
-
94
- @overload
95
- async def kiq ( # noqa: D102
96
- self : "AsyncKicker[_FuncParams, Coroutine[Any, Any, _T]]" ,
97
- * args : _FuncParams .args ,
98
- ** kwargs : _FuncParams .kwargs ,
99
- ) -> AsyncTaskiqTask [_T ]:
100
- ...
101
-
102
- @overload
103
- async def kiq ( # noqa: D102
104
- self : "AsyncKicker[_FuncParams, _ReturnType]" ,
105
- * args : _FuncParams .args ,
106
- ** kwargs : _FuncParams .kwargs ,
107
- ) -> AsyncTaskiqTask [_ReturnType ]:
108
- ...
109
-
110
- async def kiq (
111
- self ,
112
- * args : _FuncParams .args ,
113
- ** kwargs : _FuncParams .kwargs ,
114
- ) -> Any :
115
- """
116
- This method sends function call over the network.
117
-
118
- It gets current broker and calls it's kick method,
119
- returning what it returns.
120
-
121
- :param args: function's arguments.
122
- :param kwargs: function's key word arguments.
123
-
124
- :returns: taskiq task.
125
- """
126
- logger .debug (
127
- f"Kicking { self .task_name } with args={ args } and kwargs={ kwargs } ." ,
128
- )
129
- message = self ._prepare_message (* args , ** kwargs )
130
- await self .broker .kick (message )
131
- return self .broker .result_backend .generate_task (message .task_id )
132
-
133
- @classmethod
134
- def _prepare_arg (cls , arg : Any ) -> Any :
135
- """
136
- Parses argument if possible.
137
-
138
- This function is used to construct dicts
139
- from pydantic models or dataclasses.
140
-
141
- :param arg: argument to format.
142
- :return: Formatted argument.
143
- """
144
- if isinstance (arg , BaseModel ):
145
- arg = arg .dict ()
146
- if is_dataclass (arg ):
147
- arg = asdict (arg )
148
- return arg
149
-
150
- def _prepare_message ( # noqa: WPS210
151
- self ,
152
- * args : Any ,
153
- ** kwargs : Any ,
154
- ) -> TaskiqMessage :
155
- """
156
- Create a message from args and kwargs.
157
-
158
- :param args: function's args.
159
- :param kwargs: function's kwargs.
160
- :return: constructed message.
161
- """
162
- formatted_args = []
163
- formatted_kwargs = {}
164
- for arg in args :
165
- formatted_args .append (self ._prepare_arg (arg ))
166
- for kwarg_name , kwarg_val in kwargs .items ():
167
- formatted_kwargs [kwarg_name ] = self ._prepare_arg (kwarg_val )
168
-
169
- task_id = uuid4 ().hex
170
-
171
- return TaskiqMessage (
172
- task_id = task_id ,
173
- task_name = self .task_name ,
174
- meta = self .labels ,
175
- args = formatted_args ,
176
- kwargs = formatted_kwargs ,
177
- )
178
-
179
-
180
- class AsyncTaskiqDecoratedTask (Generic [_FuncParams , _ReturnType ]):
181
- """
182
- Class for all task functions.
183
-
184
- When function is decorated
185
- with the `task` decorator, it
186
- will return an instance of this class.
187
-
188
- This class parametrized with original function's
189
- arguments types and a return type.
190
-
191
- This class has kiq method which is used
192
- to kick tasks out of this thread and send them to
193
- current broker.
194
- """
195
-
196
- def __init__ (
197
- self ,
198
- broker : "AsyncBroker" ,
199
- task_name : str ,
200
- original_func : Callable [_FuncParams , _ReturnType ],
201
- labels : Dict [str , Any ],
202
- ) -> None :
203
- self .broker = broker
204
- self .task_name = task_name
205
- self .original_func = original_func
206
- self .labels = labels
207
-
208
- # Docs for this method are ommited in order to help
209
- # your IDE resolve correct docs for it.
210
- def __call__ ( # noqa: D102
211
- self ,
212
- * args : _FuncParams .args ,
213
- ** kwargs : _FuncParams .kwargs ,
214
- ) -> _ReturnType :
215
- return self .original_func (* args , ** kwargs )
216
-
217
- @overload
218
- async def kiq ( # noqa: D102
219
- self : "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]" ,
220
- * args : _FuncParams .args ,
221
- ** kwargs : _FuncParams .kwargs ,
222
- ) -> AsyncTaskiqTask [_T ]:
223
- ...
224
-
225
- @overload
226
- async def kiq ( # noqa: D102
227
- self : "AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]" ,
228
- * args : _FuncParams .args ,
229
- ** kwargs : _FuncParams .kwargs ,
230
- ) -> AsyncTaskiqTask [_ReturnType ]:
231
- ...
232
-
233
- async def kiq (
234
- self ,
235
- * args : _FuncParams .args ,
236
- ** kwargs : _FuncParams .kwargs ,
237
- ) -> Any :
238
- """
239
- This method sends function call over the network.
240
-
241
- It gets current broker and calls it's kick method,
242
- returning what it returns.
243
-
244
- :param args: function's arguments.
245
- :param kwargs: function's key word arguments.
246
-
247
- :returns: taskiq task.
248
- """
249
- return await self .kicker ().kiq (* args , ** kwargs )
250
-
251
- def kicker (self ) -> AsyncKicker [_FuncParams , _ReturnType ]:
252
- """
253
- This function returns kicker object.
254
-
255
- Kicker is a object that can modyfy kiq request
256
- before sendig it.
257
-
258
- :return: AsyncKicker instance.
259
- """
260
- return AsyncKicker (
261
- task_name = self .task_name ,
262
- broker = self .broker ,
263
- labels = self .labels ,
264
- )
265
-
266
- def __repr__ (self ) -> str :
267
- return f"AsyncTaskiqDecoratedTask({ self .task_name } )"
268
-
269
14
270
15
class AsyncBroker (ABC ):
271
16
"""
@@ -280,7 +25,7 @@ class AsyncBroker(ABC):
280
25
281
26
def __init__ (
282
27
self ,
283
- result_backend : Optional [AsyncResultBackend [_T ]] = None ,
28
+ result_backend : Optional [AsyncResultBackend [T_ ]] = None ,
284
29
) -> None :
285
30
if result_backend is None :
286
31
result_backend = DummyResultBackend ()
@@ -327,8 +72,8 @@ def listen(self) -> AsyncGenerator[TaskiqMessage, None]:
327
72
@overload
328
73
def task (
329
74
self ,
330
- task_name : Callable [_FuncParams , _ReturnType ],
331
- ) -> AsyncTaskiqDecoratedTask [_FuncParams , _ReturnType ]:
75
+ task_name : Callable [FuncParams_ , ReturnType_ ],
76
+ ) -> AsyncTaskiqDecoratedTask [FuncParams_ , ReturnType_ ]:
332
77
...
333
78
334
79
@overload
@@ -337,8 +82,8 @@ def task(
337
82
task_name : Optional [str ] = None ,
338
83
** labels : Union [str , int ],
339
84
) -> Callable [
340
- [Callable [_FuncParams , _ReturnType ]],
341
- AsyncTaskiqDecoratedTask [_FuncParams , _ReturnType ],
85
+ [Callable [FuncParams_ , ReturnType_ ]],
86
+ AsyncTaskiqDecoratedTask [FuncParams_ , ReturnType_ ],
342
87
]:
343
88
...
344
89
@@ -374,12 +119,12 @@ def make_decorated_task(
374
119
inner_labels : Dict [str , Union [str , int ]],
375
120
inner_task_name : Optional [str ] = None ,
376
121
) -> Callable [
377
- [Callable [_FuncParams , _ReturnType ]],
378
- AsyncTaskiqDecoratedTask [_FuncParams , _ReturnType ],
122
+ [Callable [FuncParams_ , ReturnType_ ]],
123
+ AsyncTaskiqDecoratedTask [FuncParams_ , ReturnType_ ],
379
124
]:
380
125
def inner (
381
- func : Callable [_FuncParams , _ReturnType ],
382
- ) -> AsyncTaskiqDecoratedTask [_FuncParams , _ReturnType ]:
126
+ func : Callable [FuncParams_ , ReturnType_ ],
127
+ ) -> AsyncTaskiqDecoratedTask [FuncParams_ , ReturnType_ ]:
383
128
nonlocal inner_task_name # noqa: WPS420
384
129
if inner_task_name is None :
385
130
inner_task_name = ( # noqa: WPS442
0 commit comments