Skip to content

Commit 06a7ea7

Browse files
committed
Adding moving integration-like tests
1 parent 8765b9c commit 06a7ea7

File tree

4 files changed

+557
-121
lines changed

4 files changed

+557
-121
lines changed

redis/_parsers/base.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,10 @@ def __del__(self):
129129
def on_connect(self, connection):
130130
"Called when the socket connects"
131131
self._sock = connection._sock
132-
self._buffer = SocketBuffer(
133-
self._sock, self.socket_read_size, connection.socket_timeout
134-
)
132+
timeout = connection.socket_timeout
133+
if connection.tmp_relax_timeout != -1:
134+
timeout = connection.tmp_relax_timeout
135+
self._buffer = SocketBuffer(self._sock, self.socket_read_size, timeout)
135136
self.encoder = connection.encoder
136137

137138
def on_disconnect(self):
@@ -203,7 +204,7 @@ def handle_push_response(self, response, **kwargs):
203204
return self.invalidation_push_handler_func(response)
204205
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
205206
if msg_type in _MOVING_MESSAGE:
206-
host, port = response[2].split(":")
207+
host, port = response[2].decode().split(":")
207208
ttl = response[1]
208209
id = 1 # Hardcoded value for sync parser
209210
notification = NodeMovingEvent(id, host, port, ttl)

redis/connection.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,10 @@ def update_current_socket_timeout(self, relax_timeout: Optional[float] = None):
801801
f" to timeout {timeout}; relax_timeout: {relax_timeout}"
802802
)
803803
self._sock.settimeout(timeout)
804+
self.update_parser_buffer_timeout(timeout)
805+
806+
def update_parser_buffer_timeout(self, timeout: Optional[float] = None):
807+
if self._parser and self._parser._buffer:
804808
self._parser._buffer.socket_timeout = timeout
805809

806810
def update_tmp_settings(
@@ -1894,7 +1898,7 @@ def disconnect_and_reconfigure_free_connections(
18941898
def update_connections_current_timeout(
18951899
self,
18961900
relax_timeout: Optional[float],
1897-
include_available_connections: bool = False,
1901+
include_free_connections: bool = False,
18981902
):
18991903
"""
19001904
Update the timeout either for all connections in the pool or just for the ones in use.
@@ -1912,7 +1916,7 @@ def update_connections_current_timeout(
19121916
for conn in self._in_use_connections:
19131917
self._update_connection_timeout(conn, relax_timeout)
19141918

1915-
if include_available_connections:
1919+
if include_free_connections:
19161920
for conn in self._available_connections:
19171921
self._update_connection_timeout(conn, relax_timeout)
19181922

@@ -2157,8 +2161,6 @@ def update_active_connections_for_reconnect(
21572161
connections_in_queue = {conn for conn in self.pool.queue if conn}
21582162
for conn in self._connections:
21592163
if conn not in connections_in_queue:
2160-
if tmp_relax_timeout != -1:
2161-
conn.update_socket_timeout(tmp_relax_timeout)
21622164
self._update_connection_for_reconnect(
21632165
conn, tmp_host_address, tmp_relax_timeout
21642166
)
@@ -2177,14 +2179,24 @@ def disconnect_and_reconfigure_free_connections(
21772179
conn, tmp_host_address, tmp_relax_timeout
21782180
)
21792181

2180-
def update_connections_current_timeout(self, relax_timeout: Optional[float] = None):
2182+
def update_connections_current_timeout(
2183+
self,
2184+
relax_timeout: Optional[float] = None,
2185+
include_free_connections: bool = False,
2186+
):
21812187
logging.debug(
21822188
f"***** Blocking Pool --> Updating timeouts. relax_timeout: {relax_timeout}"
21832189
)
21842190

21852191
with self._lock:
2186-
for conn in tuple(self._connections):
2187-
self._update_connection_timeout(conn, relax_timeout)
2192+
if include_free_connections:
2193+
for conn in tuple(self._connections):
2194+
self._update_connection_timeout(conn, relax_timeout)
2195+
else:
2196+
connections_in_queue = {conn for conn in self.pool.queue if conn}
2197+
for conn in self._connections:
2198+
if conn not in connections_in_queue:
2199+
self._update_connection_timeout(conn, relax_timeout)
21882200

21892201
def update_connections_tmp_settings(
21902202
self,

redis/maintenance_events.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -323,25 +323,6 @@ def handle_event(self, notification: MaintenanceEvent):
323323
else:
324324
logging.error(f"Unhandled notification type: {notification}")
325325

326-
def handle_node_moved_event(self):
327-
with self._lock:
328-
self.pool.update_connection_kwargs_with_tmp_settings(
329-
tmp_host_address=None,
330-
tmp_relax_timeout=-1,
331-
)
332-
with self.pool._lock:
333-
if self.config.is_relax_timeouts_enabled():
334-
# reset the timeout for existing connections
335-
self.pool.update_connections_current_timeout(
336-
relax_timeout=-1, include_available_connections=True
337-
)
338-
logging.debug("***** MOVING END--> TIMEOUTS RESET")
339-
340-
self.pool.update_connections_tmp_settings(
341-
tmp_host_address=None, tmp_relax_timeout=-1
342-
)
343-
logging.debug("***** MOVING END--> TMP SETTINGS ADDRESS RESET")
344-
345326
def handle_node_moving_event(self, event: NodeMovingEvent):
346327
if (
347328
not self.config.proactive_reconnect
@@ -403,6 +384,26 @@ def handle_node_moving_event(self, event: NodeMovingEvent):
403384
f"###### MOVING total execution time: {execution_time_us:.0f} microseconds"
404385
)
405386

387+
def handle_node_moved_event(self):
388+
logging.debug("***** MOVING END--> Starting to revert the changes.")
389+
with self._lock:
390+
self.pool.update_connection_kwargs_with_tmp_settings(
391+
tmp_host_address=None,
392+
tmp_relax_timeout=-1,
393+
)
394+
with self.pool._lock:
395+
if self.config.is_relax_timeouts_enabled():
396+
# reset the timeout for existing connections
397+
self.pool.update_connections_current_timeout(
398+
relax_timeout=-1, include_free_connections=True
399+
)
400+
logging.debug("***** MOVING END--> TIMEOUTS RESET")
401+
402+
self.pool.update_connections_tmp_settings(
403+
tmp_host_address=None, tmp_relax_timeout=-1
404+
)
405+
logging.debug("***** MOVING END--> TMP SETTINGS ADDRESS RESET")
406+
406407

407408
class MaintenanceEventConnectionHandler:
408409
def __init__(

0 commit comments

Comments
 (0)