Skip to content

Commit 8eb7850

Browse files
authored
Merge pull request #18 from taskiq-python/feature/plugins
Plugin system.
2 parents 17c2332 + 4c008df commit 8eb7850

16 files changed

+366
-138
lines changed

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ docstring_style=sphinx
88
ignore =
99
; Found `%` string formatting
1010
WPS323,
11+
; Found block variable overlap.
12+
WPS440,
1113
; Found multi-line function type annotation
1214
WPS320,
1315
; Found statement that has no effect

taskiq/abc/broker.py

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,34 @@
11
from abc import ABC, abstractmethod
22
from functools import wraps
33
from logging import getLogger
4-
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union, overload
4+
from typing import ( # noqa: WPS235
5+
TYPE_CHECKING,
6+
Any,
7+
AsyncGenerator,
8+
Callable,
9+
Dict,
10+
List,
11+
Optional,
12+
TypeVar,
13+
Union,
14+
overload,
15+
)
16+
17+
from typing_extensions import ParamSpec
518

6-
from taskiq.abc.result_backend import AsyncResultBackend
719
from taskiq.decor import AsyncTaskiqDecoratedTask
8-
from taskiq.message import TaskiqMessage
20+
from taskiq.message import BrokerMessage
21+
from taskiq.plugins.json_formatter import JSONFormatter
922
from taskiq.result_backends.dummy import DummyResultBackend
10-
from taskiq.types_helpers import T_, FuncParams_, ReturnType_
23+
24+
if TYPE_CHECKING:
25+
from taskiq.abc.formatter import TaskiqFormatter
26+
from taskiq.abc.middleware import TaskiqMiddleware
27+
from taskiq.abc.result_backend import AsyncResultBackend
28+
29+
_T = TypeVar("_T") # noqa: WPS111
30+
_FuncParams = ParamSpec("_FuncParams")
31+
_ReturnType = TypeVar("_ReturnType")
1132

1233
logger = getLogger("taskiq")
1334

@@ -25,13 +46,28 @@ class AsyncBroker(ABC):
2546

2647
def __init__(
2748
self,
28-
result_backend: Optional[AsyncResultBackend[T_]] = None,
49+
result_backend: "Optional[AsyncResultBackend[_T]]" = None,
2950
) -> None:
3051
if result_backend is None:
3152
result_backend = DummyResultBackend()
53+
self.middlewares: "List[TaskiqMiddleware]" = []
3254
self.result_backend = result_backend
3355
self.is_worker_process = False
3456
self.decorator_class = AsyncTaskiqDecoratedTask
57+
self.formatter: "TaskiqFormatter" = JSONFormatter()
58+
59+
def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None:
60+
"""
61+
Add a list of middlewares.
62+
63+
You should call this method to set middlewares,
64+
since it saves current broker in all middlewares.
65+
66+
:param middlewares: list of middlewares.
67+
"""
68+
for middleware in middlewares:
69+
middleware.set_broker(self)
70+
self.middlewares.append(middleware)
3571

3672
async def startup(self) -> None:
3773
"""Do something when starting broker."""
@@ -47,7 +83,7 @@ async def shutdown(self) -> None:
4783
@abstractmethod
4884
async def kick(
4985
self,
50-
message: TaskiqMessage,
86+
message: BrokerMessage,
5187
) -> None:
5288
"""
5389
This method is used to kick tasks out from current program.
@@ -59,7 +95,7 @@ async def kick(
5995
"""
6096

6197
@abstractmethod
62-
def listen(self) -> AsyncGenerator[TaskiqMessage, None]:
98+
def listen(self) -> AsyncGenerator[BrokerMessage, None]:
6399
"""
64100
This function listens to new messages and yields them.
65101
@@ -73,8 +109,8 @@ def listen(self) -> AsyncGenerator[TaskiqMessage, None]:
73109
@overload
74110
def task(
75111
self,
76-
task_name: Callable[FuncParams_, ReturnType_],
77-
) -> AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_]:
112+
task_name: Callable[_FuncParams, _ReturnType],
113+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
78114
...
79115

80116
@overload
@@ -83,8 +119,8 @@ def task(
83119
task_name: Optional[str] = None,
84120
**labels: Union[str, int],
85121
) -> Callable[
86-
[Callable[FuncParams_, ReturnType_]],
87-
AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_],
122+
[Callable[_FuncParams, _ReturnType]],
123+
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
88124
]:
89125
...
90126

@@ -120,12 +156,12 @@ def make_decorated_task(
120156
inner_labels: Dict[str, Union[str, int]],
121157
inner_task_name: Optional[str] = None,
122158
) -> Callable[
123-
[Callable[FuncParams_, ReturnType_]],
124-
AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_],
159+
[Callable[_FuncParams, _ReturnType]],
160+
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
125161
]:
126162
def inner(
127-
func: Callable[FuncParams_, ReturnType_],
128-
) -> AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_]:
163+
func: Callable[_FuncParams, _ReturnType],
164+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
129165
nonlocal inner_task_name # noqa: WPS420
130166
if inner_task_name is None:
131167
inner_task_name = ( # noqa: WPS442

taskiq/abc/formatter.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Any, Dict
3+
4+
from taskiq.message import BrokerMessage, TaskiqMessage
5+
6+
7+
class TaskiqFormatter(ABC):
8+
"""Custom formatter for brokers."""
9+
10+
@abstractmethod
11+
def dumps(self, message: TaskiqMessage, labels: Dict[str, Any]) -> BrokerMessage:
12+
"""
13+
Dump message to broker message instance.
14+
15+
:param message: message to send.
16+
:param labels: task's labels.
17+
:return: message for brokers.
18+
"""
19+
20+
@abstractmethod
21+
def loads(self, message: BrokerMessage) -> TaskiqMessage:
22+
"""
23+
Parses broker message to TaskiqMessage.
24+
25+
:param message: message to parse.
26+
:return: parsed taskiq message.
27+
"""

taskiq/abc/middleware.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
from typing import TYPE_CHECKING, Any, Coroutine, Dict, Union
2+
3+
if TYPE_CHECKING:
4+
from taskiq.abc.broker import AsyncBroker
5+
from taskiq.message import TaskiqMessage
6+
from taskiq.result import TaskiqResult
7+
8+
9+
class TaskiqMiddleware:
10+
"""Base class for middlewares."""
11+
12+
def __init__(self) -> None:
13+
self.broker: "AsyncBroker" = None # type: ignore
14+
15+
def set_broker(self, broker: "AsyncBroker") -> None:
16+
"""
17+
Sets broker to middleware.
18+
19+
:param broker: broker to set.
20+
"""
21+
self.broker = broker
22+
23+
def pre_send(
24+
self,
25+
message: "TaskiqMessage",
26+
labels: Dict[str, Any],
27+
) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
28+
"""
29+
Hook that executes before sending the task to worker.
30+
31+
This is a client-side hook, that executes right before
32+
the message is sent to broker.
33+
34+
:param message: message to send.
35+
:param labels: task's labels.
36+
:return: modified message.
37+
"""
38+
return message
39+
40+
def post_send(
41+
self,
42+
message: "TaskiqMessage",
43+
labels: Dict[str, Any],
44+
) -> "Union[None, Coroutine[Any, Any, None]]":
45+
"""
46+
This hook is executed right after the task is sent.
47+
48+
This is a client-side hook. It executes right
49+
after the messages is kicked in broker.
50+
51+
:param message: kicked message.
52+
:param labels: labels for a message.
53+
"""
54+
55+
def pre_execute(
56+
self,
57+
message: "TaskiqMessage",
58+
labels: Dict[str, Any],
59+
) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
60+
"""
61+
This hook is called before executing task.
62+
63+
This is a worker-side hook, wich means it
64+
executes in the worker process.
65+
66+
:param message: incoming parsed taskiq message.
67+
:param labels: task's labels without user-supplied lables.
68+
:return: modified message.
69+
"""
70+
return message
71+
72+
def post_execute(
73+
self,
74+
result: "TaskiqResult[Any]",
75+
labels: Dict[str, Any],
76+
) -> "Union[None, Coroutine[Any, Any, None]]":
77+
"""
78+
This hook executes after task is complete.
79+
80+
This is a worker-side hook. It's called
81+
in worker process.
82+
83+
:param result: result of execution for current task.
84+
:param labels: task's labels. Without user-supplied labels.
85+
"""

taskiq/abc/result_backend.py

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from abc import ABC, abstractmethod
2-
from typing import Generic
2+
from typing import Generic, TypeVar
33

44
from taskiq.result import TaskiqResult
5-
from taskiq.task import AsyncTaskiqTask
6-
from taskiq.types_helpers import ReturnType_
75

6+
_ReturnType = TypeVar("_ReturnType")
87

9-
class AsyncResultBackend(ABC, Generic[ReturnType_]):
8+
9+
class AsyncResultBackend(ABC, Generic[_ReturnType]):
1010
"""Async result backend."""
1111

1212
async def startup(self) -> None:
@@ -15,21 +15,8 @@ async def startup(self) -> None:
1515
async def shutdown(self) -> None:
1616
"""Do something on shutdown."""
1717

18-
def generate_task(self, task_id: str) -> "AsyncTaskiqTask[ReturnType_]":
19-
"""
20-
Generates new task.
21-
22-
This function creates new AsyncTaskiqTask
23-
that returned to client after calling kiq
24-
method.
25-
26-
:param task_id: id of a task to save.
27-
:return: task object.
28-
"""
29-
return AsyncTaskiqTask(task_id=task_id, result_backend=self)
30-
3118
@abstractmethod
32-
async def set_result(self, task_id: str, result: TaskiqResult[ReturnType_]) -> None:
19+
async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None:
3320
"""
3421
Saves result to the result backend.
3522
@@ -55,7 +42,7 @@ async def get_result(
5542
self,
5643
task_id: str,
5744
with_logs: bool = False,
58-
) -> TaskiqResult[ReturnType_]:
45+
) -> TaskiqResult[_ReturnType]:
5946
"""
6047
Gets result from the task.
6148

taskiq/brokers/inmemory_broker.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
88
from taskiq.cli.async_task_runner import run_task
99
from taskiq.exceptions import TaskiqError
10-
from taskiq.message import TaskiqMessage
10+
from taskiq.message import BrokerMessage
1111

1212
_ReturnType = TypeVar("_ReturnType")
1313

@@ -101,7 +101,7 @@ def __init__(
101101
logs_format = "%(levelname)s %(message)s"
102102
self.logs_format = logs_format
103103

104-
async def kick(self, message: TaskiqMessage) -> None:
104+
async def kick(self, message: BrokerMessage) -> None:
105105
"""
106106
Kicking task.
107107
@@ -111,18 +111,19 @@ async def kick(self, message: TaskiqMessage) -> None:
111111
:raises TaskiqError: if someone wants to kick unknown task.
112112
"""
113113
target_task = self.available_tasks.get(message.task_name)
114+
taskiq_message = self.formatter.loads(message=message)
114115
if target_task is None:
115116
raise TaskiqError("Unknown task.")
116117
result = await run_task(
117118
target=target_task.original_func,
118119
signature=inspect.signature(target_task.original_func),
119-
message=message,
120+
message=taskiq_message,
120121
log_collector_format=self.logs_format,
121122
executor=self.executor,
122123
)
123124
await self.result_backend.set_result(message.task_id, result)
124125

125-
async def listen(self) -> AsyncGenerator[TaskiqMessage, None]: # type: ignore
126+
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
126127
"""
127128
Inmemory broker cannot listen.
128129

0 commit comments

Comments
 (0)