Skip to content

Commit 449b90f

Browse files
authored
Log error if failed to group_send because of full capacity. (#172)
1 parent 2dfc892 commit 449b90f

File tree

2 files changed

+62
-6
lines changed

2 files changed

+62
-6
lines changed

channels_redis/core.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import collections
55
import hashlib
66
import itertools
7+
import logging
78
import random
89
import string
910
import time
@@ -15,6 +16,8 @@
1516
from channels.exceptions import ChannelFull
1617
from channels.layers import BaseChannelLayer
1718

19+
logger = logging.getLogger(__name__)
20+
1821

1922
def _wrap_close(loop, pool):
2023
"""
@@ -632,13 +635,16 @@ async def group_send(self, group, message):
632635
# __asgi_channel__ key.
633636

634637
group_send_lua = (
635-
"""
638+
""" local over_capacity = 0
636639
for i=1,#KEYS do
637640
if redis.call('LLEN', KEYS[i]) < tonumber(ARGV[i + #KEYS]) then
638641
redis.call('LPUSH', KEYS[i], ARGV[i])
639642
redis.call('EXPIRE', KEYS[i], %d)
643+
else
644+
over_capacity = over_capacity + 1
640645
end
641646
end
647+
return over_capacity
642648
"""
643649
% self.expiry
644650
)
@@ -657,9 +663,13 @@ async def group_send(self, group, message):
657663

658664
# channel_keys does not contain a single redis key more than once
659665
async with self.connection(connection_index) as connection:
660-
await connection.eval(
666+
channels_over_capacity = await connection.eval(
661667
group_send_lua, keys=channel_redis_keys, args=args
662668
)
669+
if channels_over_capacity > 0:
670+
logger.exception(
671+
f"{channels_over_capacity} of {len(channel_names)} channels over capacity in group {group}"
672+
)
663673

664674
def _map_channel_to_connection(self, channel_names, message):
665675
"""

tests/test_core.py

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,10 @@ async def test_groups_multiple_hosts_performance(
299299

300300

301301
@pytest.mark.asyncio
302-
async def test_group_send_capacity(channel_layer):
302+
async def test_group_send_capacity(channel_layer, caplog):
303303
"""
304-
Makes sure we dont send messages to groups that are over capacity
304+
Makes sure we dont group_send messages to channels that are over capacity.
305+
Make sure number of channels with full capacity are logged as an exception to help debug errors.
305306
"""
306307

307308
channel = await channel_layer.new_channel()
@@ -312,16 +313,61 @@ async def test_group_send_capacity(channel_layer):
312313
await channel_layer.group_send("test-group", {"type": "message.3"})
313314
await channel_layer.group_send("test-group", {"type": "message.4"})
314315

315-
# We should recieve the first 3 messages
316+
# We should receive the first 3 messages
316317
assert (await channel_layer.receive(channel))["type"] == "message.1"
317318
assert (await channel_layer.receive(channel))["type"] == "message.2"
318319
assert (await channel_layer.receive(channel))["type"] == "message.3"
319320

320-
# Make sure we do NOT recieve message 4
321+
# Make sure we do NOT receive message 4
321322
with pytest.raises(asyncio.TimeoutError):
322323
async with async_timeout.timeout(1):
323324
await channel_layer.receive(channel)
324325

326+
# Make sure number of channels over capacity are logged
327+
for record in caplog.records:
328+
assert record.levelname == "ERROR"
329+
assert record.msg == "1 of 1 channels over capacity in group test-group"
330+
331+
332+
@pytest.mark.asyncio
333+
async def test_group_send_capacity_multiple_channels(channel_layer, caplog):
334+
"""
335+
Makes sure we dont group_send messages to channels that are over capacity
336+
Make sure number of channels with full capacity are logged as an exception to help debug errors.
337+
"""
338+
339+
channel_1 = await channel_layer.new_channel()
340+
channel_2 = await channel_layer.new_channel(prefix="channel_2")
341+
await channel_layer.group_add("test-group", channel_1)
342+
await channel_layer.group_add("test-group", channel_2)
343+
344+
# Let's put channel_2 over capacity
345+
await channel_layer.send(channel_2, {"type": "message.0"})
346+
347+
await channel_layer.group_send("test-group", {"type": "message.1"})
348+
await channel_layer.group_send("test-group", {"type": "message.2"})
349+
await channel_layer.group_send("test-group", {"type": "message.3"})
350+
351+
# Channel_1 should receive all 3 group messages
352+
assert (await channel_layer.receive(channel_1))["type"] == "message.1"
353+
assert (await channel_layer.receive(channel_1))["type"] == "message.2"
354+
assert (await channel_layer.receive(channel_1))["type"] == "message.3"
355+
356+
# Channel_2 should receive the first message + 2 group messages
357+
assert (await channel_layer.receive(channel_2))["type"] == "message.0"
358+
assert (await channel_layer.receive(channel_2))["type"] == "message.1"
359+
assert (await channel_layer.receive(channel_2))["type"] == "message.2"
360+
361+
# Make sure channel_2 does not receive the 3rd group message
362+
with pytest.raises(asyncio.TimeoutError):
363+
async with async_timeout.timeout(1):
364+
await channel_layer.receive(channel_2)
365+
366+
# Make sure number of channels over capacity are logged
367+
for record in caplog.records:
368+
assert record.levelname == "ERROR"
369+
assert record.msg == "1 of 2 channels over capacity in group test-group"
370+
325371

326372
@pytest.mark.asyncio
327373
async def test_connection_pool_pop():

0 commit comments

Comments
 (0)