Skip to content

Commit 0a7d10e

Browse files
khpetersonandrewgodwin
authored andcommitted
Release receive_lock on CancelledError when task.cancel() fails
Fixes #134
1 parent 6221965 commit 0a7d10e

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

channels_redis/core.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,10 @@ async def receive(self, channel):
384384
# Ensure all tasks are cancelled if we are cancelled.
385385
# Also see: https://bugs.python.org/issue23859
386386
for task in tasks:
387-
task.cancel()
387+
if not task.cancel():
388+
assert task.done()
389+
if task.result() is True:
390+
self.receive_lock.release()
388391

389392
raise
390393

tests/test_core.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,3 +321,25 @@ async def test_group_send_capacity(channel_layer):
321321
with pytest.raises(asyncio.TimeoutError):
322322
async with async_timeout.timeout(1):
323323
await channel_layer.receive(channel)
324+
325+
326+
@pytest.mark.asyncio
327+
async def test_receive_cancel(channel_layer):
328+
"""
329+
Makes sure we can cancel a receive without blocking
330+
"""
331+
channel_layer = RedisChannelLayer(capacity=10)
332+
channel = await channel_layer.new_channel()
333+
delay = 0
334+
while delay < 0.01:
335+
await channel_layer.send(channel, {"type": "test.message", "text": "Ahoy-hoy!"})
336+
337+
task = asyncio.ensure_future(channel_layer.receive(channel))
338+
await asyncio.sleep(delay)
339+
task.cancel()
340+
delay += 0.001
341+
342+
try:
343+
await asyncio.wait_for(task, None)
344+
except asyncio.CancelledError:
345+
pass

0 commit comments

Comments
 (0)