Skip to content

Commit bed2e40

Browse files
committed
Fixed BlockingConnectionPool locking strategy. Removed debug logging. Refactored the maintenance events tests not to be multithreaded - we don't need it for those tests.
1 parent 7b57a22 commit bed2e40

File tree

6 files changed

+329
-262
lines changed

6 files changed

+329
-262
lines changed

redis/asyncio/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,8 @@ def __init__(
13081308
)
13091309
self._condition = asyncio.Condition()
13101310
self.timeout = timeout
1311+
self._in_maintenance = False
1312+
self._locked = False
13111313

13121314
@deprecated_args(
13131315
args_to_warn=["*"],

redis/client.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -668,9 +668,6 @@ def _execute_command(self, *args, **options):
668668

669669
finally:
670670
if conn and conn.should_reconnect():
671-
logging.debug(
672-
f"***** Redis reconnect before exit _execute_command --> notification for {conn._sock.getpeername()}"
673-
)
674671
self._close_connection(conn)
675672
conn.connect()
676673
if self._single_connection_client:
@@ -963,9 +960,6 @@ def _execute(self, conn, command, *args, **kwargs):
963960
lambda _: self._reconnect(conn),
964961
)
965962
if conn.should_reconnect():
966-
logging.debug(
967-
f"***** PubSub --> Reconnect on notification for {conn._sock.getpeername()}"
968-
)
969963
self._reconnect(conn)
970964

971965
return response

redis/connection.py

Lines changed: 78 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,20 @@ def set_parser(self, parser_class):
431431
def set_maintenance_event_pool_handler(
432432
self, maintenance_event_pool_handler: MaintenanceEventPoolHandler
433433
):
434-
self._parser.set_node_moving_push_handler(maintenance_event_pool_handler)
434+
self._parser.set_node_moving_push_handler(
435+
maintenance_event_pool_handler.handle_event
436+
)
437+
438+
# Initialize maintenance event connection handler if it doesn't exist
439+
if not hasattr(self, "_maintenance_event_connection_handler"):
440+
self._maintenance_event_connection_handler = (
441+
MaintenanceEventConnectionHandler(
442+
self, maintenance_event_pool_handler.config
443+
)
444+
)
445+
self._parser.set_maintenance_push_handler(
446+
self._maintenance_event_connection_handler.handle_event
447+
)
435448

436449
def connect(self):
437450
"Connects to the Redis server if not already connected"
@@ -802,10 +815,6 @@ def should_reconnect(self):
802815
def update_current_socket_timeout(self, relax_timeout: Optional[float] = None):
803816
if self._sock:
804817
timeout = relax_timeout if relax_timeout != -1 else self.socket_timeout
805-
logging.debug(
806-
f"***** Connection --> Updating timeout for {self._sock.getpeername()}"
807-
f" to timeout {timeout}; relax_timeout: {relax_timeout}"
808-
)
809818
self._sock.settimeout(timeout)
810819
self.update_parser_buffer_timeout(timeout)
811820

@@ -858,10 +867,6 @@ def _connect(self):
858867
# ipv4/ipv6, but we want to set options prior to calling
859868
# socket.connect()
860869
err = None
861-
if self.tmp_host_address is not None:
862-
logging.debug(
863-
f"***** Connection --> Using tmp_host_address: {self.tmp_host_address}"
864-
)
865870
host = self.tmp_host_address or self.host
866871

867872
for res in socket.getaddrinfo(
@@ -882,31 +887,18 @@ def _connect(self):
882887

883888
# set the socket_connect_timeout before we connect
884889
if self.tmp_relax_timeout != -1:
885-
logging.debug(
886-
f"***** Connection connect --> Using relax_timeout: {self.tmp_relax_timeout}"
887-
)
888890
sock.settimeout(self.tmp_relax_timeout)
889891
else:
890-
logging.debug(
891-
f"***** Connection connect --> Using default socket_connect_timeout: {self.socket_connect_timeout}"
892-
)
893892
sock.settimeout(self.socket_connect_timeout)
894893

895894
# connect
896895
sock.connect(socket_address)
897896

898897
# set the socket_timeout now that we're connected
899898
if self.tmp_relax_timeout != -1:
900-
logging.debug(
901-
f"***** Connection --> Using relax_timeout: {self.tmp_relax_timeout}"
902-
)
903899
sock.settimeout(self.tmp_relax_timeout)
904900
else:
905-
logging.debug(
906-
f"***** Connection --> Using default socket_timeout: {self.socket_timeout}"
907-
)
908901
sock.settimeout(self.socket_timeout)
909-
logging.debug(f"Connected to {sock.getpeername()}")
910902
return sock
911903

912904
except OSError as _:
@@ -1606,14 +1598,10 @@ def _update_maintenance_events_configs_for_connections(
16061598
):
16071599
with self._lock:
16081600
for conn in self._available_connections:
1609-
conn.set_maintenance_events_pool_handler(
1610-
maintenance_events_pool_handler
1611-
)
1601+
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
16121602
conn.maintenance_events_config = maintenance_events_pool_handler.config
16131603
for conn in self._in_use_connections:
1614-
conn.set_maintenance_events_pool_handler(
1615-
maintenance_events_pool_handler
1616-
)
1604+
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
16171605
conn.maintenance_events_config = maintenance_events_pool_handler.config
16181606

16191607
def reset(self) -> None:
@@ -1755,9 +1743,6 @@ def release(self, connection: "Connection") -> None:
17551743

17561744
if self.owns_connection(connection):
17571745
if connection.should_reconnect():
1758-
logging.debug(
1759-
f"***** Pool--> disconnecting in release {connection._sock.getpeername()}"
1760-
)
17611746
connection.disconnect()
17621747
self._available_connections.append(connection)
17631748
self._event_dispatcher.dispatch(
@@ -1917,21 +1902,13 @@ def update_connections_current_timeout(
19171902
If -1 is provided - the relax timeout is disabled.
19181903
:param include_available_connections: Whether to include available connections in the update.
19191904
"""
1920-
logging.debug(f"***** Pool --> Updating timeouts. New value: {relax_timeout}")
1921-
start_time = time.time()
1922-
19231905
for conn in self._in_use_connections:
19241906
self._update_connection_timeout(conn, relax_timeout)
19251907

19261908
if include_free_connections:
19271909
for conn in self._available_connections:
19281910
self._update_connection_timeout(conn, relax_timeout)
19291911

1930-
execution_time_us = (time.time() - start_time) * 1000000
1931-
logging.error(
1932-
f"###### TIMEOUTS execution time: {execution_time_us:.0f} microseconds"
1933-
)
1934-
19351912
def _update_connection_for_reconnect(
19361913
self,
19371914
connection: "Connection",
@@ -2021,6 +1998,8 @@ def __init__(
20211998
):
20221999
self.queue_class = queue_class
20232000
self.timeout = timeout
2001+
self._in_maintenance = False
2002+
self._locked = False
20242003
super().__init__(
20252004
connection_class=connection_class,
20262005
max_connections=max_connections,
@@ -2029,7 +2008,10 @@ def __init__(
20292008

20302009
def reset(self):
20312010
# Create and fill up a thread safe queue with ``None`` values.
2032-
with self._lock:
2011+
try:
2012+
if self._in_maintenance:
2013+
self._lock.acquire()
2014+
self._locked = True
20332015
self.pool = self.queue_class(self.max_connections)
20342016
while True:
20352017
try:
@@ -2040,6 +2022,13 @@ def reset(self):
20402022
# Keep a list of actual connection instances so that we can
20412023
# disconnect them later.
20422024
self._connections = []
2025+
finally:
2026+
if self._locked:
2027+
try:
2028+
self._lock.release()
2029+
except Exception:
2030+
pass
2031+
self._locked = False
20432032

20442033
# this must be the last operation in this method. while reset() is
20452034
# called when holding _fork_lock, other threads in this process
@@ -2054,7 +2043,10 @@ def reset(self):
20542043

20552044
def make_connection(self):
20562045
"Make a fresh connection."
2057-
with self._lock:
2046+
try:
2047+
if self._in_maintenance:
2048+
self._lock.acquire()
2049+
self._locked = True
20582050
if self.cache is not None:
20592051
connection = CacheProxyConnection(
20602052
self.connection_class(**self.connection_kwargs),
@@ -2066,6 +2058,13 @@ def make_connection(self):
20662058

20672059
self._connections.append(connection)
20682060
return connection
2061+
finally:
2062+
if self._locked:
2063+
try:
2064+
self._lock.release()
2065+
except Exception:
2066+
pass
2067+
self._locked = False
20692068

20702069
@deprecated_args(
20712070
args_to_warn=["*"],
@@ -2090,7 +2089,10 @@ def get_connection(self, command_name=None, *keys, **options):
20902089
# Try and get a connection from the pool. If one isn't available within
20912090
# self.timeout then raise a ``ConnectionError``.
20922091
connection = None
2093-
with self._lock:
2092+
try:
2093+
if self._in_maintenance:
2094+
self._lock.acquire()
2095+
self._locked = True
20942096
try:
20952097
connection = self.pool.get(block=True, timeout=self.timeout)
20962098
except Empty:
@@ -2102,6 +2104,13 @@ def get_connection(self, command_name=None, *keys, **options):
21022104
# a new connection to add to the pool.
21032105
if connection is None:
21042106
connection = self.make_connection()
2107+
finally:
2108+
if self._locked:
2109+
try:
2110+
self._lock.release()
2111+
except Exception:
2112+
pass
2113+
self._locked = False
21052114

21062115
try:
21072116
# ensure this connection is connected to Redis
@@ -2130,7 +2139,10 @@ def release(self, connection):
21302139
# Make sure we haven't changed process.
21312140
self._checkpid()
21322141

2133-
with self._lock:
2142+
try:
2143+
if self._in_maintenance:
2144+
self._lock.acquire()
2145+
self._locked = True
21342146
if not self.owns_connection(connection):
21352147
# pool doesn't own this connection. do not add it back
21362148
# to the pool. instead add a None value which is a placeholder
@@ -2140,24 +2152,39 @@ def release(self, connection):
21402152
self.pool.put_nowait(None)
21412153
return
21422154
if connection.should_reconnect():
2143-
logging.debug(
2144-
f"***** Blocking Pool--> disconnecting in release {connection._sock.getpeername()}"
2145-
)
21462155
connection.disconnect()
21472156
# Put the connection back into the pool.
21482157
try:
2158+
print("Releasing connection - in the pool")
21492159
self.pool.put_nowait(connection)
21502160
except Full:
21512161
# perhaps the pool has been reset() after a fork? regardless,
21522162
# we don't want this connection
21532163
pass
2164+
finally:
2165+
if self._locked:
2166+
try:
2167+
self._lock.release()
2168+
except Exception:
2169+
pass
2170+
self._locked = False
21542171

21552172
def disconnect(self):
21562173
"Disconnects all connections in the pool."
21572174
self._checkpid()
2158-
with self._lock:
2175+
try:
2176+
if self._in_maintenance:
2177+
self._lock.acquire()
2178+
self._locked = True
21592179
for connection in self._connections:
21602180
connection.disconnect()
2181+
finally:
2182+
if self._locked:
2183+
try:
2184+
self._lock.release()
2185+
except Exception:
2186+
pass
2187+
self._locked = False
21612188

21622189
def update_active_connections_for_reconnect(
21632190
self,
@@ -2236,3 +2263,7 @@ def _update_maintenance_events_configs_for_connections(
22362263
conn.maintenance_events_config = (
22372264
maintenance_events_pool_handler.config
22382265
)
2266+
2267+
def set_in_maintenance(self, in_maintenance: bool):
2268+
"""Set the maintenance mode for the connection pool."""
2269+
self._in_maintenance = in_maintenance

0 commit comments

Comments
 (0)