@@ -56,9 +56,16 @@ async def connect_to(self, address):
56
56
if str_if_bytes (await self .read_response ()) != "PONG" :
57
57
raise ConnectionError ("PING failed" )
58
58
59
- async def _connect_retry (self ):
59
+ async def _connect_retry (self , same_address : bool = False ):
60
60
if self ._reader :
61
61
return # already connected
62
+ # If same_server is True, it means that the connection
63
+ # is not rotating to the next slave (if the connection pool is not master)
64
+ if same_address :
65
+ self .connect_to (self .host , self .port )
66
+ return
67
+ # If same_server is False, connnect to master in master mode
68
+ # and rotate to the next slave in slave mode
62
69
if self .connection_pool .is_master :
63
70
await self .connect_to (await self .connection_pool .get_master_address ())
64
71
else :
@@ -75,22 +82,6 @@ async def connect(self):
75
82
lambda error : asyncio .sleep (0 ),
76
83
)
77
84
78
- async def _connect_to_address_retry (self , host : str , port : int ) -> None :
79
- if self ._reader :
80
- return # already connected
81
- try :
82
- return await self .connect_to ((host , port ))
83
- except ConnectionError :
84
- raise SlaveNotFoundError
85
-
86
- async def connect_to_address (self , host : str , port : int ) -> None :
87
- # Connect to the specified host and port
88
- # instead of connecting to the master / rotated slaves
89
- return await self .retry .call_with_retry (
90
- lambda : self ._connect_to_address_retry (host , port ),
91
- lambda error : asyncio .sleep (0 ),
92
- )
93
-
94
85
async def connect_to_same_address (self ):
95
86
"""
96
87
Similar to connect, but instead of rotating to the next slave (if not in master mode),
@@ -274,7 +265,7 @@ async def get_connection(
274
265
host = server_host , port = server_port
275
266
)
276
267
# If not, make a new dummy connection object, and set its host and
277
- # port to the one that we want later in the call to ``connect_to_address ``
268
+ # port to the one that we want later in the call to ``connect_to_same_address ``
278
269
if not connection :
279
270
connection = self .make_connection ()
280
271
assert connection
@@ -289,7 +280,7 @@ async def get_connection(
289
280
# connect to the previous replica.
290
281
# This will connect to the host and port of the replica
291
282
else :
292
- await connection .connect_to_address ( server_host , server_port )
283
+ await connection .connect_to_same_address ( )
293
284
self .ensure_connection_connected_to_address (connection )
294
285
except BaseException :
295
286
# Release the connection back to the pool so that we don't
0 commit comments