Skip to content

Commit a1c3ac2

Browse files
committed
feat: add AppWrapper
1 parent 85836e5 commit a1c3ac2

File tree

7 files changed

+170
-166
lines changed

7 files changed

+170
-166
lines changed

taskiq_faststream/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1-
from taskiq_faststream.broker import BrokerWrapper
1+
from taskiq_faststream.broker import AppWrapper, BrokerWrapper
22

3-
__all__ = ("BrokerWrapper",)
3+
__all__ = (
4+
"BrokerWrapper",
5+
"AppWrapper",
6+
)

taskiq_faststream/broker.py

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import warnings
33

44
from faststream._compat import TypeAlias, override
5+
from faststream.app import FastStream
56
from faststream.broker.core.asyncronous import BrokerAsyncUsecase
67
from faststream.types import SendableMessage
78
from taskiq import AsyncBroker, BrokerMessage
@@ -44,10 +45,7 @@ async def shutdown(self) -> None:
4445

4546
async def kick(self, message: BrokerMessage) -> None:
4647
"""Call wrapped FastStream broker `publish` method."""
47-
labels = message.labels
48-
labels.pop("schedule", None)
49-
msg = await resolve_msg(labels.pop("message", message.message))
50-
await self.broker.publish(msg, **labels)
48+
await _broker_publish(self.broker, message)
5149

5250
async def listen(
5351
self,
@@ -90,3 +88,49 @@ def task( # type: ignore[override]
9088
schedule=schedule,
9189
**kwargs,
9290
)(lambda: None)
91+
92+
93+
class AppWrapper(BrokerWrapper):
94+
"""Wrap FastStream instance to taskiq compatible object.
95+
96+
Attributes:
97+
app : FastStream instance.
98+
99+
Methods:
100+
__init__ : Initializes the object.
101+
startup : Startup wrapper FastStream.
102+
shutdown : Shutdown wrapper FastStream.
103+
kick : Call wrapped FastStream broker `publish` method.
104+
task : Register FastStream scheduled task.
105+
"""
106+
107+
def __init__(self, app: FastStream) -> None:
108+
super(BrokerWrapper, self).__init__()
109+
self.app = app
110+
111+
async def startup(self) -> None:
112+
"""Startup wrapper FastStream broker."""
113+
await super(BrokerWrapper, self).startup()
114+
await self.app._startup() # noqa: SLF001
115+
116+
async def shutdown(self) -> None:
117+
"""Shutdown wrapper FastStream broker."""
118+
await self.app._shutdown() # noqa: SLF001
119+
await super(BrokerWrapper, self).shutdown()
120+
121+
async def kick(self, message: BrokerMessage) -> None:
122+
"""Call wrapped FastStream broker `publish` method."""
123+
assert ( # noqa: S101
124+
self.app.broker
125+
), "You should setup application broker firts"
126+
await _broker_publish(self.app.broker, message)
127+
128+
129+
async def _broker_publish(
130+
broker: BrokerAsyncUsecase[typing.Any, typing.Any],
131+
message: BrokerMessage,
132+
) -> None:
133+
labels = message.labels
134+
labels.pop("schedule", None)
135+
msg = await resolve_msg(labels.pop("message", message.message))
136+
await broker.publish(msg, **labels)

tests/test_broker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ async def test_warning() -> None:
88
broker = BrokerWrapper(None)
99

1010
with pytest.warns(RuntimeWarning):
11-
async for _ in broker.listen():
11+
async for _ in broker.listen(): # pragma: no branch
1212
break

tests/test_kafka.py

Lines changed: 15 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,25 @@
1-
import asyncio
2-
from datetime import datetime
3-
from unittest.mock import MagicMock
4-
51
import pytest
6-
from anyio import Event
2+
from faststream import FastStream
73
from faststream.kafka import KafkaBroker, TestKafkaBroker
8-
from faststream.utils.functions import timeout_scope
9-
from taskiq import TaskiqScheduler
10-
from taskiq.cli.scheduler.args import SchedulerArgs
11-
from taskiq.cli.scheduler.run import run_scheduler
12-
from taskiq.schedule_sources import LabelScheduleSource
4+
from taskiq import AsyncBroker
5+
6+
from taskiq_faststream import AppWrapper
137

14-
from taskiq_faststream import BrokerWrapper
8+
from .testcase import SchedulerTestcase
159

1610

1711
@pytest.fixture
1812
def broker() -> KafkaBroker:
1913
return KafkaBroker()
2014

2115

22-
@pytest.mark.anyio
23-
async def test_time_task(
24-
subject: str,
25-
broker: KafkaBroker,
26-
mock: MagicMock,
27-
event: Event,
28-
) -> None:
29-
@broker.subscriber(subject)
30-
async def handler(msg: str) -> None:
31-
event.set()
32-
mock(msg)
33-
34-
taskiq_broker = BrokerWrapper(broker)
35-
36-
taskiq_broker.task(
37-
"Hi!",
38-
topic=subject,
39-
schedule=[
40-
{
41-
"time": datetime.utcnow(),
42-
},
43-
],
44-
)
45-
46-
async with TestKafkaBroker(broker):
47-
task = asyncio.create_task(
48-
run_scheduler(
49-
SchedulerArgs(
50-
scheduler=TaskiqScheduler(
51-
broker=taskiq_broker,
52-
sources=[LabelScheduleSource(taskiq_broker)],
53-
),
54-
modules=[],
55-
),
56-
),
57-
)
58-
59-
with timeout_scope(3.0, True):
60-
await event.wait()
61-
62-
mock.assert_called_once_with("Hi!")
63-
task.cancel()
16+
class TestBroker(SchedulerTestcase):
17+
test_class = TestKafkaBroker
18+
subj_name = "topic"
19+
20+
21+
class TestApp(TestBroker):
22+
@staticmethod
23+
def build_taskiq_broker(broker: KafkaBroker) -> AsyncBroker:
24+
"""Build AppWrapper."""
25+
return AppWrapper(FastStream(broker))

tests/test_nats.py

Lines changed: 15 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,25 @@
1-
import asyncio
2-
from datetime import datetime
3-
from unittest.mock import MagicMock
4-
51
import pytest
6-
from anyio import Event
2+
from faststream import FastStream
73
from faststream.nats import NatsBroker, TestNatsBroker
8-
from faststream.utils.functions import timeout_scope
9-
from taskiq import TaskiqScheduler
10-
from taskiq.cli.scheduler.args import SchedulerArgs
11-
from taskiq.cli.scheduler.run import run_scheduler
12-
from taskiq.schedule_sources import LabelScheduleSource
4+
from taskiq import AsyncBroker
5+
6+
from taskiq_faststream import AppWrapper
137

14-
from taskiq_faststream import BrokerWrapper
8+
from .testcase import SchedulerTestcase
159

1610

1711
@pytest.fixture
1812
def broker() -> NatsBroker:
1913
return NatsBroker()
2014

2115

22-
@pytest.mark.anyio
23-
async def test_time_task(
24-
subject: str,
25-
broker: NatsBroker,
26-
mock: MagicMock,
27-
event: Event,
28-
) -> None:
29-
@broker.subscriber(subject)
30-
async def handler(msg: str) -> None:
31-
event.set()
32-
mock(msg)
33-
34-
taskiq_broker = BrokerWrapper(broker)
35-
36-
taskiq_broker.task(
37-
"Hi!",
38-
subject=subject,
39-
schedule=[
40-
{
41-
"time": datetime.utcnow(),
42-
},
43-
],
44-
)
45-
46-
async with TestNatsBroker(broker):
47-
task = asyncio.create_task(
48-
run_scheduler(
49-
SchedulerArgs(
50-
scheduler=TaskiqScheduler(
51-
broker=taskiq_broker,
52-
sources=[LabelScheduleSource(taskiq_broker)],
53-
),
54-
modules=[],
55-
),
56-
),
57-
)
58-
59-
with timeout_scope(3.0, True):
60-
await event.wait()
61-
62-
mock.assert_called_once_with("Hi!")
63-
task.cancel()
16+
class TestBroker(SchedulerTestcase):
17+
test_class = TestNatsBroker
18+
subj_name = "subject"
19+
20+
21+
class TestApp(TestBroker):
22+
@staticmethod
23+
def build_taskiq_broker(broker: NatsBroker) -> AsyncBroker:
24+
"""Build AppWrapper."""
25+
return AppWrapper(FastStream(broker))

tests/test_rabbit.py

Lines changed: 15 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,25 @@
1-
import asyncio
2-
from datetime import datetime
3-
from unittest.mock import MagicMock
4-
51
import pytest
6-
from anyio import Event
2+
from faststream import FastStream
73
from faststream.rabbit import RabbitBroker, TestRabbitBroker
8-
from faststream.utils.functions import timeout_scope
9-
from taskiq import TaskiqScheduler
10-
from taskiq.cli.scheduler.args import SchedulerArgs
11-
from taskiq.cli.scheduler.run import run_scheduler
12-
from taskiq.schedule_sources import LabelScheduleSource
4+
from taskiq import AsyncBroker
5+
6+
from taskiq_faststream import AppWrapper
137

14-
from taskiq_faststream import BrokerWrapper
8+
from .testcase import SchedulerTestcase
159

1610

1711
@pytest.fixture
1812
def broker() -> RabbitBroker:
1913
return RabbitBroker()
2014

2115

22-
@pytest.mark.anyio
23-
async def test_time_task(
24-
subject: str,
25-
broker: RabbitBroker,
26-
mock: MagicMock,
27-
event: Event,
28-
) -> None:
29-
@broker.subscriber(subject)
30-
async def handler(msg: str) -> None:
31-
event.set()
32-
mock(msg)
33-
34-
taskiq_broker = BrokerWrapper(broker)
35-
36-
taskiq_broker.task(
37-
"Hi!",
38-
queue=subject,
39-
schedule=[
40-
{
41-
"time": datetime.utcnow(),
42-
},
43-
],
44-
)
45-
46-
async with TestRabbitBroker(broker):
47-
task = asyncio.create_task(
48-
run_scheduler(
49-
SchedulerArgs(
50-
scheduler=TaskiqScheduler(
51-
broker=taskiq_broker,
52-
sources=[LabelScheduleSource(taskiq_broker)],
53-
),
54-
modules=[],
55-
),
56-
),
57-
)
58-
59-
with timeout_scope(3.0, True):
60-
await event.wait()
61-
62-
mock.assert_called_once_with("Hi!")
63-
task.cancel()
16+
class TestBroker(SchedulerTestcase):
17+
test_class = TestRabbitBroker
18+
subj_name = "queue"
19+
20+
21+
class TestApp(TestBroker):
22+
@staticmethod
23+
def build_taskiq_broker(broker: RabbitBroker) -> AsyncBroker:
24+
"""Build AppWrapper."""
25+
return AppWrapper(FastStream(broker))

tests/testcase.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import asyncio
2+
from datetime import datetime
3+
from typing import Any
4+
from unittest.mock import MagicMock
5+
6+
import pytest
7+
from anyio import Event
8+
from faststream.broker.core.asyncronous import BrokerAsyncUsecase
9+
from faststream.utils.functions import timeout_scope
10+
from taskiq import AsyncBroker, TaskiqScheduler
11+
from taskiq.cli.scheduler.args import SchedulerArgs
12+
from taskiq.cli.scheduler.run import run_scheduler
13+
from taskiq.schedule_sources import LabelScheduleSource
14+
15+
from taskiq_faststream import BrokerWrapper
16+
17+
18+
@pytest.mark.anyio
19+
class SchedulerTestcase:
20+
test_class: Any
21+
subj_name: str
22+
23+
@staticmethod
24+
def build_taskiq_broker(broker: BrokerAsyncUsecase[Any, Any]) -> AsyncBroker:
25+
"""Build Taskiq compatible object."""
26+
return BrokerWrapper(broker)
27+
28+
async def test_task(
29+
self,
30+
subject: str,
31+
broker: BrokerAsyncUsecase[Any, Any],
32+
mock: MagicMock,
33+
event: Event,
34+
) -> None:
35+
"""Base testcase."""
36+
37+
@broker.subscriber(subject)
38+
async def handler(msg: str) -> None:
39+
event.set()
40+
mock(msg)
41+
42+
taskiq_broker = self.build_taskiq_broker(broker)
43+
44+
taskiq_broker.task(
45+
"Hi!",
46+
**{self.subj_name: subject},
47+
schedule=[
48+
{
49+
"time": datetime.utcnow(),
50+
},
51+
],
52+
)
53+
54+
async with self.test_class(broker):
55+
task = asyncio.create_task(
56+
run_scheduler(
57+
SchedulerArgs(
58+
scheduler=TaskiqScheduler(
59+
broker=taskiq_broker,
60+
sources=[LabelScheduleSource(taskiq_broker)],
61+
),
62+
modules=[],
63+
),
64+
),
65+
)
66+
67+
with timeout_scope(3.0, True):
68+
await event.wait()
69+
70+
mock.assert_called_once_with("Hi!")
71+
task.cancel()

0 commit comments

Comments
 (0)