File tree Expand file tree Collapse file tree 2 files changed +10
-15
lines changed Expand file tree Collapse file tree 2 files changed +10
-15
lines changed Original file line number Diff line number Diff line change @@ -60,7 +60,7 @@ async def kick(self, message: BrokerMessage) -> None:
60
60
61
61
:param message: message to append.
62
62
"""
63
- await self .redis .lpush (self .queue_name , message .message )
63
+ await self .redis .lpush (self .queue_name , message .message ) # type: ignore[attr-defined]
64
64
65
65
async def listen (self ) -> AsyncGenerator [bytes , None ]:
66
66
"""
@@ -73,6 +73,5 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
73
73
"""
74
74
redis_brpop_data_position = 1
75
75
while True :
76
- yield (await self .redis .brpop ([self .queue_name ]))[
77
- redis_brpop_data_position
78
- ]
76
+ value = await self .redis .brpop ([self .queue_name ]) # type: ignore[attr-defined]
77
+ yield value [redis_brpop_data_position ]
Original file line number Diff line number Diff line change 5
5
import pytest
6
6
from taskiq import AckableMessage , AsyncBroker , BrokerMessage
7
7
8
- from taskiq_redis import ListQueueBroker , PubSubBroker , ListQueueClusterBroker
8
+ from taskiq_redis import ListQueueBroker , ListQueueClusterBroker , PubSubBroker
9
9
10
10
11
11
def test_no_url_should_raise_typeerror () -> None :
@@ -109,21 +109,17 @@ async def test_list_queue_cluster_broker(
109
109
We create two workers that listen and send a message to them.
110
110
Expect only one worker to receive the same message we sent.
111
111
"""
112
-
113
- print (f"redis_cluster_url: { redis_cluster_url } " )
114
112
broker = ListQueueClusterBroker (
115
- url = redis_cluster_url , queue_name = uuid .uuid4 ().hex
113
+ url = redis_cluster_url ,
114
+ queue_name = uuid .uuid4 ().hex ,
116
115
)
117
- worker1_task = asyncio .create_task (get_message (broker ))
118
- worker2_task = asyncio .create_task (get_message (broker ))
116
+ worker_task = asyncio .create_task (get_message (broker ))
119
117
await asyncio .sleep (0.3 )
120
118
121
119
await broker .kick (valid_broker_message )
122
120
await asyncio .sleep (0.3 )
123
121
124
- assert worker1_task .done () != worker2_task .done ()
125
- message = worker1_task .result () if worker1_task .done () else worker2_task .result ()
126
- assert message == valid_broker_message .message
127
- worker1_task .cancel ()
128
- worker2_task .cancel ()
122
+ assert worker_task .done ()
123
+ assert worker_task .result () == valid_broker_message .message
124
+ worker_task .cancel ()
129
125
await broker .shutdown ()
You can’t perform that action at this time.
0 commit comments