@@ -100,8 +100,9 @@ def __init__(
100
100
# Configure the host objects
101
101
self .hosts = self .decode_hosts (hosts )
102
102
self .ring_size = len (self .hosts )
103
- # Cached redis connection pools
104
- self .pools = self .create_pools ()
103
+ # Cached redis connection pools and the event loop they are from
104
+ self .pools = {}
105
+ self .pools_loop = None
105
106
# Normal channels choose a host index by cycling through the available hosts
106
107
self ._receive_index_generator = itertools .cycle (range (len (self .hosts )))
107
108
self ._send_index_generator = itertools .cycle (range (len (self .hosts )))
@@ -125,23 +126,19 @@ def __init__(
125
126
# a message back into the main queue before its cleanup has completed
126
127
self .receive_clean_locks = ChannelLock ()
127
128
128
- def create_pools (self ):
129
- pools = []
130
- for host in self .hosts :
131
- if "address" in host :
132
- pools .append (aioredis .ConnectionPool .from_url (host ["address" ]))
133
- elif "master_name" in host :
134
- sentinels = host .pop ("sentinels" )
135
- master_name = host .pop ("master_name" )
136
- pools .append (
137
- aioredis .sentinel .SentinelConnectionPool (
138
- master_name , aioredis .sentinel .Sentinel (sentinels ), ** host
139
- )
140
- )
141
- else :
142
- pools .append (aioredis .ConnectionPool (** host ))
129
+ def create_pool (self , index ):
130
+ host = self .hosts [index ]
143
131
144
- return pools
132
+ if "address" in host :
133
+ return aioredis .ConnectionPool .from_url (host ["address" ])
134
+ elif "master_name" in host :
135
+ sentinels = host .pop ("sentinels" )
136
+ master_name = host .pop ("master_name" )
137
+ return aioredis .sentinel .SentinelConnectionPool (
138
+ master_name , aioredis .sentinel .Sentinel (sentinels ), ** host
139
+ )
140
+ else :
141
+ return aioredis .ConnectionPool (** host )
145
142
146
143
def decode_hosts (self , hosts ):
147
144
"""
@@ -497,8 +494,8 @@ async def close_pools(self):
497
494
# pools without flushing first.
498
495
await self .wait_received ()
499
496
500
- for pool in self .pools :
501
- await pool .disconnect ()
497
+ for index in self .pools :
498
+ await self . pools [ index ] .disconnect ()
502
499
503
500
async def wait_received (self ):
504
501
"""
@@ -725,4 +722,16 @@ def connection(self, index):
725
722
raise ValueError (
726
723
"There are only %s hosts - you asked for %s!" % (self .ring_size , index )
727
724
)
725
+
726
+ try :
727
+ loop = asyncio .get_running_loop ()
728
+ if self .pools_loop != loop :
729
+ self .pools = {}
730
+ self .pools_loop = loop
731
+ except RuntimeError :
732
+ pass
733
+
734
+ if index not in self .pools :
735
+ self .pools [index ] = self .create_pool (index )
736
+
728
737
return aioredis .Redis (connection_pool = self .pools [index ])
0 commit comments