Skip to content

Commit 319e252

Browse files
authored
Merge pull request #12 from taskiq-python/develop
feat: add custom scheduler to support get_message callbacks
2 parents f2ea7d6 + a8c867a commit 319e252

File tree

8 files changed

+68
-6
lines changed

8 files changed

+68
-6
lines changed

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,8 @@ async def handler(msg: str):
6161
print(msg)
6262

6363
# taskiq-faststream scheduling
64-
from taskiq import TaskiqScheduler
6564
from taskiq.schedule_sources import LabelScheduleSource
66-
from taskiq_faststream import BrokerWrapper
65+
from taskiq_faststream import BrokerWrapper, StreamScheduler
6766

6867
# wrap FastStream object
6968
taskiq_broker = BrokerWrapper(broker)
@@ -80,7 +79,7 @@ taskiq_broker.task(
8079
)
8180

8281
# create scheduler object
83-
scheduler = TaskiqScheduler(
82+
scheduler = StreamScheduler(
8483
broker=taskiq_broker,
8584
sources=[LabelScheduleSource(taskiq_broker)],
8685
)

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ test = [
7171
dev = [
7272
"taskiq-faststream[test]",
7373

74-
"mypy==1.7.0",
74+
"mypy==1.7.1",
7575
"black==23.11.0",
7676
"isort==5.12.0",
77-
"ruff==0.1.5",
77+
"ruff==0.1.6",
7878
"pyupgrade-directories==0.3.0",
7979
"pre-commit==3.5.0",
8080
]

taskiq_faststream/__about__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
"""FastStream - taskiq integration to schedule FastStream tasks."""
2-
__version__ = "0.1.4"
2+
__version__ = "0.1.5"

taskiq_faststream/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from taskiq_faststream.broker import AppWrapper, BrokerWrapper
2+
from taskiq_faststream.scheduler import StreamScheduler
23

34
__all__ = (
45
"BrokerWrapper",
6+
"StreamScheduler",
57
"AppWrapper",
68
)

taskiq_faststream/broker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from taskiq.acks import AckableMessage
1111
from taskiq.decor import AsyncTaskiqDecoratedTask
1212

13+
from taskiq_faststream.serializer import PatchedSerializer
1314
from taskiq_faststream.types import ScheduledTask
1415
from taskiq_faststream.utils import resolve_msg
1516

@@ -32,6 +33,7 @@ class BrokerWrapper(AsyncBroker):
3233

3334
def __init__(self, broker: BrokerAsyncUsecase[typing.Any, typing.Any]) -> None:
3435
super().__init__()
36+
self.serializer = PatchedSerializer()
3537
self.broker = broker
3638

3739
async def startup(self) -> None:
@@ -107,6 +109,7 @@ class AppWrapper(BrokerWrapper):
107109

108110
def __init__(self, app: FastStream) -> None:
109111
super(BrokerWrapper, self).__init__()
112+
self.serializer = PatchedSerializer()
110113
self.app = app
111114

112115
async def startup(self) -> None:

taskiq_faststream/kicker.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from typing import Any
2+
3+
from taskiq.kicker import AsyncKicker, _FuncParams, _ReturnType
4+
from taskiq.message import TaskiqMessage
5+
6+
7+
class LabelRespectKicker(AsyncKicker[_FuncParams, _ReturnType]):
8+
"""Patched kicker doesn't cast labels to str."""
9+
10+
def _prepare_message(self, *args: Any, **kwargs: Any) -> TaskiqMessage:
11+
msg = super()._prepare_message(*args, **kwargs)
12+
msg.labels = self.labels
13+
return msg

taskiq_faststream/scheduler.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from typing import TYPE_CHECKING
2+
3+
from taskiq.scheduler.scheduled_task import ScheduledTask
4+
from taskiq.scheduler.scheduler import TaskiqScheduler as Scheduler
5+
from taskiq.utils import maybe_awaitable
6+
7+
from taskiq_faststream.kicker import LabelRespectKicker
8+
9+
if TYPE_CHECKING: # pragma: no cover
10+
from taskiq.abc.schedule_source import ScheduleSource
11+
12+
13+
class StreamScheduler(Scheduler):
14+
"""Patched scheduler with custom kicker."""
15+
16+
async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None:
17+
"""
18+
This method is called when task is ready to be enqueued.
19+
20+
It's triggered on proper time depending on `task.cron` or `task.time` attribute.
21+
:param task: task to send
22+
"""
23+
await maybe_awaitable(source.pre_send(task))
24+
await LabelRespectKicker(task.task_name, self.broker, task.labels).kiq(
25+
*task.args,
26+
**task.kwargs,
27+
)
28+
await maybe_awaitable(source.post_send(task))

taskiq_faststream/serializer.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from typing import Any
2+
3+
from taskiq.serializers.json_serializer import JSONSerializer
4+
5+
6+
class PatchedSerializer(JSONSerializer):
7+
"""Patched serializer removes labels."""
8+
9+
def dumpb(self, value: Any) -> bytes:
10+
"""
11+
Dumps taskiq message to some broker message format.
12+
13+
:param message: message to send.
14+
:return: Dumped message.
15+
"""
16+
del value["labels"]
17+
return super().dumpb(value)

0 commit comments

Comments
 (0)