Skip to content

Commit 972d9fd

Browse files
committed
fix: add fake task and kicker
1 parent 70240c5 commit 972d9fd

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

taskiq_faststream/broker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
from faststream.types import SendableMessage
99
from taskiq import AsyncBroker, BrokerMessage
1010
from taskiq.acks import AckableMessage
11-
from taskiq.decor import AsyncTaskiqDecoratedTask
1211

12+
from taskiq_faststream.task import PatchedTaskiqDecoratedTask
1313
from taskiq_faststream.types import ScheduledTask
1414
from taskiq_faststream.utils import resolve_msg
1515

@@ -33,6 +33,7 @@ class BrokerWrapper(AsyncBroker):
3333
def __init__(self, broker: BrokerAsyncUsecase[typing.Any, typing.Any]) -> None:
3434
super().__init__()
3535
self.broker = broker
36+
self.decorator_class = PatchedTaskiqDecoratedTask
3637

3738
async def startup(self) -> None:
3839
"""Startup wrapped FastStream broker."""
@@ -76,7 +77,7 @@ def task( # type: ignore[override]
7677
*,
7778
schedule: typing.List[ScheduledTask],
7879
**kwargs: PublishParameters,
79-
) -> "AsyncTaskiqDecoratedTask[[], None]":
80+
) -> "PatchedTaskiqDecoratedTask[[], None]":
8081
"""Register FastStream scheduled task.
8182
8283
Args:
@@ -108,6 +109,7 @@ class AppWrapper(BrokerWrapper):
108109
def __init__(self, app: FastStream) -> None:
109110
super(BrokerWrapper, self).__init__()
110111
self.app = app
112+
self.decorator_class = PatchedTaskiqDecoratedTask
111113

112114
async def startup(self) -> None:
113115
"""Startup wrapped FastStream."""

taskiq_faststream/kicker.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from typing import Any
2+
3+
from taskiq.kicker import AsyncKicker, _FuncParams, _ReturnType
4+
from taskiq.message import TaskiqMessage
5+
6+
7+
class LabelRespectKicker(AsyncKicker[_FuncParams, _ReturnType]):
8+
"""Patched kicker doesn't cast labels to str."""
9+
10+
def _prepare_message(self, *args: Any, **kwargs: Any) -> TaskiqMessage:
11+
msg = super()._prepare_message(*args, **kwargs)
12+
msg.labels = self.labels
13+
return msg

taskiq_faststream/task.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from taskiq.decor import AsyncTaskiqDecoratedTask as Task
2+
from taskiq.decor import _FuncParams, _ReturnType
3+
4+
from taskiq_faststream.kicker import LabelRespectKicker
5+
6+
7+
class PatchedTaskiqDecoratedTask(Task[_FuncParams, _ReturnType]):
8+
"""Patched Decorated Task has a patched kicker."""
9+
10+
def kicker(self) -> LabelRespectKicker[_FuncParams, _ReturnType]:
11+
"""
12+
This function returns kicker object.
13+
14+
Kicker is a object that can modify kiq request
15+
before sending it.
16+
17+
:return: AsyncKicker instance.
18+
"""
19+
return LabelRespectKicker(
20+
task_name=self.task_name,
21+
broker=self.broker,
22+
labels=self.labels,
23+
)

0 commit comments

Comments
 (0)