Skip to content

Commit 91ac8cf

Browse files
committed
Fixed BlockingConnectionPool locking strategy. Removed debug logging. Refactored the maintenance events tests not to be multithreaded - we don't need it for those tests.
1 parent 06a7ea7 commit 91ac8cf

File tree

6 files changed

+329
-262
lines changed

6 files changed

+329
-262
lines changed

redis/asyncio/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,6 +1302,8 @@ def __init__(
13021302
)
13031303
self._condition = asyncio.Condition()
13041304
self.timeout = timeout
1305+
self._in_maintenance = False
1306+
self._locked = False
13051307

13061308
@deprecated_args(
13071309
args_to_warn=["*"],

redis/client.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -668,9 +668,6 @@ def _execute_command(self, *args, **options):
668668

669669
finally:
670670
if conn and conn.should_reconnect():
671-
logging.debug(
672-
f"***** Redis reconnect before exit _execute_command --> notification for {conn._sock.getpeername()}"
673-
)
674671
self._close_connection(conn)
675672
conn.connect()
676673
if self._single_connection_client:
@@ -963,9 +960,6 @@ def _execute(self, conn, command, *args, **kwargs):
963960
lambda _: self._reconnect(conn),
964961
)
965962
if conn.should_reconnect():
966-
logging.debug(
967-
f"***** PubSub --> Reconnect on notification for {conn._sock.getpeername()}"
968-
)
969963
self._reconnect(conn)
970964

971965
return response

redis/connection.py

Lines changed: 78 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,20 @@ def set_parser(self, parser_class):
430430
def set_maintenance_event_pool_handler(
431431
self, maintenance_event_pool_handler: MaintenanceEventPoolHandler
432432
):
433-
self._parser.set_node_moving_push_handler(maintenance_event_pool_handler)
433+
self._parser.set_node_moving_push_handler(
434+
maintenance_event_pool_handler.handle_event
435+
)
436+
437+
# Initialize maintenance event connection handler if it doesn't exist
438+
if not hasattr(self, "_maintenance_event_connection_handler"):
439+
self._maintenance_event_connection_handler = (
440+
MaintenanceEventConnectionHandler(
441+
self, maintenance_event_pool_handler.config
442+
)
443+
)
444+
self._parser.set_maintenance_push_handler(
445+
self._maintenance_event_connection_handler.handle_event
446+
)
434447

435448
def connect(self):
436449
"Connects to the Redis server if not already connected"
@@ -796,10 +809,6 @@ def should_reconnect(self):
796809
def update_current_socket_timeout(self, relax_timeout: Optional[float] = None):
797810
if self._sock:
798811
timeout = relax_timeout if relax_timeout != -1 else self.socket_timeout
799-
logging.debug(
800-
f"***** Connection --> Updating timeout for {self._sock.getpeername()}"
801-
f" to timeout {timeout}; relax_timeout: {relax_timeout}"
802-
)
803812
self._sock.settimeout(timeout)
804813
self.update_parser_buffer_timeout(timeout)
805814

@@ -852,10 +861,6 @@ def _connect(self):
852861
# ipv4/ipv6, but we want to set options prior to calling
853862
# socket.connect()
854863
err = None
855-
if self.tmp_host_address is not None:
856-
logging.debug(
857-
f"***** Connection --> Using tmp_host_address: {self.tmp_host_address}"
858-
)
859864
host = self.tmp_host_address or self.host
860865

861866
for res in socket.getaddrinfo(
@@ -876,31 +881,18 @@ def _connect(self):
876881

877882
# set the socket_connect_timeout before we connect
878883
if self.tmp_relax_timeout != -1:
879-
logging.debug(
880-
f"***** Connection connect --> Using relax_timeout: {self.tmp_relax_timeout}"
881-
)
882884
sock.settimeout(self.tmp_relax_timeout)
883885
else:
884-
logging.debug(
885-
f"***** Connection connect --> Using default socket_connect_timeout: {self.socket_connect_timeout}"
886-
)
887886
sock.settimeout(self.socket_connect_timeout)
888887

889888
# connect
890889
sock.connect(socket_address)
891890

892891
# set the socket_timeout now that we're connected
893892
if self.tmp_relax_timeout != -1:
894-
logging.debug(
895-
f"***** Connection --> Using relax_timeout: {self.tmp_relax_timeout}"
896-
)
897893
sock.settimeout(self.tmp_relax_timeout)
898894
else:
899-
logging.debug(
900-
f"***** Connection --> Using default socket_timeout: {self.socket_timeout}"
901-
)
902895
sock.settimeout(self.socket_timeout)
903-
logging.debug(f"Connected to {sock.getpeername()}")
904896
return sock
905897

906898
except OSError as _:
@@ -1599,14 +1591,10 @@ def _update_maintenance_events_configs_for_connections(
15991591
):
16001592
with self._lock:
16011593
for conn in self._available_connections:
1602-
conn.set_maintenance_events_pool_handler(
1603-
maintenance_events_pool_handler
1604-
)
1594+
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
16051595
conn.maintenance_events_config = maintenance_events_pool_handler.config
16061596
for conn in self._in_use_connections:
1607-
conn.set_maintenance_events_pool_handler(
1608-
maintenance_events_pool_handler
1609-
)
1597+
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
16101598
conn.maintenance_events_config = maintenance_events_pool_handler.config
16111599

16121600
def reset(self) -> None:
@@ -1748,9 +1736,6 @@ def release(self, connection: "Connection") -> None:
17481736

17491737
if self.owns_connection(connection):
17501738
if connection.should_reconnect():
1751-
logging.debug(
1752-
f"***** Pool--> disconnecting in release {connection._sock.getpeername()}"
1753-
)
17541739
connection.disconnect()
17551740
self._available_connections.append(connection)
17561741
self._event_dispatcher.dispatch(
@@ -1910,21 +1895,13 @@ def update_connections_current_timeout(
19101895
If -1 is provided - the relax timeout is disabled.
19111896
:param include_available_connections: Whether to include available connections in the update.
19121897
"""
1913-
logging.debug(f"***** Pool --> Updating timeouts. New value: {relax_timeout}")
1914-
start_time = time.time()
1915-
19161898
for conn in self._in_use_connections:
19171899
self._update_connection_timeout(conn, relax_timeout)
19181900

19191901
if include_free_connections:
19201902
for conn in self._available_connections:
19211903
self._update_connection_timeout(conn, relax_timeout)
19221904

1923-
execution_time_us = (time.time() - start_time) * 1000000
1924-
logging.error(
1925-
f"###### TIMEOUTS execution time: {execution_time_us:.0f} microseconds"
1926-
)
1927-
19281905
def _update_connection_for_reconnect(
19291906
self,
19301907
connection: "Connection",
@@ -2014,6 +1991,8 @@ def __init__(
20141991
):
20151992
self.queue_class = queue_class
20161993
self.timeout = timeout
1994+
self._in_maintenance = False
1995+
self._locked = False
20171996
super().__init__(
20181997
connection_class=connection_class,
20191998
max_connections=max_connections,
@@ -2022,7 +2001,10 @@ def __init__(
20222001

20232002
def reset(self):
20242003
# Create and fill up a thread safe queue with ``None`` values.
2025-
with self._lock:
2004+
try:
2005+
if self._in_maintenance:
2006+
self._lock.acquire()
2007+
self._locked = True
20262008
self.pool = self.queue_class(self.max_connections)
20272009
while True:
20282010
try:
@@ -2033,6 +2015,13 @@ def reset(self):
20332015
# Keep a list of actual connection instances so that we can
20342016
# disconnect them later.
20352017
self._connections = []
2018+
finally:
2019+
if self._locked:
2020+
try:
2021+
self._lock.release()
2022+
except Exception:
2023+
pass
2024+
self._locked = False
20362025

20372026
# this must be the last operation in this method. while reset() is
20382027
# called when holding _fork_lock, other threads in this process
@@ -2047,7 +2036,10 @@ def reset(self):
20472036

20482037
def make_connection(self):
20492038
"Make a fresh connection."
2050-
with self._lock:
2039+
try:
2040+
if self._in_maintenance:
2041+
self._lock.acquire()
2042+
self._locked = True
20512043
if self.cache is not None:
20522044
connection = CacheProxyConnection(
20532045
self.connection_class(**self.connection_kwargs),
@@ -2059,6 +2051,13 @@ def make_connection(self):
20592051

20602052
self._connections.append(connection)
20612053
return connection
2054+
finally:
2055+
if self._locked:
2056+
try:
2057+
self._lock.release()
2058+
except Exception:
2059+
pass
2060+
self._locked = False
20622061

20632062
@deprecated_args(
20642063
args_to_warn=["*"],
@@ -2083,7 +2082,10 @@ def get_connection(self, command_name=None, *keys, **options):
20832082
# Try and get a connection from the pool. If one isn't available within
20842083
# self.timeout then raise a ``ConnectionError``.
20852084
connection = None
2086-
with self._lock:
2085+
try:
2086+
if self._in_maintenance:
2087+
self._lock.acquire()
2088+
self._locked = True
20872089
try:
20882090
connection = self.pool.get(block=True, timeout=self.timeout)
20892091
except Empty:
@@ -2095,6 +2097,13 @@ def get_connection(self, command_name=None, *keys, **options):
20952097
# a new connection to add to the pool.
20962098
if connection is None:
20972099
connection = self.make_connection()
2100+
finally:
2101+
if self._locked:
2102+
try:
2103+
self._lock.release()
2104+
except Exception:
2105+
pass
2106+
self._locked = False
20982107

20992108
try:
21002109
# ensure this connection is connected to Redis
@@ -2123,7 +2132,10 @@ def release(self, connection):
21232132
# Make sure we haven't changed process.
21242133
self._checkpid()
21252134

2126-
with self._lock:
2135+
try:
2136+
if self._in_maintenance:
2137+
self._lock.acquire()
2138+
self._locked = True
21272139
if not self.owns_connection(connection):
21282140
# pool doesn't own this connection. do not add it back
21292141
# to the pool. instead add a None value which is a placeholder
@@ -2133,24 +2145,39 @@ def release(self, connection):
21332145
self.pool.put_nowait(None)
21342146
return
21352147
if connection.should_reconnect():
2136-
logging.debug(
2137-
f"***** Blocking Pool--> disconnecting in release {connection._sock.getpeername()}"
2138-
)
21392148
connection.disconnect()
21402149
# Put the connection back into the pool.
21412150
try:
2151+
print("Releasing connection - in the pool")
21422152
self.pool.put_nowait(connection)
21432153
except Full:
21442154
# perhaps the pool has been reset() after a fork? regardless,
21452155
# we don't want this connection
21462156
pass
2157+
finally:
2158+
if self._locked:
2159+
try:
2160+
self._lock.release()
2161+
except Exception:
2162+
pass
2163+
self._locked = False
21472164

21482165
def disconnect(self):
21492166
"Disconnects all connections in the pool."
21502167
self._checkpid()
2151-
with self._lock:
2168+
try:
2169+
if self._in_maintenance:
2170+
self._lock.acquire()
2171+
self._locked = True
21522172
for connection in self._connections:
21532173
connection.disconnect()
2174+
finally:
2175+
if self._locked:
2176+
try:
2177+
self._lock.release()
2178+
except Exception:
2179+
pass
2180+
self._locked = False
21542181

21552182
def update_active_connections_for_reconnect(
21562183
self,
@@ -2229,3 +2256,7 @@ def _update_maintenance_events_configs_for_connections(
22292256
conn.maintenance_events_config = (
22302257
maintenance_events_pool_handler.config
22312258
)
2259+
2260+
def set_in_maintenance(self, in_maintenance: bool):
2261+
"""Set the maintenance mode for the connection pool."""
2262+
self._in_maintenance = in_maintenance

0 commit comments

Comments
 (0)