Skip to content

Commit 4012ef7

Browse files
authored
Merge pull request #259 from zumalabs/pubsub-redis-events
Add optional redis event handlers
2 parents 4e883c5 + 3449871 commit 4012ef7

File tree

2 files changed

+41
-4
lines changed

2 files changed

+41
-4
lines changed

README.rst

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@ The server(s) to connect to, as either URIs, ``(host, port)`` tuples, or dicts c
5151
Defaults to ``['localhost', 6379]``. Pass multiple hosts to enable sharding,
5252
but note that changing the host list will lose some sharded data.
5353

54-
Sentinel connections require dicts conforming to `create_sentinel
55-
<https://aioredis.readthedocs.io/en/v1.3.0/sentinel.html#aioredis.sentinel.
56-
create_sentinel>` with an additional `master_name` key specifying the Sentinel
54+
Sentinel connections require dicts conforming to `create_sentinel <https://aioredis.readthedocs.io/en/v1.3.0/sentinel.html#aioredis.sentinel.create_sentinel>`_
55+
with an additional `master_name` key specifying the Sentinel
5756
master set. Plain Redis and Sentinel connections can be mixed and matched if
5857
sharding.
5958

@@ -165,6 +164,32 @@ If you're using Django, you may also wish to set this to your site's
165164
},
166165
}
167166
167+
``on_disconnect`` / ``on_reconnect``
168+
~~~~~~~~~~~~
169+
170+
The PubSub layer, which maintains long-running connections to Redis, can drop messages in the event of a network partition.
171+
To handle such situations the PubSub layer accepts optional arguments which will notify consumers of Redis disconnect/reconnect events.
172+
A common use-case is for consumers to ensure that they perform a full state re-sync to ensure that no messages have been missed.
173+
174+
.. code-block:: python
175+
176+
CHANNEL_LAYERS = {
177+
"default": {
178+
"BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
179+
"CONFIG": {
180+
"hosts": [...],
181+
"on_disconnect": "redis.disconnect",
182+
},
183+
},
184+
}
185+
186+
187+
And then in your channels consumer, you can implement the handler:
188+
189+
.. code-block:: python
190+
191+
async def redis_disconnect(self, *args):
192+
# Handle disconnect
168193
169194
Dependencies
170195
------------

channels_redis/pubsub.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ class RedisPubSubChannelLayer:
1313
Channel Layer that uses Redis's pub/sub functionality.
1414
"""
1515

16-
def __init__(self, hosts=None, prefix="asgi", **kwargs):
16+
def __init__(
17+
self, hosts=None, prefix="asgi", on_disconnect=None, on_reconnect=None, **kwargs
18+
):
1719
if hosts is None:
1820
hosts = [("localhost", 6379)]
1921
assert (
@@ -22,6 +24,9 @@ def __init__(self, hosts=None, prefix="asgi", **kwargs):
2224

2325
self.prefix = prefix
2426

27+
self.on_disconnect = on_disconnect
28+
self.on_reconnect = on_reconnect
29+
2530
# Each consumer gets its own *specific* channel, created with the `new_channel()` method.
2631
# This dict maps `channel_name` to a queue of messages for that channel.
2732
self.channels = {}
@@ -278,6 +283,7 @@ async def _get_sub_conn(self):
278283
if self._sub_conn is not None and self._sub_conn.closed:
279284
self._put_redis_conn(self._sub_conn)
280285
self._sub_conn = None
286+
self._notify_consumers(self.channel_layer.on_disconnect)
281287
if self._sub_conn is None:
282288
if self._receive_task is not None:
283289
self._receive_task.cancel()
@@ -309,6 +315,7 @@ async def _get_sub_conn(self):
309315
self._receiver.channel(name) for name in self._subscribed_to
310316
]
311317
await self._sub_conn.subscribe(*resubscribe_to)
318+
self._notify_consumers(self.channel_layer.on_reconnect)
312319
return self._sub_conn
313320

314321
async def _do_receiving(self):
@@ -325,6 +332,11 @@ async def _do_receiving(self):
325332
if channel_name in self.channel_layer.channels:
326333
self.channel_layer.channels[channel_name].put_nowait(message)
327334

335+
def _notify_consumers(self, mtype):
336+
if mtype is not None:
337+
for channel in self.channel_layer.channels.values():
338+
channel.put_nowait(msgpack.packb({"type": mtype}))
339+
328340
async def _ensure_redis(self):
329341
if self._redis is None:
330342
if self.master_name is None:

0 commit comments

Comments
 (0)