Skip to content

Commit 83dc599

Browse files
committed
add _same_addres methods for sentinels
1 parent e5c74ac commit 83dc599

File tree

4 files changed

+69
-19
lines changed

4 files changed

+69
-19
lines changed

redis/asyncio/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ async def execute_command(self, *args, **options):
652652
if not self.connection:
653653
await pool.release(conn)
654654
if "ITER" in command_name.upper():
655-
pool.cleanup_scan(iter_req_id=options.get("_iter_req_id", None))
655+
pool.cleanup(iter_req_id=options.get("_iter_req_id", None))
656656

657657
async def parse_response(
658658
self, connection: Connection, command_name: Union[str, bytes], **options

redis/asyncio/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,7 +1058,7 @@ class ConnectionPool:
10581058
"""
10591059

10601060
@abstractmethod
1061-
def cleanup_scan(self, **options):
1061+
def cleanup(self, **options):
10621062
"""
10631063
Additional cleanup operations that the connection pool might
10641064
need to do after a SCAN ITER family command is executed
@@ -1332,5 +1332,5 @@ async def release(self, connection: AbstractConnection):
13321332
await super().release(connection)
13331333
self._condition.notify()
13341334

1335-
def cleanup_scan(self, **options):
1335+
def cleanup(self, **options):
13361336
pass

redis/asyncio/sentinel.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,16 @@ async def connect_to_address(self, host: str, port: int) -> None:
9191
lambda error: asyncio.sleep(0),
9292
)
9393

94+
async def connect_to_same_address(self):
95+
"""
96+
Similar to connect, but instead of rotating to the next slave (if not in master mode),
97+
it just connects to the same address of the connection object.
98+
"""
99+
return await self.retry.call_with_retry(
100+
lambda: self._connect_retry(same_address=True),
101+
lambda error: asyncio.sleep(0),
102+
)
103+
94104
async def read_response(
95105
self,
96106
disable_decoding: bool = False,
@@ -193,7 +203,30 @@ async def rotate_slaves(self) -> AsyncIterator:
193203
pass
194204
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
195205

196-
def cleanup_scan(self, **options):
206+
async def ensure_connection_connected_to_address(self, connection: SentinelManagedConnection):
207+
"""
208+
Ensure the connection is already connected to the server that this connection
209+
object wants to connect to
210+
211+
Similar to self.ensure_connection, but calling connection.connect()
212+
in SentinelManagedConnection (replica mode) will cause the
213+
connection object to connect to the next replica in rotation,
214+
and we don't wnat behavior. Look at get_connection inline docs for details.
215+
216+
Here, we just try to make sure that the connection is already connected
217+
to the replica we wanted it to.
218+
"""
219+
await connection.connect_to_same_address()
220+
try:
221+
if await connection.can_read_destructive() and connection.client_cache is None:
222+
raise ConnectionError("Connection has data")
223+
except (ConnectionError, OSError):
224+
await connection.disconnect()
225+
await connection.connect_to_same_address()
226+
if await connection.can_read_destructive():
227+
raise ConnectionError("Connection has data")
228+
229+
def cleanup(self, **options):
197230
"""
198231
Remove the SCAN ITER family command's request id from the dictionary
199232
"""
@@ -257,7 +290,7 @@ async def get_connection(
257290
# This will connect to the host and port of the replica
258291
else:
259292
await connection.connect_to_address(server_host, server_port)
260-
await self.ensure_connection(connection)
293+
self.ensure_connection_connected_to_address(connection)
261294
except BaseException:
262295
# Release the connection back to the pool so that we don't
263296
# leak it

redis/sentinel.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ def connect_to(self, address):
4141
if str_if_bytes(self.read_response()) != "PONG":
4242
raise ConnectionError("PING failed")
4343

44-
def _connect_retry(self, same_server: bool = False):
44+
def _connect_retry(self, same_address: bool = False):
4545
if self._sock:
4646
return # already connected
4747
# If same_server is True, it means that the connection
4848
# is not rotating to the next slave (if the connection pool is not master)
49-
if same_server:
49+
if same_address:
5050
self.connect_to(self.host, self.port)
5151
return
52-
# If same_server is False, connnect to master in master mode
52+
# If same_server is False, connnect to master in master mode
5353
# and rotate to the next slave in slave mode
5454
if self.connection_pool.is_master:
5555
self.connect_to(self.connection_pool.get_master_address())
@@ -61,12 +61,33 @@ def _connect_retry(self, same_server: bool = False):
6161
continue
6262
raise SlaveNotFoundError # Never be here
6363

64-
def connect(self, same_server: bool = False):
64+
def connect(self):
65+
return self.retry.call_with_retry(lambda: self._connect_retry(), lambda error: None)
66+
67+
def connect_to_same_address(self):
68+
"""
69+
Similar to connect, but instead of rotating to the next slave (if not in master mode),
70+
it just connects to the same address of the connection object.
71+
"""
6572
return self.retry.call_with_retry(
66-
lambda: self._connect_retry(same_server),
67-
lambda error: None
73+
lambda: self._connect_retry(same_address=True),
74+
lambda error: None
6875
)
6976

77+
def can_read_same_address(self, timeout=0):
78+
"""Similar to can_read_same_address, but calls connect_to_same_address instead of connect"""
79+
sock = self._sock
80+
if not sock:
81+
self.connect_to_same_address()
82+
83+
host_error = self._host_error()
84+
85+
try:
86+
return self._parser.can_read(timeout)
87+
except OSError as e:
88+
self.disconnect()
89+
raise ConnectionError(f"Error while reading from {host_error}: {e.args}")
90+
7091
def read_response(
7192
self,
7293
disable_decoding=False,
@@ -218,17 +239,17 @@ def ensure_connection_connected_to_address(self, connection: SentinelManagedConn
218239
Here, we just try to make sure that the connection is already connected
219240
to the replica we wanted it to.
220241
"""
221-
connection.connect(same_address=True)
242+
connection.connect_to_same_address()
222243
try:
223-
if connection.can_read(same_address=True) and connection.client_cache is None:
244+
if connection.can_read_same_address() and connection.client_cache is None:
224245
raise ConnectionError("Connection has data")
225246
except (ConnectionError, OSError):
226247
connection.disconnect()
227-
connection.connect(same_address=True)
248+
connection.connect_to_same_address()
228249
if connection.can_read():
229250
raise ConnectionError("Connection has data")
230251

231-
def cleanup_scan(self, **options):
252+
def cleanup(self, **options):
232253
"""
233254
Remove the SCAN ITER family command's request id from the dictionary
234255
"""
@@ -281,7 +302,6 @@ def get_connection(
281302
connection = self.make_connection()
282303
assert connection
283304
self._in_use_connections.add(connection)
284-
breakpoint()
285305
try:
286306
# Ensure this connection is connected to Redis
287307
# If this is the first scan request, it will
@@ -293,9 +313,7 @@ def get_connection(
293313
# This will connect to the host and port of the replica
294314
else:
295315
connection.connect_to_address(server_host, server_port)
296-
breakpoint()
297316
self.ensure_connection_connected_to_address(connection)
298-
breakpoint()
299317
except BaseException:
300318
# Release the connection back to the pool so that we don't
301319
# leak it
@@ -306,7 +324,6 @@ def get_connection(
306324
connection.host,
307325
connection.port,
308326
)
309-
breakpoint()
310327
return connection
311328

312329

0 commit comments

Comments
 (0)