Skip to content

Commit da715f7

Browse files
committed
fix: add custom serializer and scheduler
1 parent 972d9fd commit da715f7

File tree

6 files changed

+54
-30
lines changed

6 files changed

+54
-30
lines changed

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,8 @@ async def handler(msg: str):
6161
print(msg)
6262

6363
# taskiq-faststream scheduling
64-
from taskiq import TaskiqScheduler
6564
from taskiq.schedule_sources import LabelScheduleSource
66-
from taskiq_faststream import BrokerWrapper
65+
from taskiq_faststream import BrokerWrapper, StreamScheduler
6766

6867
# wrap FastStream object
6968
taskiq_broker = BrokerWrapper(broker)
@@ -80,7 +79,7 @@ taskiq_broker.task(
8079
)
8180

8281
# create scheduler object
83-
scheduler = TaskiqScheduler(
82+
scheduler = StreamScheduler(
8483
broker=taskiq_broker,
8584
sources=[LabelScheduleSource(taskiq_broker)],
8685
)

taskiq_faststream/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from taskiq_faststream.broker import AppWrapper, BrokerWrapper
2+
from taskiq_faststream.scheduler import StreamScheduler
23

34
__all__ = (
45
"BrokerWrapper",
6+
"StreamScheduler",
57
"AppWrapper",
68
)

taskiq_faststream/broker.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
from faststream.types import SendableMessage
99
from taskiq import AsyncBroker, BrokerMessage
1010
from taskiq.acks import AckableMessage
11+
from taskiq.decor import AsyncTaskiqDecoratedTask
1112

12-
from taskiq_faststream.task import PatchedTaskiqDecoratedTask
13+
from taskiq_faststream.serializer import PatchedSerializer
1314
from taskiq_faststream.types import ScheduledTask
1415
from taskiq_faststream.utils import resolve_msg
1516

@@ -32,8 +33,8 @@ class BrokerWrapper(AsyncBroker):
3233

3334
def __init__(self, broker: BrokerAsyncUsecase[typing.Any, typing.Any]) -> None:
3435
super().__init__()
36+
self.serializer = PatchedSerializer()
3537
self.broker = broker
36-
self.decorator_class = PatchedTaskiqDecoratedTask
3738

3839
async def startup(self) -> None:
3940
"""Startup wrapped FastStream broker."""
@@ -77,7 +78,7 @@ def task( # type: ignore[override]
7778
*,
7879
schedule: typing.List[ScheduledTask],
7980
**kwargs: PublishParameters,
80-
) -> "PatchedTaskiqDecoratedTask[[], None]":
81+
) -> "AsyncTaskiqDecoratedTask[[], None]":
8182
"""Register FastStream scheduled task.
8283
8384
Args:
@@ -108,8 +109,8 @@ class AppWrapper(BrokerWrapper):
108109

109110
def __init__(self, app: FastStream) -> None:
110111
super(BrokerWrapper, self).__init__()
112+
self.serializer = PatchedSerializer()
111113
self.app = app
112-
self.decorator_class = PatchedTaskiqDecoratedTask
113114

114115
async def startup(self) -> None:
115116
"""Startup wrapped FastStream."""

taskiq_faststream/scheduler.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from typing import TYPE_CHECKING
2+
3+
from taskiq.scheduler.scheduled_task import ScheduledTask
4+
from taskiq.scheduler.scheduler import TaskiqScheduler as Scheduler
5+
from taskiq.utils import maybe_awaitable
6+
7+
from taskiq_faststream.kicker import LabelRespectKicker
8+
9+
if TYPE_CHECKING: # pragma: no cover
10+
from taskiq.abc.schedule_source import ScheduleSource
11+
12+
13+
class StreamScheduler(Scheduler):
14+
"""Patched scheduler with custom kicker."""
15+
16+
async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None:
17+
"""
18+
This method is called when task is ready to be enqueued.
19+
20+
It's triggered on proper time depending on `task.cron` or `task.time` attribute.
21+
:param task: task to send
22+
"""
23+
await maybe_awaitable(source.pre_send(task))
24+
await LabelRespectKicker(task.task_name, self.broker, task.labels).kiq(
25+
*task.args,
26+
**task.kwargs,
27+
)
28+
await maybe_awaitable(source.post_send(task))

taskiq_faststream/serializer.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from typing import Any
2+
3+
from taskiq.serializers.json_serializer import JSONSerializer
4+
5+
6+
class PatchedSerializer(JSONSerializer):
7+
"""Patched serializer removes labels."""
8+
9+
def dumpb(self, value: Any) -> bytes:
10+
"""
11+
Dumps taskiq message to some broker message format.
12+
13+
:param message: message to send.
14+
:return: Dumped message.
15+
"""
16+
del value["labels"]
17+
return super().dumpb(value)

taskiq_faststream/task.py

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)