Skip to content

Commit 052ddb4

Browse files
committed
Added kicker object.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent b50f70b commit 052ddb4

File tree

1 file changed

+135
-29
lines changed

1 file changed

+135
-29
lines changed

taskiq/abc/broker.py

Lines changed: 135 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from abc import ABC, abstractmethod
2+
from dataclasses import asdict, is_dataclass
23
from functools import wraps
34
from logging import getLogger
45
from typing import ( # noqa: WPS235
@@ -29,42 +30,65 @@
2930
_ReturnType = TypeVar("_ReturnType")
3031

3132

32-
class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType]):
33-
"""
34-
Class for all task functions.
35-
36-
When function is decorated
37-
with the `task` decorator, it
38-
will return an instance of this class.
39-
40-
This class parametrized with original function's
41-
arguments types and a return type.
42-
43-
This class has kiq method which is used
44-
to kick tasks out of this thread and send them to
45-
current broker.
46-
"""
33+
class AsyncKicker(Generic[_FuncParams, _ReturnType]):
34+
"""Class that used to modify data before sending it to broker."""
4735

4836
def __init__(
4937
self,
50-
broker: "AsyncBroker",
5138
task_name: str,
52-
original_func: Callable[_FuncParams, _ReturnType],
39+
broker: "AsyncBroker",
5340
labels: Dict[str, Any],
5441
) -> None:
55-
self.broker = broker
5642
self.task_name = task_name
57-
self.original_func = original_func
43+
self.broker = broker
5844
self.labels = labels
5945

60-
# Docs for this method are ommited in order to help
61-
# your IDE resolve correct docs for it.
62-
def __call__( # noqa: D102
46+
def with_label(
6347
self,
64-
*args: _FuncParams.args,
65-
**kwargs: _FuncParams.kwargs,
66-
) -> _ReturnType:
67-
return self.original_func(*args, **kwargs)
48+
label_name: str,
49+
value: Any,
50+
) -> "AsyncKicker[_FuncParams, _ReturnType]":
51+
"""
52+
Update one single label.
53+
54+
This method is used to update
55+
task's labels before sending.
56+
57+
:param label_name: name of the label to update.
58+
:param value: label's value.
59+
:return: kicker object with new labels.
60+
"""
61+
self.labels[label_name] = value
62+
return self
63+
64+
def with_labels(
65+
self,
66+
labels: Dict[str, Any],
67+
) -> "AsyncKicker[_FuncParams, _ReturnType]":
68+
"""
69+
Update function's labels before sending.
70+
71+
:param labels: dict with new labels.
72+
:return: kicker with new labels.
73+
"""
74+
self.labels.update(labels)
75+
return self
76+
77+
def with_broker(
78+
self,
79+
broker: "AsyncBroker",
80+
) -> "AsyncKicker[_FuncParams, _ReturnType]":
81+
"""
82+
Replace broker for the function.
83+
84+
This method can be used with
85+
shared tasks.
86+
87+
:param broker: new broker instance.
88+
:return: Kicker with new broker.
89+
"""
90+
self.broker = broker
91+
return self
6892

6993
async def kiq(
7094
self,
@@ -89,13 +113,21 @@ async def kiq(
89113
await self.broker.kick(message)
90114
return self.broker.result_backend.generate_task(message.task_id) # type: ignore
91115

92-
def __repr__(self) -> str:
93-
return f"AsyncTaskiqDecoratedTask({self.task_name})"
94-
95116
@classmethod
96117
def _prepare_arg(cls, arg: Any) -> Any:
118+
"""
119+
Parses argument if possible.
120+
121+
This function is used to construct dicts
122+
from pydantic models or dataclasses.
123+
124+
:param arg: argument to format.
125+
:return: Formatted argument.
126+
"""
97127
if isinstance(arg, BaseModel):
98128
arg = arg.dict()
129+
if is_dataclass(arg):
130+
arg = asdict(arg)
99131
return arg
100132

101133
def _prepare_message( # noqa: WPS210
@@ -128,6 +160,80 @@ def _prepare_message( # noqa: WPS210
128160
)
129161

130162

163+
class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType]):
164+
"""
165+
Class for all task functions.
166+
167+
When function is decorated
168+
with the `task` decorator, it
169+
will return an instance of this class.
170+
171+
This class parametrized with original function's
172+
arguments types and a return type.
173+
174+
This class has kiq method which is used
175+
to kick tasks out of this thread and send them to
176+
current broker.
177+
"""
178+
179+
def __init__(
180+
self,
181+
broker: "AsyncBroker",
182+
task_name: str,
183+
original_func: Callable[_FuncParams, _ReturnType],
184+
labels: Dict[str, Any],
185+
) -> None:
186+
self.broker = broker
187+
self.task_name = task_name
188+
self.original_func = original_func
189+
self.labels = labels
190+
191+
# Docs for this method are ommited in order to help
192+
# your IDE resolve correct docs for it.
193+
def __call__( # noqa: D102
194+
self,
195+
*args: _FuncParams.args,
196+
**kwargs: _FuncParams.kwargs,
197+
) -> _ReturnType:
198+
return self.original_func(*args, **kwargs)
199+
200+
async def kiq(
201+
self,
202+
*args: _FuncParams.args,
203+
**kwargs: _FuncParams.kwargs,
204+
) -> AsyncTaskiqTask[_ReturnType]:
205+
"""
206+
This method sends function call over the network.
207+
208+
It gets current broker and calls it's kick method,
209+
returning what it returns.
210+
211+
:param args: function's arguments.
212+
:param kwargs: function's key word arguments.
213+
214+
:returns: taskiq task.
215+
"""
216+
return await self.kicker().kiq(*args, **kwargs)
217+
218+
def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
219+
"""
220+
This function returns kicker object.
221+
222+
Kicker is a object that can modyfy kiq request
223+
before sendig it.
224+
225+
:return: AsyncKicker instance.
226+
"""
227+
return AsyncKicker(
228+
task_name=self.task_name,
229+
broker=self.broker,
230+
labels=self.labels,
231+
)
232+
233+
def __repr__(self) -> str:
234+
return f"AsyncTaskiqDecoratedTask({self.task_name})"
235+
236+
131237
class AsyncBroker(ABC):
132238
"""
133239
Async broker.

0 commit comments

Comments
 (0)