Skip to content

Commit b124c06

Browse files
committed
Added custom formatters. Fixed mypy.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 17c2332 commit b124c06

File tree

15 files changed

+201
-88
lines changed

15 files changed

+201
-88
lines changed

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ docstring_style=sphinx
88
ignore =
99
; Found `%` string formatting
1010
WPS323,
11+
; Found block variable overlap.
12+
WPS440,
1113
; Found multi-line function type annotation
1214
WPS320,
1315
; Found statement that has no effect

taskiq/abc/broker.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
11
from abc import ABC, abstractmethod
22
from functools import wraps
33
from logging import getLogger
4-
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union, overload
5-
4+
from typing import (
5+
Any,
6+
AsyncGenerator,
7+
Callable,
8+
Dict,
9+
Optional,
10+
TypeVar,
11+
Union,
12+
overload,
13+
)
14+
15+
from typing_extensions import ParamSpec
16+
17+
from taskiq.abc.plugins.formatter import TaskiqFormatter
618
from taskiq.abc.result_backend import AsyncResultBackend
719
from taskiq.decor import AsyncTaskiqDecoratedTask
8-
from taskiq.message import TaskiqMessage
20+
from taskiq.message import BrokerMessage
21+
from taskiq.plugins.json_formatter import JSONFormatter
922
from taskiq.result_backends.dummy import DummyResultBackend
10-
from taskiq.types_helpers import T_, FuncParams_, ReturnType_
23+
24+
_T = TypeVar("_T") # noqa: WPS111
25+
_FuncParams = ParamSpec("_FuncParams")
26+
_ReturnType = TypeVar("_ReturnType")
1127

1228
logger = getLogger("taskiq")
1329

@@ -25,13 +41,14 @@ class AsyncBroker(ABC):
2541

2642
def __init__(
2743
self,
28-
result_backend: Optional[AsyncResultBackend[T_]] = None,
44+
result_backend: Optional[AsyncResultBackend[_T]] = None,
2945
) -> None:
3046
if result_backend is None:
3147
result_backend = DummyResultBackend()
3248
self.result_backend = result_backend
3349
self.is_worker_process = False
3450
self.decorator_class = AsyncTaskiqDecoratedTask
51+
self.formatter: TaskiqFormatter = JSONFormatter()
3552

3653
async def startup(self) -> None:
3754
"""Do something when starting broker."""
@@ -47,7 +64,7 @@ async def shutdown(self) -> None:
4764
@abstractmethod
4865
async def kick(
4966
self,
50-
message: TaskiqMessage,
67+
message: BrokerMessage,
5168
) -> None:
5269
"""
5370
This method is used to kick tasks out from current program.
@@ -59,7 +76,7 @@ async def kick(
5976
"""
6077

6178
@abstractmethod
62-
def listen(self) -> AsyncGenerator[TaskiqMessage, None]:
79+
def listen(self) -> AsyncGenerator[BrokerMessage, None]:
6380
"""
6481
This function listens to new messages and yields them.
6582
@@ -73,8 +90,8 @@ def listen(self) -> AsyncGenerator[TaskiqMessage, None]:
7390
@overload
7491
def task(
7592
self,
76-
task_name: Callable[FuncParams_, ReturnType_],
77-
) -> AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_]:
93+
task_name: Callable[_FuncParams, _ReturnType],
94+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
7895
...
7996

8097
@overload
@@ -83,8 +100,8 @@ def task(
83100
task_name: Optional[str] = None,
84101
**labels: Union[str, int],
85102
) -> Callable[
86-
[Callable[FuncParams_, ReturnType_]],
87-
AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_],
103+
[Callable[_FuncParams, _ReturnType]],
104+
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
88105
]:
89106
...
90107

@@ -120,12 +137,12 @@ def make_decorated_task(
120137
inner_labels: Dict[str, Union[str, int]],
121138
inner_task_name: Optional[str] = None,
122139
) -> Callable[
123-
[Callable[FuncParams_, ReturnType_]],
124-
AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_],
140+
[Callable[_FuncParams, _ReturnType]],
141+
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
125142
]:
126143
def inner(
127-
func: Callable[FuncParams_, ReturnType_],
128-
) -> AsyncTaskiqDecoratedTask[FuncParams_, ReturnType_]:
144+
func: Callable[_FuncParams, _ReturnType],
145+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
129146
nonlocal inner_task_name # noqa: WPS420
130147
if inner_task_name is None:
131148
inner_task_name = ( # noqa: WPS442

taskiq/abc/plugins/formatter.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Any, Dict
3+
4+
from taskiq.message import BrokerMessage, TaskiqMessage
5+
6+
7+
class TaskiqFormatter(ABC):
8+
"""Custom formatter for brokers."""
9+
10+
@abstractmethod
11+
def dumps(self, message: TaskiqMessage, labels: Dict[str, Any]) -> BrokerMessage:
12+
"""
13+
Dump message to broker message instance.
14+
15+
:param message: message to send.
16+
:param labels: task's labels.
17+
:return: message for brokers.
18+
"""
19+
20+
@abstractmethod
21+
def loads(self, message: BrokerMessage) -> TaskiqMessage:
22+
"""
23+
Parses broker message to TaskiqMessage.
24+
25+
:param message: message to parse.
26+
:return: parsed taskiq message.
27+
"""

taskiq/abc/result_backend.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from abc import ABC, abstractmethod
2-
from typing import Generic
2+
from typing import Generic, TypeVar
33

44
from taskiq.result import TaskiqResult
55
from taskiq.task import AsyncTaskiqTask
6-
from taskiq.types_helpers import ReturnType_
76

7+
_ReturnType = TypeVar("_ReturnType")
88

9-
class AsyncResultBackend(ABC, Generic[ReturnType_]):
9+
10+
class AsyncResultBackend(ABC, Generic[_ReturnType]):
1011
"""Async result backend."""
1112

1213
async def startup(self) -> None:
@@ -15,7 +16,7 @@ async def startup(self) -> None:
1516
async def shutdown(self) -> None:
1617
"""Do something on shutdown."""
1718

18-
def generate_task(self, task_id: str) -> "AsyncTaskiqTask[ReturnType_]":
19+
def generate_task(self, task_id: str) -> "AsyncTaskiqTask[_ReturnType]":
1920
"""
2021
Generates new task.
2122
@@ -29,7 +30,7 @@ def generate_task(self, task_id: str) -> "AsyncTaskiqTask[ReturnType_]":
2930
return AsyncTaskiqTask(task_id=task_id, result_backend=self)
3031

3132
@abstractmethod
32-
async def set_result(self, task_id: str, result: TaskiqResult[ReturnType_]) -> None:
33+
async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None:
3334
"""
3435
Saves result to the result backend.
3536
@@ -55,7 +56,7 @@ async def get_result(
5556
self,
5657
task_id: str,
5758
with_logs: bool = False,
58-
) -> TaskiqResult[ReturnType_]:
59+
) -> TaskiqResult[_ReturnType]:
5960
"""
6061
Gets result from the task.
6162

taskiq/brokers/inmemory_broker.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
88
from taskiq.cli.async_task_runner import run_task
99
from taskiq.exceptions import TaskiqError
10-
from taskiq.message import TaskiqMessage
10+
from taskiq.message import BrokerMessage
1111

1212
_ReturnType = TypeVar("_ReturnType")
1313

@@ -101,7 +101,7 @@ def __init__(
101101
logs_format = "%(levelname)s %(message)s"
102102
self.logs_format = logs_format
103103

104-
async def kick(self, message: TaskiqMessage) -> None:
104+
async def kick(self, message: BrokerMessage) -> None:
105105
"""
106106
Kicking task.
107107
@@ -111,18 +111,19 @@ async def kick(self, message: TaskiqMessage) -> None:
111111
:raises TaskiqError: if someone wants to kick unknown task.
112112
"""
113113
target_task = self.available_tasks.get(message.task_name)
114+
taskiq_message = self.formatter.loads(message=message)
114115
if target_task is None:
115116
raise TaskiqError("Unknown task.")
116117
result = await run_task(
117118
target=target_task.original_func,
118119
signature=inspect.signature(target_task.original_func),
119-
message=message,
120+
message=taskiq_message,
120121
log_collector_format=self.logs_format,
121122
executor=self.executor,
122123
)
123124
await self.result_backend.set_result(message.task_id, result)
124125

125-
async def listen(self) -> AsyncGenerator[TaskiqMessage, None]: # type: ignore
126+
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
126127
"""
127128
Inmemory broker cannot listen.
128129

taskiq/brokers/shared_broker.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44
from taskiq.decor import AsyncTaskiqDecoratedTask
55
from taskiq.exceptions import TaskiqError
66
from taskiq.kicker import AsyncKicker
7-
from taskiq.message import TaskiqMessage
8-
from taskiq.types_helpers import ReturnType_
7+
from taskiq.message import BrokerMessage
98

10-
Params_ = TypeVar("Params_") # noqa: WPS120
9+
_ReturnType = TypeVar("_ReturnType")
10+
_Params = TypeVar("_Params")
1111

1212

13-
class SharedDecoratedTask(AsyncTaskiqDecoratedTask[Params_, ReturnType_]):
13+
class SharedDecoratedTask(AsyncTaskiqDecoratedTask[_Params, _ReturnType]):
1414
"""Decorator that is used with shared broker."""
1515

16-
def kicker(self) -> AsyncKicker[Params_, ReturnType_]:
16+
def kicker(self) -> AsyncKicker[_Params, _ReturnType]:
1717
"""
1818
This method updates getting default kicker.
1919
@@ -45,7 +45,7 @@ def __init__(self) -> None:
4545
self._default_broker: Optional[AsyncBroker] = None
4646
self.decorator_class = SharedDecoratedTask
4747

48-
async def kick(self, message: TaskiqMessage) -> None:
48+
async def kick(self, message: BrokerMessage) -> None:
4949
"""
5050
Shared broker cannot kick tasks.
5151
@@ -62,7 +62,7 @@ def default_broker(self, new_broker: AsyncBroker) -> None:
6262
"""
6363
self._default_broker = new_broker
6464

65-
async def listen(self) -> AsyncGenerator[TaskiqMessage, None]: # type: ignore
65+
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
6666
"""
6767
Shared broker cannot listen to tasks.
6868

taskiq/cli/async_task_runner.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def parse_params( # noqa: C901
7979
continue
8080
try:
8181
message.args[argnum] = parse_obj_as(annot, value)
82-
except (ValueError, RuntimeError) as exc: # noqa: WPS440
82+
except (ValueError, RuntimeError) as exc:
8383
logger.debug(exc, exc_info=True)
8484

8585

@@ -266,19 +266,28 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
266266
"Function for task %s is resolved. Executing...",
267267
message.task_name,
268268
)
269+
try:
270+
taskiq_msg = broker.formatter.loads(message=message)
271+
except Exception as exc:
272+
logger.warning(
273+
"Cannot parse message: %s. Skipping execution.\n %s",
274+
message,
275+
exc,
276+
exc_info=True,
277+
)
278+
continue
269279
result = await run_task(
270280
broker.available_tasks[message.task_name].original_func,
271281
task_signatures.get(message.task_name),
272-
message,
282+
taskiq_msg,
273283
cli_args.log_collector_format,
274284
executor,
275285
)
276286
try:
277287
await broker.result_backend.set_result(message.task_id, result)
278288
except Exception as exc:
279289
logger.exception(
280-
"Can't set result in %s result backend: \n%s",
281-
broker.result_backend.__class__.__name__,
290+
"Can't set result in result backend. Cause: %s",
282291
exc,
283292
exc_info=True,
284293
)

0 commit comments

Comments
 (0)