Skip to content

Commit cf87480

Browse files
authored
Implement customizable serializer (#214)
1 parent 34db231 commit cf87480

File tree

13 files changed

+306
-57
lines changed

13 files changed

+306
-57
lines changed

poetry.lock

Lines changed: 62 additions & 52 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ freezegun = "^1.2.2"
6161
pytest-mock = "^3.11.1"
6262
tzlocal = "^5.0.1"
6363
types-tzlocal = "^5.0.1.1"
64+
types-pytz = "^2023.3.1.1"
6465

6566
[tool.poetry.extras]
6667
zmq = ["pyzmq"]

taskiq/abc/broker.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
from typing_extensions import ParamSpec, Self, TypeAlias
2626

2727
from taskiq.abc.middleware import TaskiqMiddleware
28+
from taskiq.abc.serializer import TaskiqSerializer
2829
from taskiq.acks import AckableMessage
2930
from taskiq.decor import AsyncTaskiqDecoratedTask
3031
from taskiq.events import TaskiqEvents
31-
from taskiq.formatters.json_formatter import JSONFormatter
32+
from taskiq.formatters.proxy_formatter import ProxyFormatter
3233
from taskiq.message import BrokerMessage
3334
from taskiq.result_backends.dummy import DummyResultBackend
35+
from taskiq.serializers.json_serializer import JSONSerializer
3436
from taskiq.state import TaskiqState
3537
from taskiq.utils import maybe_awaitable, remove_suffix
3638
from taskiq.warnings import TaskiqDeprecationWarning
@@ -97,7 +99,8 @@ def __init__(
9799
self.middlewares: "List[TaskiqMiddleware]" = []
98100
self.result_backend = result_backend
99101
self.decorator_class = AsyncTaskiqDecoratedTask
100-
self.formatter: "TaskiqFormatter" = JSONFormatter()
102+
self.serializer: TaskiqSerializer = JSONSerializer()
103+
self.formatter: "TaskiqFormatter" = ProxyFormatter(self)
101104
self.id_generator = task_id_generator
102105
self.local_task_registry: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {}
103106
# Every event has a list of handlers.
@@ -479,6 +482,19 @@ def with_event_handlers(
479482
self.event_handlers[event].extend(handlers)
480483
return self
481484

485+
def with_serializer(
486+
self,
487+
serializer: TaskiqSerializer,
488+
) -> "Self": # pragma: no cover
489+
"""
490+
Set a new serializer and return an updated broker.
491+
492+
:param serializer: new serializer.
493+
:return: self
494+
"""
495+
self.serializer = serializer
496+
return self
497+
482498
def _register_task(
483499
self,
484500
task_name: str,

taskiq/abc/serializer.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Any
3+
4+
5+
class TaskiqSerializer(ABC):
6+
"""Custom serializer for brokers."""
7+
8+
@abstractmethod
9+
def dumpb(self, value: Any) -> bytes:
10+
"""
11+
Dump value to bytes for sending through the wire.
12+
13+
:param value: value to encode.
14+
:return: encoded value.
15+
"""
16+
17+
@abstractmethod
18+
def loadb(self, value: bytes) -> Any:
19+
"""
20+
Parse byte-encoded value received from the wire.
21+
22+
:param message: value to parse.
23+
:return: decoded value.
24+
"""

taskiq/compat.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@
1616
def parse_obj_as(annot: T, obj: Any) -> T:
1717
return pydantic.TypeAdapter(annot).validate_python(obj)
1818

19+
def model_validate(
20+
model_class: Type[Model],
21+
message: Dict[str, Any],
22+
) -> Model:
23+
return model_class.model_validate(message)
24+
25+
def model_dump(instance: Model) -> Dict[str, Any]:
26+
return instance.model_dump()
27+
1928
def model_validate_json(
2029
model_class: Type[Model],
2130
message: Union[str, bytes, bytearray],
@@ -37,6 +46,15 @@ def model_copy(
3746
else:
3847
parse_obj_as = pydantic.parse_obj_as # type: ignore
3948

49+
def model_validate(
50+
model_class: Type[Model],
51+
message: Dict[str, Any],
52+
) -> Model:
53+
return model_class.parse_obj(message)
54+
55+
def model_dump(instance: Model) -> Dict[str, Any]:
56+
return instance.dict()
57+
4058
def model_validate_json(
4159
model_class: Type[Model],
4260
message: Union[str, bytes, bytearray],

taskiq/formatters/json_formatter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55

66
class JSONFormatter(TaskiqFormatter):
7-
"""Default taskiq formatter."""
7+
"""JSON taskiq formatter."""
88

99
def dumps(self, message: TaskiqMessage) -> BrokerMessage:
1010
"""

taskiq/formatters/proxy_formatter.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import TYPE_CHECKING
2+
3+
from taskiq.abc.formatter import TaskiqFormatter
4+
from taskiq.compat import model_dump, model_validate
5+
from taskiq.message import BrokerMessage, TaskiqMessage
6+
7+
if TYPE_CHECKING:
8+
from taskiq.abc.broker import AsyncBroker
9+
10+
11+
class ProxyFormatter(TaskiqFormatter):
12+
"""Default taskiq formatter."""
13+
14+
def __init__(self, broker: "AsyncBroker") -> None:
15+
self.broker = broker
16+
17+
def dumps(self, message: TaskiqMessage) -> BrokerMessage:
18+
"""
19+
Dumps taskiq message to some broker message format.
20+
21+
:param message: message to send.
22+
:return: Dumped message.
23+
"""
24+
return BrokerMessage(
25+
task_id=message.task_id,
26+
task_name=message.task_name,
27+
message=self.broker.serializer.dumpb(model_dump(message)),
28+
labels=message.labels,
29+
)
30+
31+
def loads(self, message: bytes) -> TaskiqMessage:
32+
"""
33+
Loads json from message.
34+
35+
:param message: broker's message.
36+
:return: parsed taskiq message.
37+
"""
38+
return model_validate(TaskiqMessage, self.broker.serializer.loadb(message))

taskiq/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class TaskiqMessage(BaseModel):
1414

1515
task_id: str
1616
task_name: str
17-
labels: Dict[str, str]
17+
labels: Dict[str, Any]
1818
args: List[Any]
1919
kwargs: Dict[str, Any]
2020

@@ -25,4 +25,4 @@ class BrokerMessage(BaseModel):
2525
task_id: str
2626
task_name: str
2727
message: bytes
28-
labels: Dict[str, str]
28+
labels: Dict[str, Any]

taskiq/serializers/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Taskiq serializers."""

taskiq/serializers/json_serializer.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from json import dumps, loads
2+
from typing import Any
3+
4+
from taskiq.abc.serializer import TaskiqSerializer
5+
6+
7+
class JSONSerializer(TaskiqSerializer):
8+
"""Default taskiq serizalizer."""
9+
10+
def dumpb(self, value: Any) -> bytes:
11+
"""
12+
Dumps taskiq message to some broker message format.
13+
14+
:param message: message to send.
15+
:return: Dumped message.
16+
"""
17+
return dumps(value).encode()
18+
19+
def loadb(self, value: bytes) -> Any:
20+
"""
21+
Parse byte-encoded value received from the wire.
22+
23+
:param message: value to parse.
24+
:return: decoded value.
25+
"""
26+
return loads(value.decode())

0 commit comments

Comments
 (0)