Skip to content

Commit e1c8374

Browse files
authored
Merge pull request #17 from taskiq-python/feature/refactoring
Refactored code a little bit.
2 parents 64440da + ec5063e commit e1c8374

File tree

9 files changed

+374
-352
lines changed

9 files changed

+374
-352
lines changed

taskiq/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""Distributed task manager."""
22
from taskiq.abc.broker import AsyncBroker, AsyncTaskiqDecoratedTask
3-
from taskiq.abc.result_backend import AsyncResultBackend, AsyncTaskiqTask
3+
from taskiq.abc.result_backend import AsyncResultBackend
44
from taskiq.message import TaskiqMessage
5+
from taskiq.task import AsyncTaskiqTask
56

67
__all__ = [
78
"TaskiqMessage",

taskiq/abc/broker.py

Lines changed: 14 additions & 269 deletions
Original file line numberDiff line numberDiff line change
@@ -1,271 +1,16 @@
11
from abc import ABC, abstractmethod
2-
from dataclasses import asdict, is_dataclass
32
from functools import wraps
43
from logging import getLogger
5-
from typing import ( # noqa: WPS235
6-
Any,
7-
AsyncGenerator,
8-
Callable,
9-
Coroutine,
10-
Dict,
11-
Generic,
12-
Optional,
13-
Set,
14-
TypeVar,
15-
Union,
16-
overload,
17-
)
18-
from uuid import uuid4
19-
20-
from pydantic import BaseModel
21-
from typing_extensions import ParamSpec
22-
23-
from taskiq.abc.result_backend import AsyncResultBackend, AsyncTaskiqTask
4+
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union, overload
5+
6+
from taskiq.abc.result_backend import AsyncResultBackend
7+
from taskiq.decor import AsyncTaskiqDecoratedTask
248
from taskiq.message import TaskiqMessage
259
from taskiq.result_backends.dummy import DummyResultBackend
10+
from taskiq.types_helpers import T_, FuncParams_, ReturnType_
2611

2712
logger = getLogger("taskiq")
2813

29-
_T = TypeVar("_T") # noqa: WPS111
30-
_FuncParams = ParamSpec("_FuncParams")
31-
_ReturnType = TypeVar("_ReturnType")
32-
33-
34-
class AsyncKicker(Generic[_FuncParams, _ReturnType]):
35-
"""Class that used to modify data before sending it to broker."""
36-
37-
def __init__(
38-
self,
39-
task_name: str,
40-
broker: "AsyncBroker",
41-
labels: Dict[str, Any],
42-
) -> None:
43-
self.task_name = task_name
44-
self.broker = broker
45-
self.labels = labels
46-
47-
def with_label(
48-
self,
49-
label_name: str,
50-
value: Any,
51-
) -> "AsyncKicker[_FuncParams, _ReturnType]":
52-
"""
53-
Update one single label.
54-
55-
This method is used to update
56-
task's labels before sending.
57-
58-
:param label_name: name of the label to update.
59-
:param value: label's value.
60-
:return: kicker object with new labels.
61-
"""
62-
self.labels[label_name] = value
63-
return self
64-
65-
def with_labels(
66-
self,
67-
labels: Dict[str, Any],
68-
) -> "AsyncKicker[_FuncParams, _ReturnType]":
69-
"""
70-
Update function's labels before sending.
71-
72-
:param labels: dict with new labels.
73-
:return: kicker with new labels.
74-
"""
75-
self.labels.update(labels)
76-
return self
77-
78-
def with_broker(
79-
self,
80-
broker: "AsyncBroker",
81-
) -> "AsyncKicker[_FuncParams, _ReturnType]":
82-
"""
83-
Replace broker for the function.
84-
85-
This method can be used with
86-
shared tasks.
87-
88-
:param broker: new broker instance.
89-
:return: Kicker with new broker.
90-
"""
91-
self.broker = broker
92-
return self
93-
94-
@overload
95-
async def kiq( # noqa: D102
96-
self: "AsyncKicker[_FuncParams, Coroutine[Any, Any, _T]]",
97-
*args: _FuncParams.args,
98-
**kwargs: _FuncParams.kwargs,
99-
) -> AsyncTaskiqTask[_T]:
100-
...
101-
102-
@overload
103-
async def kiq( # noqa: D102
104-
self: "AsyncKicker[_FuncParams, _ReturnType]",
105-
*args: _FuncParams.args,
106-
**kwargs: _FuncParams.kwargs,
107-
) -> AsyncTaskiqTask[_ReturnType]:
108-
...
109-
110-
async def kiq(
111-
self,
112-
*args: _FuncParams.args,
113-
**kwargs: _FuncParams.kwargs,
114-
) -> Any:
115-
"""
116-
This method sends function call over the network.
117-
118-
It gets current broker and calls it's kick method,
119-
returning what it returns.
120-
121-
:param args: function's arguments.
122-
:param kwargs: function's key word arguments.
123-
124-
:returns: taskiq task.
125-
"""
126-
logger.debug(
127-
f"Kicking {self.task_name} with args={args} and kwargs={kwargs}.",
128-
)
129-
message = self._prepare_message(*args, **kwargs)
130-
await self.broker.kick(message)
131-
return self.broker.result_backend.generate_task(message.task_id)
132-
133-
@classmethod
134-
def _prepare_arg(cls, arg: Any) -> Any:
135-
"""
136-
Parses argument if possible.
137-
138-
This function is used to construct dicts
139-
from pydantic models or dataclasses.
140-
141-
:param arg: argument to format.
142-
:return: Formatted argument.
143-
"""
144-
if isinstance(arg, BaseModel):
145-
arg = arg.dict()
146-
if is_dataclass(arg):
147-
arg = asdict(arg)
148-
return arg
149-
150-
def _prepare_message( # noqa: WPS210
151-
self,
152-
*args: Any,
153-
**kwargs: Any,
154-
) -> TaskiqMessage:
155-
"""
156-
Create a message from args and kwargs.
157-
158-
:param args: function's args.
159-
:param kwargs: function's kwargs.
160-
:return: constructed message.
161-
"""
162-
formatted_args = []
163-
formatted_kwargs = {}
164-
for arg in args:
165-
formatted_args.append(self._prepare_arg(arg))
166-
for kwarg_name, kwarg_val in kwargs.items():
167-
formatted_kwargs[kwarg_name] = self._prepare_arg(kwarg_val)
168-
169-
task_id = uuid4().hex
170-
171-
return TaskiqMessage(
172-
task_id=task_id,
173-
task_name=self.task_name,
174-
meta=self.labels,
175-
args=formatted_args,
176-
kwargs=formatted_kwargs,
177-
)
178-
179-
180-
class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType]):
181-
"""
182-
Class for all task functions.
183-
184-
When function is decorated
185-
with the `task` decorator, it
186-
will return an instance of this class.
187-
188-
This class parametrized with original function's
189-
arguments types and a return type.
190-
191-
This class has kiq method which is used
192-
to kick tasks out of this thread and send them to
193-
current broker.
194-
"""
195-
196-
def __init__(
197-
self,
198-
broker: "AsyncBroker",
199-
task_name: str,
200-
original_func: Callable[_FuncParams, _ReturnType],
201-
labels: Dict[str, Any],
202-
) -> None:
203-
self.broker = broker
204-
self.task_name = task_name
205-
self.original_func = original_func
206-
self.labels = labels
207-
208-
# Docs for this method are ommited in order to help
209-
# your IDE resolve correct docs for it.
210-
def __call__( # noqa: D102
211-
self,
212-
*args: _FuncParams.args,
213-
**kwargs: _FuncParams.kwargs,
214-
) -> _ReturnType:
215-
return self.original_func(*args, **kwargs)
216-
217-
@overload
218-
async def kiq( # noqa: D102
219-
self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T]]",
220-
*args: _FuncParams.args,
221-
**kwargs: _FuncParams.kwargs,
222-
) -> AsyncTaskiqTask[_T]:
223-
...
224-
225-
@overload
226-
async def kiq( # noqa: D102
227-
self: "AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]",
228-
*args: _FuncParams.args,
229-
**kwargs: _FuncParams.kwargs,
230-
) -> AsyncTaskiqTask[_ReturnType]:
231-
...
232-
233-
async def kiq(
234-
self,
235-
*args: _FuncParams.args,
236-
**kwargs: _FuncParams.kwargs,
237-
) -> Any:
238-
"""
239-
This method sends function call over the network.
240-
241-
It gets current broker and calls it's kick method,
242-
returning what it returns.
243-
244-
:param args: function's arguments.
245-
:param kwargs: function's key word arguments.
246-
247-
:returns: taskiq task.
248-
"""
249-
return await self.kicker().kiq(*args, **kwargs)
250-
251-
def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
252-
"""
253-
This function returns kicker object.
254-
255-
Kicker is a object that can modyfy kiq request
256-
before sendig it.
257-
258-
:return: AsyncKicker instance.
259-
"""
260-
return AsyncKicker(
261-
task_name=self.task_name,
262-
broker=self.broker,
263-
labels=self.labels,
264-
)
265-
266-
def __repr__(self) -> str:
267-
return f"AsyncTaskiqDecoratedTask({self.task_name})"
268-
26914

27015
class AsyncBroker(ABC):
27116
"""
@@ -280,7 +25,7 @@ class AsyncBroker(ABC):
28025

28126
def __init__(
28227
self,
283-
result_backend: Optional[AsyncResultBackend[_T]] = None,
28+
result_backend: Optional[AsyncResultBackend[T_]] = None,
28429
) -> None:
28530
if result_backend is None:
28631
result_backend = DummyResultBackend()
@@ -327,8 +72,8 @@ def listen(self) -> AsyncGenerator[TaskiqMessage, None]:
32772
@overload
32873
def task(
32974
self,
330-
task_name: Callable[_FuncParams, _ReturnType],
331-
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
75+
task_name: Callable[FuncParams_, ReturnType_],
76+
) -> AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_]:
33277
...
33378

33479
@overload
@@ -337,8 +82,8 @@ def task(
33782
task_name: Optional[str] = None,
33883
**labels: Union[str, int],
33984
) -> Callable[
340-
[Callable[_FuncParams, _ReturnType]],
341-
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
85+
[Callable[FuncParams_, ReturnType_]],
86+
AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_],
34287
]:
34388
...
34489

@@ -374,12 +119,12 @@ def make_decorated_task(
374119
inner_labels: Dict[str, Union[str, int]],
375120
inner_task_name: Optional[str] = None,
376121
) -> Callable[
377-
[Callable[_FuncParams, _ReturnType]],
378-
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
122+
[Callable[FuncParams_, ReturnType_]],
123+
AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_],
379124
]:
380125
def inner(
381-
func: Callable[_FuncParams, _ReturnType],
382-
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
126+
func: Callable[FuncParams_, ReturnType_],
127+
) -> AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_]:
383128
nonlocal inner_task_name # noqa: WPS420
384129
if inner_task_name is None:
385130
inner_task_name = ( # noqa: WPS442

0 commit comments

Comments
 (0)