Skip to content

Commit f504741

Browse files
committed
set fix address as a property of SentinelManagedConnection
1 parent 8a1bc06 commit f504741

File tree

5 files changed

+42
-119
lines changed

5 files changed

+42
-119
lines changed

redis/asyncio/connection.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,7 @@ def __init__(
11181118
self,
11191119
connection_class: Type[AbstractConnection] = Connection,
11201120
max_connections: Optional[int] = None,
1121+
index_available_connections: bool = False,
11211122
**connection_kwargs,
11221123
):
11231124
max_connections = max_connections or 2**31
@@ -1130,6 +1131,7 @@ def __init__(
11301131

11311132
self._available_connections: ConnectionsIndexer = ConnectionsIndexer()
11321133
self._in_use_connections: Set[AbstractConnection] = set()
1134+
self._index_available_connections= index_available_connections
11331135
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)
11341136

11351137
def __repr__(self):
@@ -1139,7 +1141,9 @@ def __repr__(self):
11391141
)
11401142

11411143
def reset(self):
1142-
self._available_connections = []
1144+
self._available_connections: ConnectionsIndexer | list = (
1145+
ConnectionsIndexer() if self._index_available_connections else []
1146+
)
11431147
self._in_use_connections = weakref.WeakSet()
11441148

11451149
def can_get_connection(self) -> bool:

redis/asyncio/sentinel.py

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ class SlaveNotFoundError(ConnectionError):
3535
class SentinelManagedConnection(Connection):
3636
def __init__(self, **kwargs):
3737
self.connection_pool = kwargs.pop("connection_pool")
38+
# To be set to True if we want to prevent
39+
# the sentinel managed connection to connect
40+
# to the most relevant sentinel in the pool and just
41+
# connect to the current self.host and self.port
42+
self._is_address_fixed = False
3843
super().__init__(**kwargs)
3944

4045
def __repr__(self):
@@ -48,6 +53,10 @@ def __repr__(self):
4853
s += host_info
4954
return s + ")>"
5055

56+
def fix_address(self, address):
57+
self.host, self.port = address
58+
self._is_address_fixed = True
59+
5160
async def connect_to(self, address):
5261
self.host, self.port = address
5362
await super().connect()
@@ -56,12 +65,12 @@ async def connect_to(self, address):
5665
if str_if_bytes(await self.read_response()) != "PONG":
5766
raise ConnectionError("PING failed")
5867

59-
async def _connect_retry(self, same_address: bool = False):
68+
async def _connect_retry(self):
6069
if self._reader:
6170
return # already connected
62-
# If same_server is True, it means that the connection
71+
# If address is fixed, it means that the connection
6372
# is not rotating to the next slave (if the connection pool is in replica mode)
64-
if same_address:
73+
if self._is_address_fixed:
6574
self.connect_to(self.host, self.port)
6675
return
6776
# If same_server is False, connnect to master in master mode
@@ -82,24 +91,6 @@ async def connect(self):
8291
lambda error: asyncio.sleep(0),
8392
)
8493

85-
async def connect_to_same_address(self):
86-
"""
87-
Similar to connect, but instead of rotating to the next slave (in replica mode),
88-
it just connects to the same address of the connection object.
89-
"""
90-
return await self.retry.call_with_retry(
91-
lambda: self._connect_retry(same_address=True),
92-
lambda error: asyncio.sleep(0),
93-
)
94-
95-
async def connect_to_address(self, address):
96-
"""
97-
Similar to connect, but instead of rotating to the next slave (in replica mode),
98-
it just connects to the same address of the connection object.
99-
"""
100-
self.host, self.port = address
101-
return await self.connect_to_same_address()
102-
10394
async def read_response(
10495
self,
10596
disable_decoding: bool = False,
@@ -202,34 +193,6 @@ async def rotate_slaves(self) -> AsyncIterator:
202193
pass
203194
raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
204195

205-
async def ensure_connection_connected_to_address(
206-
self, connection: SentinelManagedConnection
207-
):
208-
"""
209-
Ensure the connection is already connected to the server that this connection
210-
object wants to connect to
211-
212-
Similar to self.ensure_connection, but calling connection.connect()
213-
in SentinelManagedConnection (replica mode) will cause the
214-
connection object to connect to the next replica in rotation,
215-
and we don't wnat behavior. Look at get_connection inline docs for details.
216-
217-
Here, we just try to make sure that the connection is already connected
218-
to the replica we wanted it to.
219-
"""
220-
await connection.connect_to_same_address()
221-
try:
222-
if (
223-
await connection.can_read_destructive()
224-
and connection.client_cache is None
225-
):
226-
raise ConnectionError("Connection has data")
227-
except (ConnectionError, OSError):
228-
await connection.disconnect()
229-
await connection.connect_to_same_address()
230-
if await connection.can_read_destructive():
231-
raise ConnectionError("Connection has data")
232-
233196
def cleanup(self, **options):
234197
"""
235198
Remove the SCAN ITER family command's request id from the dictionary
@@ -291,8 +254,8 @@ async def get_connection(
291254
# connect to the previous replica.
292255
# This will connect to the host and port of the replica
293256
else:
294-
await connection.connect_to_address((server_host, server_port))
295-
await self.ensure_connection_connected_to_address(connection)
257+
connection.fix_address((server_host, server_port))
258+
await self.ensure_connection(connection)
296259
except BaseException:
297260
# Release the connection back to the pool so that we don't
298261
# leak it

redis/connection.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,7 @@ def __init__(
11351135
self,
11361136
connection_class=Connection,
11371137
max_connections: Optional[int] = None,
1138+
index_available_connections: bool = False,
11381139
**connection_kwargs,
11391140
):
11401141
max_connections = max_connections or 2**31
@@ -1154,6 +1155,7 @@ def __init__(
11541155
# will notice the first thread already did the work and simply
11551156
# release the lock.
11561157
self._fork_lock = threading.Lock()
1158+
self._index_available_connections= index_available_connections
11571159
self.reset()
11581160

11591161
def __repr__(self) -> (str, str):
@@ -1175,7 +1177,9 @@ def cleanup(self, **options):
11751177
def reset(self) -> None:
11761178
self._lock = threading.Lock()
11771179
self._created_connections = 0
1178-
self._available_connections = ConnectionsIndexer()
1180+
self._available_connections: ConnectionsIndexer | list = (
1181+
ConnectionsIndexer() if self._index_available_connections else []
1182+
)
11791183
self._in_use_connections = set()
11801184

11811185
# this must be the last operation in this method. while reset() is

redis/sentinel.py

Lines changed: 16 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ class SlaveNotFoundError(ConnectionError):
2020
class SentinelManagedConnection(Connection):
2121
def __init__(self, **kwargs):
2222
self.connection_pool = kwargs.pop("connection_pool")
23+
# To be set to True if we want to prevent
24+
# the sentinel managed connection to connect
25+
# to the most relevant sentinel in the pool and just
26+
# connect to the current self.host and self.port
27+
self._is_address_fixed = False
2328
super().__init__(**kwargs)
2429

2530
def __repr__(self):
@@ -33,6 +38,10 @@ def __repr__(self):
3338
s = s % host_info
3439
return s
3540

41+
def fix_address(self, address):
42+
self.host, self.port = address
43+
self._is_address_fixed = True
44+
3645
def connect_to(self, address):
3746
self.host, self.port = address
3847
super().connect()
@@ -41,12 +50,12 @@ def connect_to(self, address):
4150
if str_if_bytes(self.read_response()) != "PONG":
4251
raise ConnectionError("PING failed")
4352

44-
def _connect_retry(self, same_address: bool = False):
53+
def _connect_retry(self):
4554
if self._sock:
4655
return # already connected
47-
# If same_server is True, it means that the connection
56+
# If address is fixed, it means that the connection
4857
# is not rotating to the next slave (if the connection pool is not master)
49-
if same_address:
58+
if self._is_address_fixed:
5059
self.connect_to((self.host, self.port))
5160
return
5261
# If same_server is False, connnect to master in master mode
@@ -66,40 +75,6 @@ def connect(self):
6675
lambda: self._connect_retry(), lambda error: None
6776
)
6877

69-
def connect_to_same_address(self):
70-
"""
71-
Similar to connect, but instead of rotating to the next slave (in replica mode),
72-
it just connects to the same address of the connection object.
73-
"""
74-
return self.retry.call_with_retry(
75-
lambda: self._connect_retry(same_address=True), lambda error: None
76-
)
77-
78-
def connect_to_address(self, address):
79-
"""
80-
Similar to connect, but instead of rotating to the next slave (in replica mode),
81-
it just connects to the address supplied.
82-
"""
83-
self.host, self.port = address
84-
return self.connect_to_same_address()
85-
86-
def can_read_same_address(self, timeout=0):
87-
"""
88-
Similar to can_read_same_address, but calls
89-
connect_to_same_address instead of connect
90-
"""
91-
sock = self._sock
92-
if not sock:
93-
self.connect_to_same_address()
94-
95-
host_error = self._host_error()
96-
97-
try:
98-
return self._parser.can_read(timeout)
99-
except OSError as e:
100-
self.disconnect()
101-
raise ConnectionError(f"Error while reading from {host_error}: {e.args}")
102-
10378
def read_response(
10479
self,
10580
disable_decoding=False,
@@ -203,7 +178,7 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
203178
service_name=service_name,
204179
sentinel_manager=sentinel_manager,
205180
)
206-
super().__init__(**kwargs)
181+
super().__init__(index_available_connections=True, **kwargs)
207182
self.connection_kwargs["connection_pool"] = self.proxy
208183
self.service_name = service_name
209184
self.sentinel_manager = sentinel_manager
@@ -238,31 +213,6 @@ def rotate_slaves(self):
238213
"Round-robin slave balancer"
239214
return self.proxy.rotate_slaves()
240215

241-
def ensure_connection_connected_to_address(
242-
self, connection: SentinelManagedConnection
243-
):
244-
"""
245-
Ensure the connection is already connected to the server that this connection
246-
object wants to connect to.
247-
248-
Similar to self.ensure_connection, but calling connection.connect()
249-
in SentinelManagedConnection (replica mode) will cause the
250-
connection object to connect to the next replica in rotation,
251-
and we don't wnat behavior. Look at get_connection inline docs for details.
252-
253-
Here, we just try to make sure that the connection is already connected
254-
to the replica we wanted it to.
255-
"""
256-
connection.connect_to_same_address()
257-
try:
258-
if connection.can_read_same_address() and connection.client_cache is None:
259-
raise ConnectionError("Connection has data")
260-
except (ConnectionError, OSError):
261-
connection.disconnect()
262-
connection.connect_to_same_address()
263-
if connection.can_read():
264-
raise ConnectionError("Connection has data")
265-
266216
def cleanup(self, **options):
267217
"""
268218
Remove the SCAN ITER family command's request id from the dictionary
@@ -309,7 +259,7 @@ def get_connection(
309259
host=server_host, port=server_port
310260
)
311261
# If not, make a new dummy connection object, and set its host and port
312-
# to the one that we want later in the call to ``connect_to_same_address``
262+
# to the one that we want later in the call to ``fix_address``
313263
if not connection:
314264
connection = self.make_connection()
315265
assert connection
@@ -324,8 +274,8 @@ def get_connection(
324274
# connect to the previous replica.
325275
# This will connect to the host and port of the replica
326276
else:
327-
connection.connect_to_address((server_host, server_port))
328-
self.ensure_connection_connected_to_address(connection)
277+
connection.fix_address((server_host, server_port))
278+
self.ensure_connection(connection)
329279
except BaseException:
330280
# Release the connection back to the pool so that we don't
331281
# leak it

tests/test_connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ def mock_disconnect(_):
298298
assert called == 1
299299
pool.disconnect()
300300

301+
301302
@pytest.mark.parametrize(
302303
"conn, error, expected_message",
303304
[
@@ -347,6 +348,7 @@ def test_unix_socket_connection_failure():
347348
== "Error 2 connecting to unix:///tmp/a.sock. No such file or directory."
348349
)
349350

351+
350352
def test_connections_indexer_operations():
351353
ci = ConnectionsIndexer()
352354
c1 = Connection(host="1", port=2)

0 commit comments

Comments
 (0)