@@ -41,15 +41,15 @@ def connect_to(self, address):
41
41
if str_if_bytes (self .read_response ()) != "PONG" :
42
42
raise ConnectionError ("PING failed" )
43
43
44
- def _connect_retry (self , same_server : bool = False ):
44
+ def _connect_retry (self , same_address : bool = False ):
45
45
if self ._sock :
46
46
return # already connected
47
47
# If same_server is True, it means that the connection
48
48
# is not rotating to the next slave (if the connection pool is not master)
49
- if same_server :
49
+ if same_address :
50
50
self .connect_to (self .host , self .port )
51
51
return
52
- # If same_server is False, connnect to master in master mode
52
+ # If same_server is False, connnect to master in master mode
53
53
# and rotate to the next slave in slave mode
54
54
if self .connection_pool .is_master :
55
55
self .connect_to (self .connection_pool .get_master_address ())
@@ -61,12 +61,33 @@ def _connect_retry(self, same_server: bool = False):
61
61
continue
62
62
raise SlaveNotFoundError # Never be here
63
63
64
- def connect (self , same_server : bool = False ):
64
+ def connect (self ):
65
+ return self .retry .call_with_retry (lambda : self ._connect_retry (), lambda error : None )
66
+
67
+ def connect_to_same_address (self ):
68
+ """
69
+ Similar to connect, but instead of rotating to the next slave (if not in master mode),
70
+ it just connects to the same address of the connection object.
71
+ """
65
72
return self .retry .call_with_retry (
66
- lambda : self ._connect_retry (same_server ),
67
- lambda error : None
73
+ lambda : self ._connect_retry (same_address = True ),
74
+ lambda error : None
68
75
)
69
76
77
+ def can_read_same_address (self , timeout = 0 ):
78
+ """Similar to can_read_same_address, but calls connect_to_same_address instead of connect"""
79
+ sock = self ._sock
80
+ if not sock :
81
+ self .connect_to_same_address ()
82
+
83
+ host_error = self ._host_error ()
84
+
85
+ try :
86
+ return self ._parser .can_read (timeout )
87
+ except OSError as e :
88
+ self .disconnect ()
89
+ raise ConnectionError (f"Error while reading from { host_error } : { e .args } " )
90
+
70
91
def read_response (
71
92
self ,
72
93
disable_decoding = False ,
@@ -218,17 +239,17 @@ def ensure_connection_connected_to_address(self, connection: SentinelManagedConn
218
239
Here, we just try to make sure that the connection is already connected
219
240
to the replica we wanted it to.
220
241
"""
221
- connection .connect ( same_address = True )
242
+ connection .connect_to_same_address ( )
222
243
try :
223
- if connection .can_read ( same_address = True ) and connection .client_cache is None :
244
+ if connection .can_read_same_address ( ) and connection .client_cache is None :
224
245
raise ConnectionError ("Connection has data" )
225
246
except (ConnectionError , OSError ):
226
247
connection .disconnect ()
227
- connection .connect ( same_address = True )
248
+ connection .connect_to_same_address ( )
228
249
if connection .can_read ():
229
250
raise ConnectionError ("Connection has data" )
230
251
231
- def cleanup_scan (self , ** options ):
252
+ def cleanup (self , ** options ):
232
253
"""
233
254
Remove the SCAN ITER family command's request id from the dictionary
234
255
"""
@@ -281,7 +302,6 @@ def get_connection(
281
302
connection = self .make_connection ()
282
303
assert connection
283
304
self ._in_use_connections .add (connection )
284
- breakpoint ()
285
305
try :
286
306
# Ensure this connection is connected to Redis
287
307
# If this is the first scan request, it will
@@ -293,9 +313,7 @@ def get_connection(
293
313
# This will connect to the host and port of the replica
294
314
else :
295
315
connection .connect_to_address (server_host , server_port )
296
- breakpoint ()
297
316
self .ensure_connection_connected_to_address (connection )
298
- breakpoint ()
299
317
except BaseException :
300
318
# Release the connection back to the pool so that we don't
301
319
# leak it
@@ -306,7 +324,6 @@ def get_connection(
306
324
connection .host ,
307
325
connection .port ,
308
326
)
309
- breakpoint ()
310
327
return connection
311
328
312
329
0 commit comments