Skip to content

Commit 4360712

Browse files
committed
Some broker tests
1 parent 1e70e41 commit 4360712

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

tests/test_broker.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import asyncio
2+
import uuid
3+
4+
import pytest
5+
from taskiq import AsyncBroker, BrokerMessage
6+
7+
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
8+
9+
10+
async def get_message(broker: AsyncBroker) -> BrokerMessage: # type: ignore
11+
"""
12+
Get a message from the broker.
13+
14+
:param broker: async message broker.
15+
:return: first message from listen method.
16+
"""
17+
async for message in broker.listen(): # noqa: WPS328
18+
return message
19+
20+
21+
@pytest.fixture
22+
def valid_broker_message() -> BrokerMessage:
23+
"""
24+
Generate valid broker message for tests.
25+
26+
:returns: broker message.
27+
"""
28+
return BrokerMessage(
29+
task_id=uuid.uuid4().hex,
30+
task_name=uuid.uuid4().hex,
31+
message="my_msg",
32+
labels={
33+
"label1": "val1",
34+
},
35+
)
36+
37+
38+
@pytest.mark.anyio
39+
async def test_pub_sub_broker(
40+
valid_broker_message: BrokerMessage,
41+
redis_url: str,
42+
) -> None:
43+
"""
44+
Test that messages are published and read correctly by PubSubBroker.
45+
46+
We create two workers that listen and send a message to them.
47+
Expect both workers to receive the same message we sent.
48+
"""
49+
broker = PubSubBroker(url=redis_url, queue_name=uuid.uuid4().hex)
50+
worker1_task = asyncio.create_task(get_message(broker))
51+
worker2_task = asyncio.create_task(get_message(broker))
52+
await asyncio.sleep(0.3)
53+
54+
await broker.kick(valid_broker_message)
55+
await asyncio.sleep(0.3)
56+
57+
message1 = worker1_task.result()
58+
message2 = worker2_task.result()
59+
assert message1 == valid_broker_message
60+
assert message1 == message2
61+
62+
63+
@pytest.mark.anyio
64+
async def test_list_queue_broker(
65+
valid_broker_message: BrokerMessage,
66+
redis_url: str,
67+
) -> None:
68+
"""
69+
Test that messages are published and read correctly by ListQueueBroker.
70+
71+
We create two workers that listen and send a message to them.
72+
Expect only one worker to receive the same message we sent.
73+
"""
74+
broker = ListQueueBroker(url=redis_url, queue_name=uuid.uuid4().hex)
75+
worker1_task = asyncio.create_task(get_message(broker))
76+
worker2_task = asyncio.create_task(get_message(broker))
77+
await asyncio.sleep(0.3)
78+
79+
await broker.kick(valid_broker_message)
80+
await asyncio.sleep(0.3)
81+
82+
assert worker1_task.done() != worker2_task.done()
83+
message = worker1_task.result() if worker1_task.done() else worker2_task.result()
84+
assert message == valid_broker_message

0 commit comments

Comments
 (0)