Skip to content

Commit 822fccd

Browse files
committed
Refactor to have less methods in pool classes and made some of the existing ones more generic
1 parent 2cdfa75 commit 822fccd

File tree

2 files changed

+171
-253
lines changed

2 files changed

+171
-253
lines changed

redis/connection.py

Lines changed: 112 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -1920,122 +1920,88 @@ def should_update_connection(
19201920
return False
19211921
return True
19221922

1923-
def set_maintenance_state_for_connections(
1923+
def update_connection_settings(
19241924
self,
1925-
state: "MaintenanceState",
1926-
matching_address: Optional[str] = None,
1927-
address_type_to_match: Literal["connected", "configured"] = "connected",
1928-
):
1929-
for conn in self._available_connections:
1930-
if self.should_update_connection(
1931-
conn, address_type_to_match, matching_address
1932-
):
1933-
conn.maintenance_state = state
1934-
for conn in self._in_use_connections:
1935-
if self.should_update_connection(
1936-
conn, address_type_to_match, matching_address
1937-
):
1938-
conn.maintenance_state = state
1939-
1940-
def set_maintenance_state_in_connection_kwargs(self, state: "MaintenanceState"):
1941-
self.connection_kwargs["maintenance_state"] = state
1942-
1943-
def add_tmp_config_to_connection_kwargs(
1944-
self,
1945-
tmp_host_address: str,
1946-
tmp_relax_timeout: Optional[float] = None,
1925+
conn: "Connection",
1926+
state: Optional["MaintenanceState"] = None,
1927+
relax_timeout: Optional[float] = None,
1928+
reset_host_address: bool = False,
1929+
reset_relax_timeout: bool = False,
19471930
):
19481931
"""
1949-
Store original connection configuration and apply temporary settings.
1950-
1951-
This method saves the current host, socket_timeout, and socket_connect_timeout values
1952-
in temporary storage fields (orig_*), then applies the provided temporary values
1953-
as the active connection configuration.
1954-
1955-
This is used when a cluster node is rebound to a different address during
1956-
maintenance operations. New connections created after this call will use the
1957-
temporary configuration until remove_tmp_config_from_connection_kwargs() is called.
1958-
1959-
When this method is called the pool will already be locked, so getting the pool
1960-
lock inside is not needed.
1961-
1962-
:param tmp_host_address: The temporary host address to use for new connections.
1963-
This parameter is required and will replace the current host.
1964-
:param tmp_relax_timeout: The temporary timeout value to use for both socket_timeout
1965-
and socket_connect_timeout. If -1 is provided, the timeout
1966-
settings are not modified (relax timeout is disabled).
1932+
Update the settings for a single connection.
19671933
"""
1968-
# Apply temporary values as active configuration
1969-
self.connection_kwargs.update({"host": tmp_host_address})
1934+
if state:
1935+
conn.maintenance_state = state
19701936

1971-
if tmp_relax_timeout != -1:
1972-
self.connection_kwargs.update(
1973-
{
1974-
"socket_timeout": tmp_relax_timeout,
1975-
"socket_connect_timeout": tmp_relax_timeout,
1976-
}
1937+
if reset_relax_timeout or reset_host_address:
1938+
conn.reset_tmp_settings(
1939+
reset_host_address=reset_host_address,
1940+
reset_relax_timeout=reset_relax_timeout,
19771941
)
19781942

1979-
def remove_tmp_config_from_connection_kwargs(self):
1980-
"""
1981-
Remove temporary configuration from connection kwargs and restore original values.
1982-
1983-
This method restores the original host address, socket timeout, and connect timeout
1984-
from their temporary storage back to the main connection kwargs, then clears the
1985-
temporary storage fields.
1986-
1987-
This is typically called when a cluster node maintenance operation is complete
1988-
and the connection should revert to its original configuration.
1943+
conn.update_current_socket_timeout(relax_timeout)
19891944

1990-
When this method is called the pool will already be locked, so getting the pool
1991-
lock inside is not needed.
1992-
"""
1993-
orig_host = self.connection_kwargs.get("orig_host_address")
1994-
orig_socket_timeout = self.connection_kwargs.get("orig_socket_timeout")
1995-
orig_connect_timeout = self.connection_kwargs.get("orig_socket_connect_timeout")
1996-
1997-
self.connection_kwargs.update(
1998-
{
1999-
"host": orig_host,
2000-
"socket_timeout": orig_socket_timeout,
2001-
"socket_connect_timeout": orig_connect_timeout,
2002-
}
2003-
)
2004-
2005-
def reset_connections_tmp_settings(
1945+
def update_connections_settings(
20061946
self,
2007-
moving_address: Optional[str] = None,
1947+
state: Optional["MaintenanceState"] = None,
1948+
relax_timeout: Optional[float] = None,
1949+
matching_address: Optional[str] = None,
1950+
address_type_to_match: Literal["connected", "configured"] = "connected",
20081951
reset_host_address: bool = False,
20091952
reset_relax_timeout: bool = False,
1953+
include_free_connections: bool = True,
20101954
):
20111955
"""
2012-
Restore original settings from temporary configuration for all connections in the pool.
1956+
Update the settings for all matching connections in the pool.
20131957
2014-
This method restores each connection's original host, socket_timeout, and socket_connect_timeout
2015-
values from their orig_* attributes back to the active connection configuration, then clears
2016-
the temporary storage attributes.
1958+
This method does not create new connections.
1959+
This method does not affect the connection kwargs.
20171960
2018-
This is used to restore connections to their original configuration after maintenance operations
2019-
that required temporary address/timeout changes are complete.
2020-
2021-
When this method is called the pool will already be locked, so getting the pool lock inside is not needed.
1961+
:param state: The maintenance state to set for the connection.
1962+
:param relax_timeout: The relax timeout to set for the connection.
1963+
:param matching_address: The address to match for the connection.
1964+
:param address_type_to_match: The type of address to match.
1965+
:param reset_host_address: Whether to reset the host address to the original address.
1966+
:param reset_relax_timeout: Whether to reset the relax timeout to the original timeout.
20221967
"""
2023-
with self._lock:
2024-
for conn in self._available_connections:
2025-
if moving_address and conn.host != moving_address:
2026-
continue
2027-
conn.reset_tmp_settings(
2028-
reset_host_address=reset_host_address,
2029-
reset_relax_timeout=reset_relax_timeout,
2030-
)
2031-
for conn in self._in_use_connections:
2032-
if moving_address and conn.host != moving_address:
2033-
continue
2034-
conn.reset_tmp_settings(
1968+
for conn in self._in_use_connections:
1969+
if self.should_update_connection(
1970+
conn, address_type_to_match, matching_address
1971+
):
1972+
self.update_connection_settings(
1973+
conn,
1974+
state=state,
1975+
relax_timeout=relax_timeout,
20351976
reset_host_address=reset_host_address,
20361977
reset_relax_timeout=reset_relax_timeout,
20371978
)
20381979

1980+
if include_free_connections:
1981+
for conn in self._available_connections:
1982+
if self.should_update_connection(
1983+
conn, address_type_to_match, matching_address
1984+
):
1985+
self.update_connection_settings(
1986+
conn,
1987+
state=state,
1988+
relax_timeout=relax_timeout,
1989+
reset_host_address=reset_host_address,
1990+
reset_relax_timeout=reset_relax_timeout,
1991+
)
1992+
1993+
def update_connection_kwargs(
1994+
self,
1995+
**kwargs,
1996+
):
1997+
"""
1998+
Update the connection kwargs for all future connections.
1999+
2000+
This method updates the connection kwargs for all future connections created by the pool.
2001+
Existing connections are not affected.
2002+
"""
2003+
self.connection_kwargs.update(kwargs)
2004+
20392005
def update_active_connections_for_reconnect(
20402006
self,
20412007
tmp_host_address: str,
@@ -2052,11 +2018,12 @@ def update_active_connections_for_reconnect(
20522018
:param tmp_relax_timeout: The relax timeout to use for the connection.
20532019
"""
20542020
for conn in self._in_use_connections:
2055-
if moving_address_src and conn.getpeername() != moving_address_src:
2056-
continue
2057-
self._update_connection_for_reconnect(
2058-
conn, tmp_host_address, tmp_relax_timeout
2059-
)
2021+
if self.should_update_connection(
2022+
conn, "connected", moving_address_src
2023+
):
2024+
self._update_connection_for_reconnect(
2025+
conn, tmp_host_address, tmp_relax_timeout
2026+
)
20602027

20612028
def disconnect_and_reconfigure_free_connections(
20622029
self,
@@ -2075,41 +2042,12 @@ def disconnect_and_reconfigure_free_connections(
20752042
"""
20762043

20772044
for conn in self._available_connections:
2078-
if moving_address_src and conn.getpeername() != moving_address_src:
2079-
continue
2080-
self._disconnect_and_update_connection_for_reconnect(
2081-
conn, tmp_host_address, tmp_relax_timeout
2082-
)
2083-
2084-
def update_connections_current_timeout(
2085-
self,
2086-
relax_timeout: Optional[float],
2087-
matching_address: Optional[str] = None,
2088-
address_type_to_match: Literal["connected", "configured"] = "connected",
2089-
include_free_connections: bool = False,
2090-
):
2091-
"""
2092-
Update the timeout either for all connections in the pool or just for the ones in use.
2093-
This is used when a cluster node is migrated to a different address.
2094-
2095-
When this method is called the pool will already be locked, so getting the pool lock inside is not needed.
2096-
2097-
:param relax_timeout: The relax timeout to use for the connection.
2098-
If -1 is provided - the relax timeout is disabled.
2099-
:param include_available_connections: Whether to include available connections in the update.
2100-
"""
2101-
for conn in self._in_use_connections:
21022045
if self.should_update_connection(
2103-
conn, address_type_to_match, matching_address
2046+
conn, "connected", moving_address_src
21042047
):
2105-
conn.update_current_socket_timeout(relax_timeout)
2106-
2107-
if include_free_connections:
2108-
for conn in self._available_connections:
2109-
if self.should_update_connection(
2110-
conn, address_type_to_match, matching_address
2111-
):
2112-
conn.update_current_socket_timeout(relax_timeout)
2048+
self._disconnect_and_update_connection_for_reconnect(
2049+
conn, tmp_host_address, tmp_relax_timeout
2050+
)
21132051

21142052
def _update_connection_for_reconnect(
21152053
self,
@@ -2373,6 +2311,46 @@ def disconnect(self):
23732311
pass
23742312
self._locked = False
23752313

2314+
def update_connections_settings(
2315+
self,
2316+
state: Optional["MaintenanceState"] = None,
2317+
relax_timeout: Optional[float] = None,
2318+
matching_address: Optional[str] = None,
2319+
address_type_to_match: Literal["connected", "configured"] = "connected",
2320+
reset_host_address: bool = False,
2321+
reset_relax_timeout: bool = False,
2322+
include_free_connections: bool = True,
2323+
):
2324+
"""
2325+
Override base class method to work with BlockingConnectionPool's structure.
2326+
"""
2327+
if include_free_connections:
2328+
for conn in tuple(self._connections):
2329+
if self.should_update_connection(
2330+
conn, address_type_to_match, matching_address
2331+
):
2332+
self.update_connection_settings(
2333+
conn,
2334+
state=state,
2335+
relax_timeout=relax_timeout,
2336+
reset_host_address=reset_host_address,
2337+
reset_relax_timeout=reset_relax_timeout,
2338+
)
2339+
else:
2340+
connections_in_queue = {conn for conn in self.pool.queue if conn}
2341+
for conn in self._connections:
2342+
if conn not in connections_in_queue:
2343+
if self.should_update_connection(
2344+
conn, address_type_to_match, matching_address
2345+
):
2346+
self.update_connection_settings(
2347+
conn,
2348+
state=state,
2349+
relax_timeout=relax_timeout,
2350+
reset_host_address=reset_host_address,
2351+
reset_relax_timeout=reset_relax_timeout,
2352+
)
2353+
23762354
def update_active_connections_for_reconnect(
23772355
self,
23782356
tmp_host_address: str,
@@ -2423,37 +2401,6 @@ def disconnect_and_reconfigure_free_connections(
24232401
conn, tmp_host_address, tmp_relax_timeout
24242402
)
24252403

2426-
def update_connections_current_timeout(
2427-
self,
2428-
relax_timeout: Optional[float] = None,
2429-
matching_address: Optional[str] = None,
2430-
address_type_to_match: Literal["connected", "configured"] = "connected",
2431-
include_free_connections: bool = False,
2432-
):
2433-
"""
2434-
Update the timeout for the current socket.
2435-
This is used when a cluster node is migrated to a different address.
2436-
2437-
When this method is called the pool will already be locked, so getting the pool lock inside is not needed.
2438-
2439-
:param relax_timeout: The relax timeout to use for the connection.
2440-
:param include_free_connections: Whether to include available connections in the update.
2441-
"""
2442-
if include_free_connections:
2443-
for conn in tuple(self._connections):
2444-
if self.should_update_connection(
2445-
conn, address_type_to_match, matching_address
2446-
):
2447-
conn.update_current_socket_timeout(relax_timeout)
2448-
else:
2449-
connections_in_queue = {conn for conn in self.pool.queue if conn}
2450-
for conn in self._connections:
2451-
if conn not in connections_in_queue:
2452-
if self.should_update_connection(
2453-
conn, address_type_to_match, matching_address
2454-
):
2455-
conn.update_current_socket_timeout(relax_timeout)
2456-
24572404
def _update_maintenance_events_config_for_connections(
24582405
self, maintenance_events_config
24592406
):
@@ -2469,25 +2416,6 @@ def _update_maintenance_events_configs_for_connections(
24692416
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
24702417
conn.maintenance_events_config = maintenance_events_pool_handler.config
24712418

2472-
def reset_connections_tmp_settings(
2473-
self,
2474-
moving_address: Optional[str] = None,
2475-
reset_host_address: bool = False,
2476-
reset_relax_timeout: bool = False,
2477-
):
2478-
"""
2479-
Override base class method to work with BlockingConnectionPool's structure.
2480-
2481-
Restore original settings from temporary configuration for all connections in the pool.
2482-
"""
2483-
for conn in tuple(self._connections):
2484-
if moving_address and conn.host != moving_address:
2485-
continue
2486-
conn.reset_tmp_settings(
2487-
reset_host_address=reset_host_address,
2488-
reset_relax_timeout=reset_relax_timeout,
2489-
)
2490-
24912419
def set_in_maintenance(self, in_maintenance: bool):
24922420
"""
24932421
Sets a flag that this Blocking ConnectionPool is in maintenance mode.
@@ -2497,14 +2425,3 @@ def set_in_maintenance(self, in_maintenance: bool):
24972425
"""
24982426
self._in_maintenance = in_maintenance
24992427

2500-
def set_maintenance_state_for_connections(
2501-
self,
2502-
state: "MaintenanceState",
2503-
matching_address: Optional[str] = None,
2504-
address_type_to_match: Literal["connected", "configured"] = "connected",
2505-
):
2506-
for conn in self._connections:
2507-
if self.should_update_connection(
2508-
conn, address_type_to_match, matching_address
2509-
):
2510-
conn.maintenance_state = state

0 commit comments

Comments
 (0)