Skip to content

Commit 85836e5

Browse files
committed
feat: add BrokerWrapper
1 parent 99274f6 commit 85836e5

File tree

12 files changed

+405
-8
lines changed

12 files changed

+405
-8
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ exclude = [".venv/"]
176176
]
177177

178178
[tool.ruff.pydocstyle]
179-
convention = "pep257"
179+
convention = "google"
180180
ignore-decorators = ["typing.overload"]
181181

182182
[tool.ruff.pylint]

taskiq_faststream/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
"""Project was generated using https://github.com/taskiq-python/project-template/."""
1+
from taskiq_faststream.broker import BrokerWrapper
2+
3+
__all__ = ("BrokerWrapper",)

taskiq_faststream/broker.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import typing
2+
import warnings
3+
4+
from faststream._compat import TypeAlias, override
5+
from faststream.broker.core.asyncronous import BrokerAsyncUsecase
6+
from faststream.types import SendableMessage
7+
from taskiq import AsyncBroker, BrokerMessage
8+
from taskiq.acks import AckableMessage
9+
from taskiq.decor import AsyncTaskiqDecoratedTask
10+
11+
from taskiq_faststream.types import ScheduledTask
12+
from taskiq_faststream.utils import resolve_msg
13+
14+
PublishParameters: TypeAlias = typing.Any
15+
16+
17+
class BrokerWrapper(AsyncBroker):
18+
"""Wrap FastStream broker to taskiq compatible object.
19+
20+
Attributes:
21+
broker : FastStream wrapped broker.
22+
23+
Methods:
24+
__init__ : Initializes the object.
25+
startup : Startup wrapper FastStream broker.
26+
shutdown : Shutdown wrapper FastStream broker.
27+
kick : Call wrapped FastStream broker `publish` method.
28+
task : Register FastStream scheduled task.
29+
"""
30+
31+
def __init__(self, broker: BrokerAsyncUsecase[typing.Any, typing.Any]) -> None:
32+
super().__init__()
33+
self.broker = broker
34+
35+
async def startup(self) -> None:
36+
"""Startup wrapper FastStream broker."""
37+
await super().startup()
38+
await self.broker.start()
39+
40+
async def shutdown(self) -> None:
41+
"""Shutdown wrapper FastStream broker."""
42+
await self.broker.close()
43+
await super().shutdown()
44+
45+
async def kick(self, message: BrokerMessage) -> None:
46+
"""Call wrapped FastStream broker `publish` method."""
47+
labels = message.labels
48+
labels.pop("schedule", None)
49+
msg = await resolve_msg(labels.pop("message", message.message))
50+
await self.broker.publish(msg, **labels)
51+
52+
async def listen(
53+
self,
54+
) -> typing.AsyncGenerator[typing.Union[bytes, AckableMessage], None]:
55+
"""Not supported method."""
56+
warnings.warn(
57+
message=(
58+
f"{self.__class__.__name__} doesn't support `listen` method. "
59+
"Please, use it only to register a task."
60+
),
61+
category=RuntimeWarning,
62+
stacklevel=1,
63+
)
64+
65+
while True:
66+
yield b""
67+
68+
@override
69+
def task( # type: ignore[override]
70+
self,
71+
message: typing.Union[
72+
None,
73+
SendableMessage,
74+
typing.Callable[[], SendableMessage],
75+
typing.Callable[[], typing.Awaitable[SendableMessage]],
76+
] = None,
77+
*,
78+
schedule: typing.List[ScheduledTask],
79+
**kwargs: PublishParameters,
80+
) -> AsyncTaskiqDecoratedTask[[], None]:
81+
"""Register FastStream scheduled task.
82+
83+
Args:
84+
message: object to send or sync/async message generation callback.
85+
schedule: scheduler parameters list.
86+
kwargs: `broker.publish(...)` arguments.
87+
"""
88+
return super().task(
89+
message=message,
90+
schedule=schedule,
91+
**kwargs,
92+
)(lambda: None)

taskiq_faststream/types.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from datetime import datetime, timedelta
2+
from typing import Optional, Union
3+
4+
from faststream._compat import TypedDict
5+
6+
7+
class ScheduledTask(TypedDict, total=False):
8+
"""Store information about scheduled tasks.
9+
10+
https://taskiq-python.github.io/available-components/schedule-sources.html
11+
"""
12+
13+
cron: str
14+
cron_offset: Union[str, timedelta, None]
15+
time: Optional[datetime]

taskiq_faststream/utils.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import typing
2+
3+
from faststream.types import SendableMessage
4+
from faststream.utils.functions import to_async
5+
6+
7+
async def resolve_msg(
8+
msg: typing.Union[
9+
None,
10+
SendableMessage,
11+
typing.Callable[[], SendableMessage],
12+
typing.Callable[[], typing.Awaitable[SendableMessage]],
13+
],
14+
) -> SendableMessage:
15+
"""Resolve message generation callback.
16+
17+
Args:
18+
msg: object to send or sync/async message generation callback.
19+
20+
Returns:
21+
The message to send
22+
"""
23+
if callable(msg):
24+
get_msg = typing.cast(
25+
typing.Callable[[], typing.Awaitable[SendableMessage]],
26+
to_async(msg),
27+
)
28+
msg = await get_msg()
29+
return msg

tests/conftest.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
from unittest.mock import MagicMock
2+
from uuid import uuid4
3+
14
import pytest
5+
from anyio._backends._asyncio import Event
26

37

48
@pytest.fixture(scope="session")
@@ -10,3 +14,18 @@ def anyio_backend() -> str:
1014
:return: backend name.
1115
"""
1216
return "asyncio"
17+
18+
19+
@pytest.fixture()
20+
def subject() -> str:
21+
return uuid4().hex
22+
23+
24+
@pytest.fixture()
25+
def mock() -> MagicMock:
26+
return MagicMock()
27+
28+
29+
@pytest.fixture()
30+
def event() -> Event:
31+
return Event()

tests/test_broker.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import pytest
2+
3+
from taskiq_faststream import BrokerWrapper
4+
5+
6+
@pytest.mark.anyio
7+
async def test_warning() -> None:
8+
broker = BrokerWrapper(None)
9+
10+
with pytest.warns(RuntimeWarning):
11+
async for _ in broker.listen():
12+
break

tests/test_kafka.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import asyncio
2+
from datetime import datetime
3+
from unittest.mock import MagicMock
4+
5+
import pytest
6+
from anyio import Event
7+
from faststream.kafka import KafkaBroker, TestKafkaBroker
8+
from faststream.utils.functions import timeout_scope
9+
from taskiq import TaskiqScheduler
10+
from taskiq.cli.scheduler.args import SchedulerArgs
11+
from taskiq.cli.scheduler.run import run_scheduler
12+
from taskiq.schedule_sources import LabelScheduleSource
13+
14+
from taskiq_faststream import BrokerWrapper
15+
16+
17+
@pytest.fixture
18+
def broker() -> KafkaBroker:
19+
return KafkaBroker()
20+
21+
22+
@pytest.mark.anyio
23+
async def test_time_task(
24+
subject: str,
25+
broker: KafkaBroker,
26+
mock: MagicMock,
27+
event: Event,
28+
) -> None:
29+
@broker.subscriber(subject)
30+
async def handler(msg: str) -> None:
31+
event.set()
32+
mock(msg)
33+
34+
taskiq_broker = BrokerWrapper(broker)
35+
36+
taskiq_broker.task(
37+
"Hi!",
38+
topic=subject,
39+
schedule=[
40+
{
41+
"time": datetime.utcnow(),
42+
},
43+
],
44+
)
45+
46+
async with TestKafkaBroker(broker):
47+
task = asyncio.create_task(
48+
run_scheduler(
49+
SchedulerArgs(
50+
scheduler=TaskiqScheduler(
51+
broker=taskiq_broker,
52+
sources=[LabelScheduleSource(taskiq_broker)],
53+
),
54+
modules=[],
55+
),
56+
),
57+
)
58+
59+
with timeout_scope(3.0, True):
60+
await event.wait()
61+
62+
mock.assert_called_once_with("Hi!")
63+
task.cancel()

tests/test_nats.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import asyncio
2+
from datetime import datetime
3+
from unittest.mock import MagicMock
4+
5+
import pytest
6+
from anyio import Event
7+
from faststream.nats import NatsBroker, TestNatsBroker
8+
from faststream.utils.functions import timeout_scope
9+
from taskiq import TaskiqScheduler
10+
from taskiq.cli.scheduler.args import SchedulerArgs
11+
from taskiq.cli.scheduler.run import run_scheduler
12+
from taskiq.schedule_sources import LabelScheduleSource
13+
14+
from taskiq_faststream import BrokerWrapper
15+
16+
17+
@pytest.fixture
18+
def broker() -> NatsBroker:
19+
return NatsBroker()
20+
21+
22+
@pytest.mark.anyio
23+
async def test_time_task(
24+
subject: str,
25+
broker: NatsBroker,
26+
mock: MagicMock,
27+
event: Event,
28+
) -> None:
29+
@broker.subscriber(subject)
30+
async def handler(msg: str) -> None:
31+
event.set()
32+
mock(msg)
33+
34+
taskiq_broker = BrokerWrapper(broker)
35+
36+
taskiq_broker.task(
37+
"Hi!",
38+
subject=subject,
39+
schedule=[
40+
{
41+
"time": datetime.utcnow(),
42+
},
43+
],
44+
)
45+
46+
async with TestNatsBroker(broker):
47+
task = asyncio.create_task(
48+
run_scheduler(
49+
SchedulerArgs(
50+
scheduler=TaskiqScheduler(
51+
broker=taskiq_broker,
52+
sources=[LabelScheduleSource(taskiq_broker)],
53+
),
54+
modules=[],
55+
),
56+
),
57+
)
58+
59+
with timeout_scope(3.0, True):
60+
await event.wait()
61+
62+
mock.assert_called_once_with("Hi!")
63+
task.cancel()

tests/test_rabbit.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import asyncio
2+
from datetime import datetime
3+
from unittest.mock import MagicMock
4+
5+
import pytest
6+
from anyio import Event
7+
from faststream.rabbit import RabbitBroker, TestRabbitBroker
8+
from faststream.utils.functions import timeout_scope
9+
from taskiq import TaskiqScheduler
10+
from taskiq.cli.scheduler.args import SchedulerArgs
11+
from taskiq.cli.scheduler.run import run_scheduler
12+
from taskiq.schedule_sources import LabelScheduleSource
13+
14+
from taskiq_faststream import BrokerWrapper
15+
16+
17+
@pytest.fixture
18+
def broker() -> RabbitBroker:
19+
return RabbitBroker()
20+
21+
22+
@pytest.mark.anyio
23+
async def test_time_task(
24+
subject: str,
25+
broker: RabbitBroker,
26+
mock: MagicMock,
27+
event: Event,
28+
) -> None:
29+
@broker.subscriber(subject)
30+
async def handler(msg: str) -> None:
31+
event.set()
32+
mock(msg)
33+
34+
taskiq_broker = BrokerWrapper(broker)
35+
36+
taskiq_broker.task(
37+
"Hi!",
38+
queue=subject,
39+
schedule=[
40+
{
41+
"time": datetime.utcnow(),
42+
},
43+
],
44+
)
45+
46+
async with TestRabbitBroker(broker):
47+
task = asyncio.create_task(
48+
run_scheduler(
49+
SchedulerArgs(
50+
scheduler=TaskiqScheduler(
51+
broker=taskiq_broker,
52+
sources=[LabelScheduleSource(taskiq_broker)],
53+
),
54+
modules=[],
55+
),
56+
),
57+
)
58+
59+
with timeout_scope(3.0, True):
60+
await event.wait()
61+
62+
mock.assert_called_once_with("Hi!")
63+
task.cancel()

0 commit comments

Comments
 (0)