Skip to content

Commit 23c4ae1

Browse files
committed
Updated broker for better support of direct exchanges.
1 parent f81f129 commit 23c4ae1

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,3 @@ broker = AioPikaBroker(
157157
```
158158

159159
This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.
160-

tests/test_broker.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import uuid
33

44
import pytest
5-
from aio_pika import Channel, Message
5+
from aio_pika import Channel, ExchangeType, Message
66
from aio_pika.exceptions import QueueEmpty
77
from taskiq import AckableMessage, BrokerMessage
88
from taskiq.utils import maybe_awaitable
@@ -17,8 +17,9 @@ async def get_first_task(broker: AioPikaBroker) -> AckableMessage: # type: igno
1717
:param broker: async message broker.
1818
:return: first message from listen method
1919
"""
20-
async for message in broker.listen(): # noqa: RET503
20+
async for message in broker.listen():
2121
return message
22+
return None # type: ignore
2223

2324

2425
@pytest.mark.anyio
@@ -219,3 +220,50 @@ async def test_delayed_message_with_plugin(
219220
await asyncio.sleep(2)
220221

221222
assert await main_queue.get()
223+
224+
225+
@pytest.mark.anyio
226+
async def test_direct_kick(
227+
broker: AioPikaBroker,
228+
test_channel: Channel,
229+
queue_name: str,
230+
exchange_name: str,
231+
) -> None:
232+
"""
233+
Test that messages are published and read correctly.
234+
235+
We kick the message and then try to listen to the queue,
236+
and check that message we got is the same as we sent.
237+
"""
238+
queue = await test_channel.get_queue(queue_name)
239+
exchange = await test_channel.get_exchange(exchange_name)
240+
await queue.delete()
241+
await exchange.delete()
242+
243+
broker._declare_exchange = True
244+
broker._exchange_type = ExchangeType.DIRECT
245+
broker._routing_key = "direct_routing_key"
246+
247+
await broker.startup()
248+
249+
await test_channel.get_queue(queue_name, ensure=True)
250+
await test_channel.get_exchange(exchange_name, ensure=True)
251+
252+
task_id = uuid.uuid4().hex
253+
task_name = uuid.uuid4().hex
254+
255+
sent = BrokerMessage(
256+
task_id=task_id,
257+
task_name=task_name,
258+
message=b"my_msg",
259+
labels={
260+
"label1": "val1",
261+
},
262+
)
263+
264+
await broker.kick(sent)
265+
266+
message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)
267+
268+
assert message.data == sent.message
269+
await maybe_awaitable(message.ack())

0 commit comments

Comments
 (0)