Skip to content

Commit 602bbe9

Browse files
committed
Applying moving/moved only on connections to the same proxy.
1 parent 9a31a71 commit 602bbe9

File tree

5 files changed

+702
-260
lines changed

5 files changed

+702
-260
lines changed

redis/_parsers/base.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,17 +202,20 @@ def handle_push_response(self, response, **kwargs):
202202
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
203203
return self.invalidation_push_handler_func(response)
204204
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
205+
# TODO: PARSE latest format when available
205206
host, port = response[2].decode().split(":")
206207
ttl = response[1]
207208
id = 1 # Hardcoded value until the notification starts including the id
208209
notification = NodeMovingEvent(id, host, port, ttl)
209210
return self.node_moving_push_handler_func(notification)
210211
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
211212
if msg_type in _MIGRATING_MESSAGE:
213+
# TODO: PARSE latest format when available
212214
ttl = response[1]
213215
id = 2 # Hardcoded value until the notification starts including the id
214216
notification = NodeMigratingEvent(id, ttl)
215217
elif msg_type in _MIGRATED_MESSAGE:
218+
# TODO: PARSE latest format when available
216219
id = 3 # Hardcoded value until the notification starts including the id
217220
notification = NodeMigratedEvent(id)
218221
else:
@@ -260,17 +263,20 @@ async def handle_push_response(self, response, **kwargs):
260263
return await self.invalidation_push_handler_func(response)
261264
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
262265
# push notification from enterprise cluster for node moving
266+
# TODO: PARSE latest format when available
263267
host, port = response[2].split(":")
264268
ttl = response[1]
265269
id = 1 # Hardcoded value for async parser
266270
notification = NodeMovingEvent(id, host, port, ttl)
267271
return await self.node_moving_push_handler_func(notification)
268272
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
269273
if msg_type in _MIGRATING_MESSAGE:
274+
# TODO: PARSE latest format when available
270275
ttl = response[1]
271276
id = 2 # Hardcoded value for async parser
272277
notification = NodeMigratingEvent(id, ttl)
273278
elif msg_type in _MIGRATED_MESSAGE:
279+
# TODO: PARSE latest format when available
274280
id = 3 # Hardcoded value for async parser
275281
notification = NodeMigratedEvent(id)
276282
return await self.maintenance_push_handler_func(notification)
@@ -283,7 +289,7 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
283289
"""Set the invalidation push handler function"""
284290
self.invalidation_push_handler_func = invalidation_push_handler_func
285291

286-
def set_node_moving_push_handler_func(self, node_moving_push_handler_func):
292+
def set_node_moving_push_handler(self, node_moving_push_handler_func):
287293
self.node_moving_push_handler_func = node_moving_push_handler_func
288294

289295
def set_maintenance_push_handler(self, maintenance_push_handler_func):

redis/connection.py

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from abc import abstractmethod
99
from itertools import chain
1010
from queue import Empty, Full, LifoQueue
11-
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union
11+
from typing import Any, Callable, Dict, List, Literal, Optional, Type, TypeVar, Union
1212
from urllib.parse import parse_qs, unquote, urlparse
1313

1414
from redis.cache import (
@@ -249,6 +249,13 @@ def maintenance_state(self, state: "MaintenanceState"):
249249
"""
250250
pass
251251

252+
@abstractmethod
253+
def getpeername(self):
254+
"""
255+
Returns the peer name of the connection.
256+
"""
257+
pass
258+
252259
@abstractmethod
253260
def mark_for_reconnect(self):
254261
"""
@@ -402,6 +409,7 @@ def __init__(
402409

403410
if maintenance_events_config and maintenance_events_config.enabled:
404411
if maintenance_events_pool_handler:
412+
maintenance_events_pool_handler.set_connection(self)
405413
self._parser.set_node_moving_push_handler(
406414
maintenance_events_pool_handler.handle_event
407415
)
@@ -484,6 +492,7 @@ def set_parser(self, parser_class):
484492
def set_maintenance_event_pool_handler(
485493
self, maintenance_event_pool_handler: MaintenanceEventPoolHandler
486494
):
495+
maintenance_event_pool_handler.set_connection(self)
487496
self._parser.set_node_moving_push_handler(
488497
maintenance_event_pool_handler.handle_event
489498
)
@@ -867,6 +876,11 @@ def maintenance_state(self) -> MaintenanceState:
867876
def maintenance_state(self, state: "MaintenanceState"):
868877
self._maintenance_state = state
869878

879+
def getpeername(self):
880+
if not self._sock:
881+
return None
882+
return self._sock.getpeername()[0]
883+
870884
def mark_for_reconnect(self):
871885
self._should_reconnect = True
872886

@@ -1892,10 +1906,27 @@ def re_auth_callback(self, token: TokenInterface):
18921906
for conn in self._in_use_connections:
18931907
conn.set_re_auth_token(token)
18941908

1895-
def set_maintenance_state_for_all_connections(self, state: "MaintenanceState"):
1909+
def set_maintenance_state_for_connections(
1910+
self,
1911+
state: "MaintenanceState",
1912+
matching_address: Optional[str] = None,
1913+
address_type_to_match: Literal["connected", "configured"] = "connected",
1914+
):
18961915
for conn in self._available_connections:
1916+
if address_type_to_match == "connected":
1917+
if matching_address and conn.getpeername() != matching_address:
1918+
continue
1919+
else:
1920+
if matching_address and conn.host != matching_address:
1921+
continue
18971922
conn.maintenance_state = state
18981923
for conn in self._in_use_connections:
1924+
if address_type_to_match == "connected":
1925+
if matching_address and conn.getpeername() != matching_address:
1926+
continue
1927+
else:
1928+
if matching_address and conn.host != matching_address:
1929+
continue
18991930
conn.maintenance_state = state
19001931

19011932
def set_maintenance_state_in_connection_kwargs(self, state: "MaintenanceState"):
@@ -1963,7 +1994,12 @@ def remove_tmp_config_from_connection_kwargs(self):
19631994
}
19641995
)
19651996

1966-
def reset_connections_tmp_settings(self):
1997+
def reset_connections_tmp_settings(
1998+
self,
1999+
moving_address: Optional[str] = None,
2000+
reset_host_address: bool = False,
2001+
reset_relax_timeout: bool = False,
2002+
):
19672003
"""
19682004
Restore original settings from temporary configuration for all connections in the pool.
19692005
@@ -1978,16 +2014,25 @@ def reset_connections_tmp_settings(self):
19782014
"""
19792015
with self._lock:
19802016
for conn in self._available_connections:
2017+
if moving_address and conn.host != moving_address:
2018+
continue
19812019
conn.reset_tmp_settings(
1982-
reset_host_address=True, reset_relax_timeout=True
2020+
reset_host_address=reset_host_address,
2021+
reset_relax_timeout=reset_relax_timeout,
19832022
)
19842023
for conn in self._in_use_connections:
2024+
if moving_address and conn.host != moving_address:
2025+
continue
19852026
conn.reset_tmp_settings(
1986-
reset_host_address=True, reset_relax_timeout=True
2027+
reset_host_address=reset_host_address,
2028+
reset_relax_timeout=reset_relax_timeout,
19872029
)
19882030

19892031
def update_active_connections_for_reconnect(
1990-
self, tmp_host_address: str, tmp_relax_timeout: Optional[float] = None
2032+
self,
2033+
tmp_host_address: str,
2034+
tmp_relax_timeout: Optional[float] = None,
2035+
moving_address_src: Optional[str] = None,
19912036
):
19922037
"""
19932038
Mark all active connections for reconnect.
@@ -1999,6 +2044,8 @@ def update_active_connections_for_reconnect(
19992044
:param tmp_relax_timeout: The relax timeout to use for the connection.
20002045
"""
20012046
for conn in self._in_use_connections:
2047+
if moving_address_src and conn.getpeername() != moving_address_src:
2048+
continue
20022049
self._update_connection_for_reconnect(
20032050
conn, tmp_host_address, tmp_relax_timeout
20042051
)
@@ -2007,6 +2054,7 @@ def disconnect_and_reconfigure_free_connections(
20072054
self,
20082055
tmp_host_address: str,
20092056
tmp_relax_timeout: Optional[float] = None,
2057+
moving_address_src: Optional[str] = None,
20102058
):
20112059
"""
20122060
Disconnect all free/available connections.
@@ -2019,13 +2067,17 @@ def disconnect_and_reconfigure_free_connections(
20192067
"""
20202068

20212069
for conn in self._available_connections:
2070+
if moving_address_src and conn.getpeername() != moving_address_src:
2071+
continue
20222072
self._disconnect_and_update_connection_for_reconnect(
20232073
conn, tmp_host_address, tmp_relax_timeout
20242074
)
20252075

20262076
def update_connections_current_timeout(
20272077
self,
20282078
relax_timeout: Optional[float],
2079+
matching_address: Optional[str] = None,
2080+
address_type_to_match: Literal["connected", "configured"] = "connected",
20292081
include_free_connections: bool = False,
20302082
):
20312083
"""
@@ -2039,10 +2091,22 @@ def update_connections_current_timeout(
20392091
:param include_available_connections: Whether to include available connections in the update.
20402092
"""
20412093
for conn in self._in_use_connections:
2094+
if address_type_to_match == "connected":
2095+
if matching_address and conn.getpeername() != matching_address:
2096+
continue
2097+
else:
2098+
if matching_address and conn.host != matching_address:
2099+
continue
20422100
conn.update_current_socket_timeout(relax_timeout)
20432101

20442102
if include_free_connections:
20452103
for conn in self._available_connections:
2104+
if address_type_to_match == "connected":
2105+
if matching_address and conn.getpeername() != matching_address:
2106+
continue
2107+
else:
2108+
if matching_address and conn.host != matching_address:
2109+
continue
20462110
conn.update_current_socket_timeout(relax_timeout)
20472111

20482112
def _update_connection_for_reconnect(
@@ -2308,7 +2372,10 @@ def disconnect(self):
23082372
self._locked = False
23092373

23102374
def update_active_connections_for_reconnect(
2311-
self, tmp_host_address: str, tmp_relax_timeout: Optional[float] = None
2375+
self,
2376+
tmp_host_address: str,
2377+
tmp_relax_timeout: Optional[float] = None,
2378+
moving_address_src: Optional[str] = None,
23122379
):
23132380
"""
23142381
Mark all active connections for reconnect.
@@ -2323,6 +2390,8 @@ def update_active_connections_for_reconnect(
23232390
connections_in_queue = {conn for conn in self.pool.queue if conn}
23242391
for conn in self._connections:
23252392
if conn not in connections_in_queue:
2393+
if moving_address_src and conn.getpeername() != moving_address_src:
2394+
continue
23262395
self._update_connection_for_reconnect(
23272396
conn, tmp_host_address, tmp_relax_timeout
23282397
)
@@ -2331,6 +2400,7 @@ def disconnect_and_reconfigure_free_connections(
23312400
self,
23322401
tmp_host_address: str,
23332402
tmp_relax_timeout: Optional[Number] = None,
2403+
moving_address_src: Optional[str] = None,
23342404
):
23352405
"""
23362406
Disconnect all free/available connections.
@@ -2345,13 +2415,17 @@ def disconnect_and_reconfigure_free_connections(
23452415

23462416
for conn in existing_connections:
23472417
if conn:
2418+
if moving_address_src and conn.getpeername() != moving_address_src:
2419+
continue
23482420
self._disconnect_and_update_connection_for_reconnect(
23492421
conn, tmp_host_address, tmp_relax_timeout
23502422
)
23512423

23522424
def update_connections_current_timeout(
23532425
self,
23542426
relax_timeout: Optional[float] = None,
2427+
matching_address: Optional[str] = None,
2428+
address_type_to_match: Literal["connected", "configured"] = "connected",
23552429
include_free_connections: bool = False,
23562430
):
23572431
"""
@@ -2365,11 +2439,23 @@ def update_connections_current_timeout(
23652439
"""
23662440
if include_free_connections:
23672441
for conn in tuple(self._connections):
2442+
if address_type_to_match == "connected":
2443+
if matching_address and conn.getpeername() != matching_address:
2444+
continue
2445+
else:
2446+
if matching_address and conn.host != matching_address:
2447+
continue
23682448
conn.update_current_socket_timeout(relax_timeout)
23692449
else:
23702450
connections_in_queue = {conn for conn in self.pool.queue if conn}
23712451
for conn in self._connections:
23722452
if conn not in connections_in_queue:
2453+
if address_type_to_match == "connected":
2454+
if matching_address and conn.getpeername() != matching_address:
2455+
continue
2456+
else:
2457+
if matching_address and conn.host != matching_address:
2458+
continue
23732459
conn.update_current_socket_timeout(relax_timeout)
23742460

23752461
def _update_maintenance_events_config_for_connections(
@@ -2387,14 +2473,24 @@ def _update_maintenance_events_configs_for_connections(
23872473
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
23882474
conn.maintenance_events_config = maintenance_events_pool_handler.config
23892475

2390-
def reset_connections_tmp_settings(self):
2476+
def reset_connections_tmp_settings(
2477+
self,
2478+
moving_address: Optional[str] = None,
2479+
reset_host_address: bool = False,
2480+
reset_relax_timeout: bool = False,
2481+
):
23912482
"""
23922483
Override base class method to work with BlockingConnectionPool's structure.
23932484
23942485
Restore original settings from temporary configuration for all connections in the pool.
23952486
"""
23962487
for conn in tuple(self._connections):
2397-
conn.reset_tmp_settings(reset_host_address=True, reset_relax_timeout=True)
2488+
if moving_address and conn.host != moving_address:
2489+
continue
2490+
conn.reset_tmp_settings(
2491+
reset_host_address=reset_host_address,
2492+
reset_relax_timeout=reset_relax_timeout,
2493+
)
23982494

23992495
def set_in_maintenance(self, in_maintenance: bool):
24002496
"""
@@ -2405,6 +2501,18 @@ def set_in_maintenance(self, in_maintenance: bool):
24052501
"""
24062502
self._in_maintenance = in_maintenance
24072503

2408-
def set_maintenance_state_for_all_connections(self, state: "MaintenanceState"):
2504+
def set_maintenance_state_for_connections(
2505+
self,
2506+
state: "MaintenanceState",
2507+
matching_address: Optional[str] = None,
2508+
address_type_to_match: Literal["connected", "configured"] = "connected",
2509+
):
24092510
for conn in self._connections:
2511+
if address_type_to_match == "connected":
2512+
if matching_address and conn.getpeername() != matching_address:
2513+
continue
2514+
else:
2515+
if matching_address and conn.host != matching_address:
2516+
continue
2517+
24102518
conn.maintenance_state = state

0 commit comments

Comments
 (0)