@@ -445,6 +445,8 @@ def __init__(
445
445
if orig_socket_connect_timeout
446
446
else self .socket_connect_timeout
447
447
)
448
+ else :
449
+ self ._maintenance_event_connection_handler = None
448
450
self ._should_reconnect = False
449
451
self .maintenance_state = maintenance_state
450
452
@@ -511,8 +513,8 @@ def set_maintenance_event_pool_handler(
511
513
maintenance_event_pool_handler .handle_event
512
514
)
513
515
514
- # Initialize maintenance event connection handler if it doesn't exist
515
- if not hasattr ( self , " _maintenance_event_connection_handler" ) :
516
+ # Update maintenance event connection handler if it doesn't exist
517
+ if not self . _maintenance_event_connection_handler :
516
518
self ._maintenance_event_connection_handler = (
517
519
MaintenanceEventConnectionHandler (
518
520
self , maintenance_event_pool_handler .config
@@ -521,6 +523,10 @@ def set_maintenance_event_pool_handler(
521
523
self ._parser .set_maintenance_push_handler (
522
524
self ._maintenance_event_connection_handler .handle_event
523
525
)
526
+ else :
527
+ self ._maintenance_event_connection_handler .config = (
528
+ maintenance_event_pool_handler .config
529
+ )
524
530
525
531
def connect (self ):
526
532
"Connects to the Redis server if not already connected"
@@ -1983,20 +1989,8 @@ def update_connections_settings(
1983
1989
:param reset_relax_timeout: Whether to reset the relax timeout to the original timeout.
1984
1990
:param include_free_connections: Whether to include free/available connections.
1985
1991
"""
1986
- for conn in self ._in_use_connections :
1987
- if self .should_update_connection (
1988
- conn , address_type_to_match , matching_address
1989
- ):
1990
- self .update_connection_settings (
1991
- conn ,
1992
- state = state ,
1993
- relax_timeout = relax_timeout ,
1994
- reset_host_address = reset_host_address ,
1995
- reset_relax_timeout = reset_relax_timeout ,
1996
- )
1997
-
1998
- if include_free_connections :
1999
- for conn in self ._available_connections :
1992
+ with self ._lock :
1993
+ for conn in self ._in_use_connections :
2000
1994
if self .should_update_connection (
2001
1995
conn , address_type_to_match , matching_address
2002
1996
):
@@ -2008,6 +2002,19 @@ def update_connections_settings(
2008
2002
reset_relax_timeout = reset_relax_timeout ,
2009
2003
)
2010
2004
2005
+ if include_free_connections :
2006
+ for conn in self ._available_connections :
2007
+ if self .should_update_connection (
2008
+ conn , address_type_to_match , matching_address
2009
+ ):
2010
+ self .update_connection_settings (
2011
+ conn ,
2012
+ state = state ,
2013
+ relax_timeout = relax_timeout ,
2014
+ reset_host_address = reset_host_address ,
2015
+ reset_relax_timeout = reset_relax_timeout ,
2016
+ )
2017
+
2011
2018
def update_connection_kwargs (
2012
2019
self ,
2013
2020
** kwargs ,
@@ -2036,11 +2043,12 @@ def update_active_connections_for_reconnect(
2036
2043
:param tmp_relax_timeout: The relax timeout to use for the connection.
2037
2044
:param moving_address_src: The address of the node that is being moved.
2038
2045
"""
2039
- for conn in self ._in_use_connections :
2040
- if self .should_update_connection (conn , "connected" , moving_address_src ):
2041
- self ._update_connection_for_reconnect (
2042
- conn , tmp_host_address , tmp_relax_timeout
2043
- )
2046
+ with self ._lock :
2047
+ for conn in self ._in_use_connections :
2048
+ if self .should_update_connection (conn , "connected" , moving_address_src ):
2049
+ self ._update_connection_for_reconnect (
2050
+ conn , tmp_host_address , tmp_relax_timeout
2051
+ )
2044
2052
2045
2053
def disconnect_and_reconfigure_free_connections (
2046
2054
self ,
@@ -2058,12 +2066,12 @@ def disconnect_and_reconfigure_free_connections(
2058
2066
:param tmp_relax_timeout: The relax timeout to use for the connection.
2059
2067
:param moving_address_src: The address of the node that is being moved.
2060
2068
"""
2061
-
2062
- for conn in self ._available_connections :
2063
- if self .should_update_connection (conn , "connected" , moving_address_src ):
2064
- self ._disconnect_and_update_connection_for_reconnect (
2065
- conn , tmp_host_address , tmp_relax_timeout
2066
- )
2069
+ with self . _lock :
2070
+ for conn in self ._available_connections :
2071
+ if self .should_update_connection (conn , "connected" , moving_address_src ):
2072
+ self ._disconnect_and_update_connection_for_reconnect (
2073
+ conn , tmp_host_address , tmp_relax_timeout
2074
+ )
2067
2075
2068
2076
def _update_connection_for_reconnect (
2069
2077
self ,
@@ -2340,22 +2348,9 @@ def update_connections_settings(
2340
2348
"""
2341
2349
Override base class method to work with BlockingConnectionPool's structure.
2342
2350
"""
2343
- if include_free_connections :
2344
- for conn in tuple (self ._connections ):
2345
- if self .should_update_connection (
2346
- conn , address_type_to_match , matching_address
2347
- ):
2348
- self .update_connection_settings (
2349
- conn ,
2350
- state = state ,
2351
- relax_timeout = relax_timeout ,
2352
- reset_host_address = reset_host_address ,
2353
- reset_relax_timeout = reset_relax_timeout ,
2354
- )
2355
- else :
2356
- connections_in_queue = {conn for conn in self .pool .queue if conn }
2357
- for conn in self ._connections :
2358
- if conn not in connections_in_queue :
2351
+ with self ._lock :
2352
+ if include_free_connections :
2353
+ for conn in tuple (self ._connections ):
2359
2354
if self .should_update_connection (
2360
2355
conn , address_type_to_match , matching_address
2361
2356
):
@@ -2366,6 +2361,20 @@ def update_connections_settings(
2366
2361
reset_host_address = reset_host_address ,
2367
2362
reset_relax_timeout = reset_relax_timeout ,
2368
2363
)
2364
+ else :
2365
+ connections_in_queue = {conn for conn in self .pool .queue if conn }
2366
+ for conn in self ._connections :
2367
+ if conn not in connections_in_queue :
2368
+ if self .should_update_connection (
2369
+ conn , address_type_to_match , matching_address
2370
+ ):
2371
+ self .update_connection_settings (
2372
+ conn ,
2373
+ state = state ,
2374
+ relax_timeout = relax_timeout ,
2375
+ reset_host_address = reset_host_address ,
2376
+ reset_relax_timeout = reset_relax_timeout ,
2377
+ )
2369
2378
2370
2379
def update_active_connections_for_reconnect (
2371
2380
self ,
@@ -2409,15 +2418,16 @@ def disconnect_and_reconfigure_free_connections(
2409
2418
:param tmp_relax_timeout: The relax timeout to use for the connection.
2410
2419
:param moving_address_src: The address of the node that is being moved.
2411
2420
"""
2412
- existing_connections = self .pool .queue
2413
-
2414
- for conn in existing_connections :
2415
- if conn :
2416
- if moving_address_src and conn .getpeername () != moving_address_src :
2417
- continue
2418
- self ._disconnect_and_update_connection_for_reconnect (
2419
- conn , tmp_host_address , tmp_relax_timeout
2420
- )
2421
+ with self ._lock :
2422
+ existing_connections = self .pool .queue
2423
+
2424
+ for conn in existing_connections :
2425
+ if conn :
2426
+ if moving_address_src and conn .getpeername () != moving_address_src :
2427
+ continue
2428
+ self ._disconnect_and_update_connection_for_reconnect (
2429
+ conn , tmp_host_address , tmp_relax_timeout
2430
+ )
2421
2431
2422
2432
def _update_maintenance_events_config_for_connections (
2423
2433
self , maintenance_events_config
0 commit comments