Skip to content

Commit 0e54138

Browse files
jberciandrewgodwin
authored andcommitted
Fix pool management on event loop shutdown (#130)
Monkey patches the close method so it works on 3.6 as well.
1 parent cf5f741 commit 0e54138

File tree

1 file changed

+39
-3
lines changed

1 file changed

+39
-3
lines changed

channels_redis/core.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import random
88
import string
99
import time
10+
import types
1011

1112
import aioredis
1213
import msgpack
@@ -15,6 +16,23 @@
1516
from channels.layers import BaseChannelLayer
1617

1718

19+
def _wrap_close(loop, pool):
20+
"""
21+
Decorate an event loop's close method with our own.
22+
"""
23+
original_impl = loop.close
24+
25+
def _wrapper(self, *args, **kwargs):
26+
# If the event loop was closed, there's nothing we can do anymore.
27+
if not self.is_closed():
28+
self.run_until_complete(pool.close_loop(self))
29+
# Restore the original close() implementation after we're done.
30+
self.close = original_impl
31+
return self.close(*args, **kwargs)
32+
33+
loop.close = types.MethodType(_wrapper, loop)
34+
35+
1836
class ConnectionPool:
1937
"""
2038
Connection pool manager for the channel layer.
@@ -36,6 +54,9 @@ def _ensure_loop(self, loop):
3654
loop = asyncio.get_event_loop()
3755

3856
if loop not in self.conn_map:
57+
# Swap the loop's close method with our own so we get
58+
# a chance to do some cleanup.
59+
_wrap_close(loop, self)
3960
self.conn_map[loop] = []
4061

4162
return self.conn_map[loop], loop
@@ -57,8 +78,9 @@ def push(self, conn):
5778
"""
5879
loop = self.in_use[conn]
5980
del self.in_use[conn]
60-
conns, _ = self._ensure_loop(loop)
61-
conns.append(conn)
81+
if loop is not None:
82+
conns, _ = self._ensure_loop(loop)
83+
conns.append(conn)
6284

6385
def conn_error(self, conn):
6486
"""
@@ -74,6 +96,20 @@ def reset(self):
7496
self.conn_map = {}
7597
self.in_use = {}
7698

99+
async def close_loop(self, loop):
100+
"""
101+
Close all connections owned by the pool on the given loop.
102+
"""
103+
if loop in self.conn_map:
104+
for conn in self.conn_map[loop]:
105+
conn.close()
106+
await conn.wait_closed()
107+
del self.conn_map[loop]
108+
109+
for k, v in self.in_use.items():
110+
if v is loop:
111+
self.in_use[k] = None
112+
77113
async def close(self):
78114
"""
79115
Close all connections owned by the pool.
@@ -732,7 +768,7 @@ def consistent_hash(self, value):
732768
"""
733769
if isinstance(value, str):
734770
value = value.encode("utf8")
735-
bigval = binascii.crc32(value) & 0xfff
771+
bigval = binascii.crc32(value) & 0xFFF
736772
ring_divisor = 4096 / float(self.ring_size)
737773
return int(bigval / ring_divisor)
738774

0 commit comments

Comments
 (0)