Skip to content

Commit ec29277

Browse files
authored
Merge pull request #69 from IvanKirpichnikov/develop
fix #68
2 parents 5e507fb + 2beb27d commit ec29277

File tree

3 files changed

+51
-0
lines changed

3 files changed

+51
-0
lines changed

taskiq_faststream/__about__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
"""FastStream - taskiq integration to schedule FastStream tasks."""
2+
23
__version__ = "0.1.8"

taskiq_faststream/broker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from taskiq.decor import AsyncTaskiqDecoratedTask
1111
from typing_extensions import TypeAlias, override
1212

13+
from taskiq_faststream.formatter import PatchedFormatter
1314
from taskiq_faststream.serializer import PatchedSerializer
1415
from taskiq_faststream.types import ScheduledTask
1516
from taskiq_faststream.utils import resolve_msg
@@ -34,6 +35,7 @@ class BrokerWrapper(AsyncBroker):
3435
def __init__(self, broker: Any) -> None:
3536
super().__init__()
3637
self.serializer = PatchedSerializer()
38+
self.formatter = PatchedFormatter(self)
3739
self.broker = broker
3840

3941
async def startup(self) -> None:

taskiq_faststream/formatter.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from typing import Any, Dict
2+
3+
from taskiq.abc.broker import AsyncBroker
4+
from taskiq.abc.formatter import TaskiqFormatter
5+
from taskiq.compat import IS_PYDANTIC2, Model, model_dump, model_validate
6+
from taskiq.message import BrokerMessage, TaskiqMessage
7+
8+
if IS_PYDANTIC2:
9+
10+
def model_dump(instance: Model) -> Dict[str, Any]:
11+
"""Model dump."""
12+
return instance.model_dump()
13+
14+
else:
15+
16+
def model_dump(instance: Model) -> Dict[str, Any]:
17+
"""Model dump."""
18+
return instance.dict()
19+
20+
21+
class PatchedFormatter(TaskiqFormatter):
22+
"""Default taskiq formatter."""
23+
24+
def __init__(self, broker: AsyncBroker) -> None:
25+
self.broker = broker
26+
27+
def dumps(self, message: TaskiqMessage) -> BrokerMessage:
28+
"""
29+
Dumps taskiq message to some broker message format.
30+
31+
:param message: message to send.
32+
:return: Dumped message.
33+
"""
34+
return BrokerMessage(
35+
task_id=message.task_id,
36+
task_name=message.task_name,
37+
message=self.broker.serializer.dumpb(model_dump(message)),
38+
labels=message.labels,
39+
)
40+
41+
def loads(self, message: bytes) -> TaskiqMessage:
42+
"""
43+
Loads json from message.
44+
45+
:param message: broker's message.
46+
:return: parsed taskiq message.
47+
"""
48+
return model_validate(TaskiqMessage, self.broker.serializer.loadb(message))

0 commit comments

Comments
 (0)