Skip to content

Commit 4c6eb44

Browse files
committed
Applying review comments
1 parent 260b34e commit 4c6eb44

File tree

4 files changed

+70
-58
lines changed

4 files changed

+70
-58
lines changed

redis/_parsers/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import sys
22
from abc import ABC
33
from asyncio import IncompleteReadError, StreamReader, TimeoutError
4-
from typing import Callable, List, Optional, Protocol, Union
4+
from typing import Awaitable, Callable, List, Optional, Protocol, Union
55

66
from redis.maintenance_events import (
77
NodeMigratedEvent,
@@ -243,8 +243,8 @@ class AsyncPushNotificationsParser(Protocol):
243243

244244
pubsub_push_handler_func: Callable
245245
invalidation_push_handler_func: Optional[Callable] = None
246-
node_moving_push_handler_func: Optional[Callable] = None
247-
maintenance_push_handler_func: Optional[Callable] = None
246+
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
247+
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
248248

249249
async def handle_pubsub_push_response(self, response):
250250
"""Handle pubsub push responses asynchronously"""

redis/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -954,12 +954,13 @@ def _execute(self, conn, command, *args, **kwargs):
954954
patterns we were previously listening to
955955
"""
956956

957+
if conn.should_reconnect():
958+
self._reconnect(conn)
959+
957960
response = conn.retry.call_with_retry(
958961
lambda: command(*args, **kwargs),
959962
lambda _: self._reconnect(conn),
960963
)
961-
if conn.should_reconnect():
962-
self._reconnect(conn)
963964

964965
return response
965966

@@ -1172,6 +1173,7 @@ def get_message(
11721173
return None
11731174

11741175
response = self.parse_response(block=(timeout is None), timeout=timeout)
1176+
11751177
if response:
11761178
return self.handle_message(response, ignore_subscribe_messages)
11771179
return None

redis/connection.py

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,8 @@ def __init__(
445445
if orig_socket_connect_timeout
446446
else self.socket_connect_timeout
447447
)
448+
else:
449+
self._maintenance_event_connection_handler = None
448450
self._should_reconnect = False
449451
self.maintenance_state = maintenance_state
450452

@@ -511,8 +513,8 @@ def set_maintenance_event_pool_handler(
511513
maintenance_event_pool_handler.handle_event
512514
)
513515

514-
# Initialize maintenance event connection handler if it doesn't exist
515-
if not hasattr(self, "_maintenance_event_connection_handler"):
516+
# Update maintenance event connection handler if it doesn't exist
517+
if not self._maintenance_event_connection_handler:
516518
self._maintenance_event_connection_handler = (
517519
MaintenanceEventConnectionHandler(
518520
self, maintenance_event_pool_handler.config
@@ -521,6 +523,10 @@ def set_maintenance_event_pool_handler(
521523
self._parser.set_maintenance_push_handler(
522524
self._maintenance_event_connection_handler.handle_event
523525
)
526+
else:
527+
self._maintenance_event_connection_handler.config = (
528+
maintenance_event_pool_handler.config
529+
)
524530

525531
def connect(self):
526532
"Connects to the Redis server if not already connected"
@@ -1983,20 +1989,8 @@ def update_connections_settings(
19831989
:param reset_relax_timeout: Whether to reset the relax timeout to the original timeout.
19841990
:param include_free_connections: Whether to include free/available connections.
19851991
"""
1986-
for conn in self._in_use_connections:
1987-
if self.should_update_connection(
1988-
conn, address_type_to_match, matching_address
1989-
):
1990-
self.update_connection_settings(
1991-
conn,
1992-
state=state,
1993-
relax_timeout=relax_timeout,
1994-
reset_host_address=reset_host_address,
1995-
reset_relax_timeout=reset_relax_timeout,
1996-
)
1997-
1998-
if include_free_connections:
1999-
for conn in self._available_connections:
1992+
with self._lock:
1993+
for conn in self._in_use_connections:
20001994
if self.should_update_connection(
20011995
conn, address_type_to_match, matching_address
20021996
):
@@ -2008,6 +2002,19 @@ def update_connections_settings(
20082002
reset_relax_timeout=reset_relax_timeout,
20092003
)
20102004

2005+
if include_free_connections:
2006+
for conn in self._available_connections:
2007+
if self.should_update_connection(
2008+
conn, address_type_to_match, matching_address
2009+
):
2010+
self.update_connection_settings(
2011+
conn,
2012+
state=state,
2013+
relax_timeout=relax_timeout,
2014+
reset_host_address=reset_host_address,
2015+
reset_relax_timeout=reset_relax_timeout,
2016+
)
2017+
20112018
def update_connection_kwargs(
20122019
self,
20132020
**kwargs,
@@ -2036,11 +2043,12 @@ def update_active_connections_for_reconnect(
20362043
:param tmp_relax_timeout: The relax timeout to use for the connection.
20372044
:param moving_address_src: The address of the node that is being moved.
20382045
"""
2039-
for conn in self._in_use_connections:
2040-
if self.should_update_connection(conn, "connected", moving_address_src):
2041-
self._update_connection_for_reconnect(
2042-
conn, tmp_host_address, tmp_relax_timeout
2043-
)
2046+
with self._lock:
2047+
for conn in self._in_use_connections:
2048+
if self.should_update_connection(conn, "connected", moving_address_src):
2049+
self._update_connection_for_reconnect(
2050+
conn, tmp_host_address, tmp_relax_timeout
2051+
)
20442052

20452053
def disconnect_and_reconfigure_free_connections(
20462054
self,
@@ -2058,12 +2066,12 @@ def disconnect_and_reconfigure_free_connections(
20582066
:param tmp_relax_timeout: The relax timeout to use for the connection.
20592067
:param moving_address_src: The address of the node that is being moved.
20602068
"""
2061-
2062-
for conn in self._available_connections:
2063-
if self.should_update_connection(conn, "connected", moving_address_src):
2064-
self._disconnect_and_update_connection_for_reconnect(
2065-
conn, tmp_host_address, tmp_relax_timeout
2066-
)
2069+
with self._lock:
2070+
for conn in self._available_connections:
2071+
if self.should_update_connection(conn, "connected", moving_address_src):
2072+
self._disconnect_and_update_connection_for_reconnect(
2073+
conn, tmp_host_address, tmp_relax_timeout
2074+
)
20672075

20682076
def _update_connection_for_reconnect(
20692077
self,
@@ -2340,22 +2348,9 @@ def update_connections_settings(
23402348
"""
23412349
Override base class method to work with BlockingConnectionPool's structure.
23422350
"""
2343-
if include_free_connections:
2344-
for conn in tuple(self._connections):
2345-
if self.should_update_connection(
2346-
conn, address_type_to_match, matching_address
2347-
):
2348-
self.update_connection_settings(
2349-
conn,
2350-
state=state,
2351-
relax_timeout=relax_timeout,
2352-
reset_host_address=reset_host_address,
2353-
reset_relax_timeout=reset_relax_timeout,
2354-
)
2355-
else:
2356-
connections_in_queue = {conn for conn in self.pool.queue if conn}
2357-
for conn in self._connections:
2358-
if conn not in connections_in_queue:
2351+
with self._lock:
2352+
if include_free_connections:
2353+
for conn in tuple(self._connections):
23592354
if self.should_update_connection(
23602355
conn, address_type_to_match, matching_address
23612356
):
@@ -2366,6 +2361,20 @@ def update_connections_settings(
23662361
reset_host_address=reset_host_address,
23672362
reset_relax_timeout=reset_relax_timeout,
23682363
)
2364+
else:
2365+
connections_in_queue = {conn for conn in self.pool.queue if conn}
2366+
for conn in self._connections:
2367+
if conn not in connections_in_queue:
2368+
if self.should_update_connection(
2369+
conn, address_type_to_match, matching_address
2370+
):
2371+
self.update_connection_settings(
2372+
conn,
2373+
state=state,
2374+
relax_timeout=relax_timeout,
2375+
reset_host_address=reset_host_address,
2376+
reset_relax_timeout=reset_relax_timeout,
2377+
)
23692378

23702379
def update_active_connections_for_reconnect(
23712380
self,
@@ -2409,15 +2418,16 @@ def disconnect_and_reconfigure_free_connections(
24092418
:param tmp_relax_timeout: The relax timeout to use for the connection.
24102419
:param moving_address_src: The address of the node that is being moved.
24112420
"""
2412-
existing_connections = self.pool.queue
2413-
2414-
for conn in existing_connections:
2415-
if conn:
2416-
if moving_address_src and conn.getpeername() != moving_address_src:
2417-
continue
2418-
self._disconnect_and_update_connection_for_reconnect(
2419-
conn, tmp_host_address, tmp_relax_timeout
2420-
)
2421+
with self._lock:
2422+
existing_connections = self.pool.queue
2423+
2424+
for conn in existing_connections:
2425+
if conn:
2426+
if moving_address_src and conn.getpeername() != moving_address_src:
2427+
continue
2428+
self._disconnect_and_update_connection_for_reconnect(
2429+
conn, tmp_host_address, tmp_relax_timeout
2430+
)
24212431

24222432
def _update_maintenance_events_config_for_connections(
24232433
self, maintenance_events_config

tests/test_maintenance_events_handling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ def test_maint_handler_init_for_existing_connections(self):
503503

504504
# Verify that maintenance events are initially disabled
505505
assert existing_conn._parser.node_moving_push_handler_func is None
506-
assert not hasattr(existing_conn, "_maintenance_event_connection_handler")
506+
assert existing_conn._maintenance_event_connection_handler is None
507507
assert existing_conn._parser.maintenance_push_handler_func is None
508508

509509
# Create a new enabled configuration and set up pool handler

0 commit comments

Comments
 (0)