Skip to content

Commit 4c58368

Browse files
committed
Added utilities method for inmemory broker.
1 parent e5c6d2b commit 4c58368

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

taskiq/brokers/inmemory_broker.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ def __init__(
127127
cast_types: bool = True,
128128
max_async_tasks: int = 30,
129129
propagate_exceptions: bool = True,
130+
await_inplace: bool = False,
130131
) -> None:
131132
super().__init__()
132133
self.result_backend = InmemoryResultBackend(
@@ -140,6 +141,7 @@ def __init__(
140141
max_async_tasks=max_async_tasks,
141142
propagate_exceptions=propagate_exceptions,
142143
)
144+
self.await_inplace = await_inplace
143145
self._running_tasks: "Set[asyncio.Task[Any]]" = set()
144146

145147
async def kick(self, message: BrokerMessage) -> None:
@@ -156,7 +158,12 @@ async def kick(self, message: BrokerMessage) -> None:
156158
if target_task is None:
157159
raise TaskiqError("Unknown task.")
158160

159-
task = asyncio.create_task(self.receiver.callback(message=message.message))
161+
receiver_cb = self.receiver.callback(message=message.message)
162+
if self.await_inplace:
163+
await receiver_cb
164+
return
165+
166+
task = asyncio.create_task(receiver_cb)
160167
self._running_tasks.add(task)
161168
task.add_done_callback(self._running_tasks.discard)
162169

@@ -171,6 +178,17 @@ def listen(self) -> AsyncGenerator[bytes, None]:
171178
"""
172179
raise RuntimeError("Inmemory brokers cannot listen.")
173180

181+
async def wait_all(self) -> None:
182+
"""
183+
Wait for all currently running tasks to complete.
184+
185+
Useful when used in testing and you need to await all sent tasks
186+
before asserting results.
187+
"""
188+
to_await = list(self._running_tasks)
189+
for task in to_await:
190+
await task
191+
174192
async def startup(self) -> None:
175193
"""Runs startup events for client and worker side."""
176194
for event in (TaskiqEvents.CLIENT_STARTUP, TaskiqEvents.WORKER_STARTUP):

tests/brokers/test_inmemory.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,39 @@ async def test_task() -> str:
8585

8686
result = await task.wait_result()
8787
assert result.return_value == test_value
88+
89+
90+
@pytest.mark.anyio
91+
async def test_inline_awaits() -> None:
92+
broker = InMemoryBroker(await_inplace=True)
93+
slept = False
94+
95+
@broker.task
96+
async def test_task() -> None:
97+
nonlocal slept
98+
await asyncio.sleep(0.2)
99+
slept = True
100+
101+
task = await test_task.kiq()
102+
assert slept
103+
assert await task.is_ready()
104+
assert not broker._running_tasks
105+
106+
107+
@pytest.mark.anyio
108+
async def test_wait_all() -> None:
109+
broker = InMemoryBroker()
110+
slept = False
111+
112+
@broker.task
113+
async def test_task() -> None:
114+
nonlocal slept
115+
await asyncio.sleep(0.2)
116+
slept = True
117+
118+
task = await test_task.kiq()
119+
assert not slept
120+
await broker.wait_all()
121+
assert slept
122+
assert await task.is_ready()
123+
assert not broker._running_tasks

0 commit comments

Comments
 (0)