Skip to content

Commit 75ee482

Browse files
authored
Merge branch 'main' into fix-linter-pyteststyle
2 parents 1fa79fe + 7da1fb2 commit 75ee482

File tree

2 files changed

+22
-6
lines changed

2 files changed

+22
-6
lines changed

tests/datastore_redis/test_asyncio.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,24 +129,32 @@ async def _test_pipeline(client):
129129
@background_task()
130130
def test_async_pubsub(client, loop):
131131
messages_received = []
132+
message_received = asyncio.Event()
132133

133134
async def reader(pubsub):
134135
while True:
135136
message = await pubsub.get_message(ignore_subscribe_messages=True)
136137
if message:
138+
message_received.set()
137139
messages_received.append(message["data"].decode())
138140
if message["data"].decode() == "NOPE":
139141
break
140142

143+
async def _publish(client, channel, message):
144+
"""Publish a message and wait for the reader to receive it."""
145+
await client.publish(channel, message)
146+
await asyncio.wait_for(message_received.wait(), timeout=10)
147+
message_received.clear()
148+
141149
async def _test_pubsub():
142150
async with client.pubsub() as pubsub:
143151
await pubsub.psubscribe("channel:*")
144152

145153
future = asyncio.create_task(reader(pubsub))
146154

147-
await client.publish("channel:1", "Hello")
148-
await client.publish("channel:2", "World")
149-
await client.publish("channel:1", "NOPE")
155+
await _publish(client, "channel:1", "Hello")
156+
await _publish(client, "channel:2", "World")
157+
await _publish(client, "channel:1", "NOPE")
150158

151159
await future
152160

tests/datastore_valkey/test_asyncio.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,24 +121,32 @@ async def _test_pipeline(client):
121121
@background_task()
122122
def test_async_pubsub(client, loop):
123123
messages_received = []
124+
message_received = asyncio.Event()
124125

125126
async def reader(pubsub):
126127
while True:
127128
message = await pubsub.get_message(ignore_subscribe_messages=True)
128129
if message:
130+
message_received.set()
129131
messages_received.append(message["data"].decode())
130132
if message["data"].decode() == "NOPE":
131133
break
132134

135+
async def _publish(client, channel, message):
136+
"""Publish a message and wait for the reader to receive it."""
137+
await client.publish(channel, message)
138+
await asyncio.wait_for(message_received.wait(), timeout=10)
139+
message_received.clear()
140+
133141
async def _test_pubsub():
134142
async with client.pubsub() as pubsub:
135143
await pubsub.psubscribe("channel:*")
136144

137145
future = asyncio.create_task(reader(pubsub))
138146

139-
await client.publish("channel:1", "Hello")
140-
await client.publish("channel:2", "World")
141-
await client.publish("channel:1", "NOPE")
147+
await _publish(client, "channel:1", "Hello")
148+
await _publish(client, "channel:2", "World")
149+
await _publish(client, "channel:1", "NOPE")
142150

143151
await future
144152

0 commit comments

Comments
 (0)