Skip to content

Commit 7b57a22

Browse files
committed
Adding moving integration-like tests
1 parent 8691475 commit 7b57a22

File tree

4 files changed

+557
-121
lines changed

4 files changed

+557
-121
lines changed

redis/_parsers/base.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,10 @@ def __del__(self):
129129
def on_connect(self, connection):
130130
"Called when the socket connects"
131131
self._sock = connection._sock
132-
self._buffer = SocketBuffer(
133-
self._sock, self.socket_read_size, connection.socket_timeout
134-
)
132+
timeout = connection.socket_timeout
133+
if connection.tmp_relax_timeout != -1:
134+
timeout = connection.tmp_relax_timeout
135+
self._buffer = SocketBuffer(self._sock, self.socket_read_size, timeout)
135136
self.encoder = connection.encoder
136137

137138
def on_disconnect(self):
@@ -203,7 +204,7 @@ def handle_push_response(self, response, **kwargs):
203204
return self.invalidation_push_handler_func(response)
204205
if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
205206
if msg_type in _MOVING_MESSAGE:
206-
host, port = response[2].split(":")
207+
host, port = response[2].decode().split(":")
207208
ttl = response[1]
208209
id = 1 # Hardcoded value for sync parser
209210
notification = NodeMovingEvent(id, host, port, ttl)

redis/connection.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,10 @@ def update_current_socket_timeout(self, relax_timeout: Optional[float] = None):
807807
f" to timeout {timeout}; relax_timeout: {relax_timeout}"
808808
)
809809
self._sock.settimeout(timeout)
810+
self.update_parser_buffer_timeout(timeout)
811+
812+
def update_parser_buffer_timeout(self, timeout: Optional[float] = None):
813+
if self._parser and self._parser._buffer:
810814
self._parser._buffer.socket_timeout = timeout
811815

812816
def update_tmp_settings(
@@ -1901,7 +1905,7 @@ def disconnect_and_reconfigure_free_connections(
19011905
def update_connections_current_timeout(
19021906
self,
19031907
relax_timeout: Optional[float],
1904-
include_available_connections: bool = False,
1908+
include_free_connections: bool = False,
19051909
):
19061910
"""
19071911
Update the timeout either for all connections in the pool or just for the ones in use.
@@ -1919,7 +1923,7 @@ def update_connections_current_timeout(
19191923
for conn in self._in_use_connections:
19201924
self._update_connection_timeout(conn, relax_timeout)
19211925

1922-
if include_available_connections:
1926+
if include_free_connections:
19231927
for conn in self._available_connections:
19241928
self._update_connection_timeout(conn, relax_timeout)
19251929

@@ -2164,8 +2168,6 @@ def update_active_connections_for_reconnect(
21642168
connections_in_queue = {conn for conn in self.pool.queue if conn}
21652169
for conn in self._connections:
21662170
if conn not in connections_in_queue:
2167-
if tmp_relax_timeout != -1:
2168-
conn.update_socket_timeout(tmp_relax_timeout)
21692171
self._update_connection_for_reconnect(
21702172
conn, tmp_host_address, tmp_relax_timeout
21712173
)
@@ -2184,14 +2186,24 @@ def disconnect_and_reconfigure_free_connections(
21842186
conn, tmp_host_address, tmp_relax_timeout
21852187
)
21862188

2187-
def update_connections_current_timeout(self, relax_timeout: Optional[float] = None):
2189+
def update_connections_current_timeout(
2190+
self,
2191+
relax_timeout: Optional[float] = None,
2192+
include_free_connections: bool = False,
2193+
):
21882194
logging.debug(
21892195
f"***** Blocking Pool --> Updating timeouts. relax_timeout: {relax_timeout}"
21902196
)
21912197

21922198
with self._lock:
2193-
for conn in tuple(self._connections):
2194-
self._update_connection_timeout(conn, relax_timeout)
2199+
if include_free_connections:
2200+
for conn in tuple(self._connections):
2201+
self._update_connection_timeout(conn, relax_timeout)
2202+
else:
2203+
connections_in_queue = {conn for conn in self.pool.queue if conn}
2204+
for conn in self._connections:
2205+
if conn not in connections_in_queue:
2206+
self._update_connection_timeout(conn, relax_timeout)
21952207

21962208
def update_connections_tmp_settings(
21972209
self,

redis/maintenance_events.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -323,25 +323,6 @@ def handle_event(self, notification: MaintenanceEvent):
323323
else:
324324
logging.error(f"Unhandled notification type: {notification}")
325325

326-
def handle_node_moved_event(self):
327-
with self._lock:
328-
self.pool.update_connection_kwargs_with_tmp_settings(
329-
tmp_host_address=None,
330-
tmp_relax_timeout=-1,
331-
)
332-
with self.pool._lock:
333-
if self.config.is_relax_timeouts_enabled():
334-
# reset the timeout for existing connections
335-
self.pool.update_connections_current_timeout(
336-
relax_timeout=-1, include_available_connections=True
337-
)
338-
logging.debug("***** MOVING END--> TIMEOUTS RESET")
339-
340-
self.pool.update_connections_tmp_settings(
341-
tmp_host_address=None, tmp_relax_timeout=-1
342-
)
343-
logging.debug("***** MOVING END--> TMP SETTINGS ADDRESS RESET")
344-
345326
def handle_node_moving_event(self, event: NodeMovingEvent):
346327
if (
347328
not self.config.proactive_reconnect
@@ -403,6 +384,26 @@ def handle_node_moving_event(self, event: NodeMovingEvent):
403384
f"###### MOVING total execution time: {execution_time_us:.0f} microseconds"
404385
)
405386

387+
def handle_node_moved_event(self):
388+
logging.debug("***** MOVING END--> Starting to revert the changes.")
389+
with self._lock:
390+
self.pool.update_connection_kwargs_with_tmp_settings(
391+
tmp_host_address=None,
392+
tmp_relax_timeout=-1,
393+
)
394+
with self.pool._lock:
395+
if self.config.is_relax_timeouts_enabled():
396+
# reset the timeout for existing connections
397+
self.pool.update_connections_current_timeout(
398+
relax_timeout=-1, include_free_connections=True
399+
)
400+
logging.debug("***** MOVING END--> TIMEOUTS RESET")
401+
402+
self.pool.update_connections_tmp_settings(
403+
tmp_host_address=None, tmp_relax_timeout=-1
404+
)
405+
logging.debug("***** MOVING END--> TMP SETTINGS ADDRESS RESET")
406+
406407

407408
class MaintenanceEventConnectionHandler:
408409
def __init__(

0 commit comments

Comments
 (0)