Skip to content

Commit 9ea85ff

Browse files
committed
add optional redis event handlers
1 parent 084ddc4 commit 9ea85ff

File tree

2 files changed

+41
-4
lines changed

2 files changed

+41
-4
lines changed

README.rst

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@ The server(s) to connect to, as either URIs, ``(host, port)`` tuples, or dicts c
5151
Defaults to ``['localhost', 6379]``. Pass multiple hosts to enable sharding,
5252
but note that changing the host list will lose some sharded data.
5353

54-
Sentinel connections require dicts conforming to `create_sentinel
55-
<https://aioredis.readthedocs.io/en/v1.3.0/sentinel.html#aioredis.sentinel.
56-
create_sentinel>` with an additional `master_name` key specifying the Sentinel
54+
Sentinel connections require dicts conforming to `create_sentinel <https://aioredis.readthedocs.io/en/v1.3.0/sentinel.html#aioredis.sentinel.create_sentinel>`_
55+
with an additional `master_name` key specifying the Sentinel
5756
master set. Plain Redis and Sentinel connections can be mixed and matched if
5857
sharding.
5958

@@ -165,6 +164,32 @@ If you're using Django, you may also wish to set this to your site's
165164
},
166165
}
167166
167+
``on_disconnect`` / ``on_reconnect``
168+
~~~~~~~~~~~~
169+
170+
The PubSub layer, which maintains long-running connections to Redis, can drop messages in the event of a network partition.
171+
To handle such situations the PubSub layer accepts optional arguments which will notify consumers of Redis disconnect/reconnect events.
172+
A common use-case is for consumers to ensure that they perform a full state re-sync to ensure that no messages have been missed.
173+
174+
.. code-block:: python
175+
176+
CHANNEL_LAYERS = {
177+
"default": {
178+
"BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
179+
"CONFIG": {
180+
"hosts": [...],
181+
"on_disconnect": "redis.disconnect",
182+
},
183+
},
184+
}
185+
186+
187+
And then in your channels consumer, you can implement the handler:
188+
189+
.. code-block:: python
190+
191+
async def redis_disconnect(self, *args):
192+
# Handle disconnect
168193
169194
Dependencies
170195
------------

channels_redis/pubsub.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ class RedisPubSubChannelLayer:
1313
Channel Layer that uses Redis's pub/sub functionality.
1414
"""
1515

16-
def __init__(self, hosts=None, prefix="asgi", **kwargs):
16+
def __init__(
17+
self, hosts=None, prefix="asgi", on_disconnect=None, on_reconnect=None, **kwargs
18+
):
1719
if hosts is None:
1820
hosts = [("localhost", 6379)]
1921
assert (
@@ -22,6 +24,9 @@ def __init__(self, hosts=None, prefix="asgi", **kwargs):
2224

2325
self.prefix = prefix
2426

27+
self.on_disconnect = on_disconnect
28+
self.on_reconnect = on_reconnect
29+
2530
# Each consumer gets its own *specific* channel, created with the `new_channel()` method.
2631
# This dict maps `channel_name` to a queue of messages for that channel.
2732
self.channels = {}
@@ -271,6 +276,7 @@ async def _get_sub_conn(self):
271276
async with self._lock:
272277
if self._sub_conn is not None and self._sub_conn.closed:
273278
self._sub_conn = None
279+
self._notify_consumers(self.channel_layer.on_disconnect)
274280
if self._sub_conn is None:
275281
if self._receive_task is not None:
276282
self._receive_task.cancel()
@@ -301,6 +307,7 @@ async def _get_sub_conn(self):
301307
self._receiver.channel(name) for name in self._subscribed_to
302308
]
303309
await self._sub_conn.subscribe(*resubscribe_to)
310+
self._notify_consumers(self.channel_layer.on_reconnect)
304311
return self._sub_conn
305312

306313
async def _do_receiving(self):
@@ -317,6 +324,11 @@ async def _do_receiving(self):
317324
if channel_name in self.channel_layer.channels:
318325
self.channel_layer.channels[channel_name].put_nowait(message)
319326

327+
def _notify_consumers(self, mtype):
328+
if mtype is not None:
329+
for channel in self.channel_layer.channels.values():
330+
channel.put_nowait(msgpack.packb({"type": mtype}))
331+
320332
async def _do_keepalive(self):
321333
"""
322334
This task's simple job is just to call `self._get_sub_conn()` periodically.

0 commit comments

Comments
 (0)