From 101869297b5aba87b54ed51e380ffd0334348254 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Thu, 28 Aug 2025 09:54:07 +0300 Subject: [PATCH 1/7] Adding e2e scenario tests for maintenance push notifications handling. --- redis/connection.py | 44 +- redis/maintenance_events.py | 2 +- tests/test_scenario/conftest.py | 113 +++ .../test_scenario/hitless_upgrade_helpers.py | 332 ++++++++ tests/test_scenario/test_hitless_upgrade.py | 796 ++++++++++++++++++ 5 files changed, 1268 insertions(+), 19 deletions(-) create mode 100644 tests/test_scenario/conftest.py create mode 100644 tests/test_scenario/hitless_upgrade_helpers.py create mode 100644 tests/test_scenario/test_hitless_upgrade.py diff --git a/redis/connection.py b/redis/connection.py index 0e7de5a584..4db404ce72 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -1720,7 +1720,7 @@ def __init__( self._cache_factory = cache_factory if connection_kwargs.get("cache_config") or connection_kwargs.get("cache"): - if connection_kwargs.get("protocol") not in [3, "3"]: + if self.connection_kwargs.get("protocol") not in [3, "3"]: raise RedisError("Client caching is only supported with RESP version 3") cache = self.connection_kwargs.get("cache") @@ -1741,31 +1741,21 @@ def __init__( connection_kwargs.pop("cache", None) connection_kwargs.pop("cache_config", None) - if connection_kwargs.get( + if self.connection_kwargs.get( "maintenance_events_pool_handler" - ) or connection_kwargs.get("maintenance_events_config"): - if connection_kwargs.get("protocol") not in [3, "3"]: + ) or self.connection_kwargs.get("maintenance_events_config"): + if self.connection_kwargs.get("protocol") not in [3, "3"]: raise RedisError( "Push handlers on connection are only supported with RESP version 3" ) - config = connection_kwargs.get("maintenance_events_config", None) or ( - connection_kwargs.get("maintenance_events_pool_handler").config - if connection_kwargs.get("maintenance_events_pool_handler") + config = self.connection_kwargs.get("maintenance_events_config", None) or ( + self.connection_kwargs.get("maintenance_events_pool_handler").config + if self.connection_kwargs.get("maintenance_events_pool_handler") else None ) if config and config.enabled: - connection_kwargs.update( - { - "orig_host_address": connection_kwargs.get("host"), - "orig_socket_timeout": connection_kwargs.get( - "socket_timeout", None - ), - "orig_socket_connect_timeout": connection_kwargs.get( - "socket_connect_timeout", None - ), - } - ) + self._update_connection_kwargs_for_maintenance_events() self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None) if self._event_dispatcher is None: @@ -1821,6 +1811,7 @@ def set_maintenance_events_pool_handler( "maintenance_events_config": maintenance_events_pool_handler.config, } ) + self._update_connection_kwargs_for_maintenance_events() self._update_maintenance_events_configs_for_connections( maintenance_events_pool_handler @@ -1838,6 +1829,23 @@ def _update_maintenance_events_configs_for_connections( conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler) conn.maintenance_events_config = maintenance_events_pool_handler.config + def _update_connection_kwargs_for_maintenance_events(self): + """Store original connection parameters for maintenance events.""" + if self.connection_kwargs.get("orig_host_address", None) is None: + # if orig_host_address is None it means we haven't + # configured the original values yet + self.connection_kwargs.update( + { + "orig_host_address": self.connection_kwargs.get("host"), + 
"orig_socket_timeout": self.connection_kwargs.get( + "socket_timeout", None + ), + "orig_socket_connect_timeout": self.connection_kwargs.get( + "socket_connect_timeout", None + ), + } + ) + def reset(self) -> None: self._created_connections = 0 self._available_connections = [] diff --git a/redis/maintenance_events.py b/redis/maintenance_events.py index cd22d29ed5..a3636019ed 100644 --- a/redis/maintenance_events.py +++ b/redis/maintenance_events.py @@ -449,7 +449,7 @@ def __init__( self, enabled: bool = True, proactive_reconnect: bool = True, - relax_timeout: Optional[Number] = 20, + relax_timeout: Optional[Number] = 10, endpoint_type: Optional[EndpointType] = None, ): """ diff --git a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py new file mode 100644 index 0000000000..297df9bd27 --- /dev/null +++ b/tests/test_scenario/conftest.py @@ -0,0 +1,113 @@ +import json +import logging +import os +from typing import Optional +from urllib.parse import urlparse +import pytest + +from redis.backoff import ExponentialWithJitterBackoff, NoBackoff +from redis.client import Redis +from redis.maintenance_events import EndpointType, MaintenanceEventsConfig +from redis.retry import Retry +from tests.test_scenario.fault_injector_client import FaultInjectorClient + +RELAX_TIMEOUT = 30 +CLIENT_TIMEOUT = 5 + + +@pytest.fixture() +def endpoint_name(request): + return request.config.getoption("--endpoint-name") or os.getenv( + "REDIS_ENDPOINT_NAME", "m-standard" + ) + + +@pytest.fixture() +def endpoints_config(endpoint_name: str): + endpoints_config = os.getenv("REDIS_ENDPOINTS_CONFIG_PATH", None) + + if not (endpoints_config and os.path.exists(endpoints_config)): + raise FileNotFoundError(f"Endpoints config file not found: {endpoints_config}") + + try: + with open(endpoints_config, "r") as f: + data = json.load(f) + db = data[endpoint_name] + return db + except Exception as e: + raise ValueError( + f"Failed to load endpoints config file: {endpoints_config}" + ) from e + + +@pytest.fixture() +def fault_injector_client(): + url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324") + return FaultInjectorClient(url) + + +@pytest.fixture() +def client_maint_events(endpoints_config): + return _get_client_maint_events(endpoints_config) + + +def _get_client_maint_events( + endpoints_config, + enable_maintenance_events: bool = True, + endpoint_type: Optional[EndpointType] = None, + enable_relax_timeout: bool = True, + enable_proactive_reconnect: bool = True, + disable_retries: bool = False, + socket_timeout: Optional[float] = None, +): + """Create Redis client with maintenance events enabled.""" + + # Get credentials from the configuration + username = endpoints_config.get("username") + password = endpoints_config.get("password") + + # Parse host and port from endpoints URL + endpoints = endpoints_config.get("endpoints", []) + if not endpoints: + raise ValueError("No endpoints found in configuration") + + parsed = urlparse(endpoints[0]) + host = parsed.hostname + port = parsed.port + + if not host: + raise ValueError(f"Could not parse host from endpoint URL: {endpoints[0]}") + + logging.info(f"Connecting to Redis Enterprise: {host}:{port} with user: {username}") + + # Configure maintenance events + maintenance_config = MaintenanceEventsConfig( + enabled=enable_maintenance_events, + proactive_reconnect=enable_proactive_reconnect, + relax_timeout=RELAX_TIMEOUT if enable_relax_timeout else -1, + endpoint_type=endpoint_type, + ) + + # Create Redis client with maintenance events config + # This 
will automatically create the MaintenanceEventPoolHandler + if disable_retries: + retry = Retry(NoBackoff(), 0) + else: + retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3) + + client = Redis( + host=host, + port=port, + socket_timeout=CLIENT_TIMEOUT if socket_timeout is None else socket_timeout, + username=username, + password=password, + protocol=3, # RESP3 required for push notifications + maintenance_events_config=maintenance_config, + retry=retry, + ) + logging.info("Redis client created with maintenance events enabled") + logging.info(f"Client uses Protocol: {client.connection_pool.get_protocol()}") + maintenance_handler_exists = client.maintenance_events_pool_handler is not None + logging.info(f"Maintenance events pool handler: {maintenance_handler_exists}") + + return client diff --git a/tests/test_scenario/hitless_upgrade_helpers.py b/tests/test_scenario/hitless_upgrade_helpers.py new file mode 100644 index 0000000000..3cf86d863f --- /dev/null +++ b/tests/test_scenario/hitless_upgrade_helpers.py @@ -0,0 +1,332 @@ +import logging +import time +from typing import Any, Dict, Optional, Tuple +import pytest + +from redis.client import Redis +from redis.connection import Connection +from tests.test_scenario.fault_injector_client import ( + ActionRequest, + ActionType, + FaultInjectorClient, +) + + +class TaskStatuses: + """Class to hold completed statuses constants.""" + + FAILED = "failed" + FINISHED = "finished" + SUCCESS = "success" + RUNNING = "running" + + COMPLETED_STATUSES = [FAILED, FINISHED, SUCCESS] + + +class ClientValidations: + @staticmethod + def wait_push_notification( + redis_client: Redis, + timeout: int = 120, + connection: Optional[Connection] = None, + ): + """Wait for a push notification to be received.""" + start_time = time.time() + check_interval = 1 # Check more frequently during operations + test_conn = ( + connection if connection else redis_client.connection_pool.get_connection() + ) + + try: + while time.time() - start_time < timeout: + try: + if test_conn.can_read(timeout=0.5): + # reading is important, it triggers the push notification + push_response = test_conn.read_response(push_request=True) + logging.debug( + f"Push notification has been received. 
Response: {push_response}" + ) + return + except Exception as e: + logging.error(f"Error reading push notification: {e}") + break + time.sleep(check_interval) + finally: + # Release the connection back to the pool + try: + if not connection: + redis_client.connection_pool.release(test_conn) + except Exception as e: + logging.error(f"Error releasing connection: {e}") + + +class ClusterOperations: + @staticmethod + def get_operation_result( + fault_injector: FaultInjectorClient, + action_id: str, + timeout: int = 60, + ) -> Tuple[str, dict]: + """Get the result of a specific action""" + start_time = time.time() + check_interval = 3 + while time.time() - start_time < timeout: + try: + status_result = fault_injector.get_action_status(action_id) + operation_status = status_result.get("status", "unknown") + + if operation_status in TaskStatuses.COMPLETED_STATUSES: + logging.debug( + f"Operation {action_id} completed with status: " + f"{operation_status}" + ) + return operation_status, status_result + + time.sleep(check_interval) + except Exception as e: + logging.warning(f"Error checking operation status: {e}") + time.sleep(check_interval) + else: + raise TimeoutError(f"Timeout waiting for operation {action_id}") + + @staticmethod + def get_cluster_nodes_info( + fault_injector: FaultInjectorClient, + endpoint_config: Dict[str, Any], + timeout: int = 60, + ) -> Dict[str, Any]: + """Get cluster nodes information from Redis Enterprise.""" + try: + # Use rladmin status to get node information + bdb_id = endpoint_config.get("bdb_id") + get_status_action = ActionRequest( + action_type=ActionType.EXECUTE_RLADMIN_COMMAND, + parameters={ + "rladmin_command": "status", + "bdb_id": bdb_id, + }, + ) + trigger_action_result = fault_injector.trigger_action(get_status_action) + action_id = trigger_action_result.get("action_id") + if not action_id: + raise ValueError( + f"Failed to trigger get cluster status action for bdb_id {bdb_id}: {trigger_action_result}" + ) + + status, action_status_check_response = ( + ClusterOperations.get_operation_result( + fault_injector, action_id, timeout=timeout + ) + ) + + if status != TaskStatuses.SUCCESS: + pytest.fail( + f"Failed to get cluster nodes info: {action_status_check_response}" + ) + logging.info( + f"Completed cluster nodes info reading: {action_status_check_response}" + ) + return action_status_check_response + + except Exception as e: + pytest.fail(f"Failed to get cluster nodes info: {e}") + + @staticmethod + def find_target_node_and_empty_node( + fault_injector: FaultInjectorClient, + endpoint_config: Dict[str, Any], + ) -> Tuple[str, str]: + """Find the node with master shards and the node with no shards. 
+ + Returns: + tuple: (target_node, empty_node) where target_node has master shards + and empty_node has no shards + """ + cluster_info = ClusterOperations.get_cluster_nodes_info( + fault_injector, endpoint_config + ) + output = cluster_info.get("output", {}).get("output", "") + + if not output: + raise ValueError("No cluster status output found") + + # Parse the sections to find nodes with master shards and nodes with no shards + lines = output.split("\n") + shards_section_started = False + nodes_section_started = False + + # Get all node IDs from CLUSTER NODES section + all_nodes = set() + nodes_with_shards = set() + master_nodes = set() + + for line in lines: + line = line.strip() + + # Start of CLUSTER NODES section + if line.startswith("CLUSTER NODES:"): + nodes_section_started = True + continue + elif line.startswith("DATABASES:"): + nodes_section_started = False + continue + elif nodes_section_started and line and not line.startswith("NODE:ID"): + # Parse node line: node:1 master 10.0.101.206 ... (ignore the role) + parts = line.split() + if len(parts) >= 1: + node_id = parts[0].replace("*", "") # Remove * prefix if present + all_nodes.add(node_id) + + # Start of SHARDS section - only care about shard roles here + if line.startswith("SHARDS:"): + shards_section_started = True + continue + elif shards_section_started and line.startswith("DB:ID"): + continue + elif shards_section_started and line and not line.startswith("ENDPOINTS:"): + # Parse shard line: db:1 m-standard redis:1 node:2 master 0-8191 1.4MB OK + parts = line.split() + if len(parts) >= 5: + node_id = parts[3] # node:2 + shard_role = parts[4] # master/slave - this is what matters + + nodes_with_shards.add(node_id) + if shard_role == "master": + master_nodes.add(node_id) + elif line.startswith("ENDPOINTS:") or not line: + shards_section_started = False + + # Find empty node (node with no shards) + empty_nodes = all_nodes - nodes_with_shards + + logging.debug(f"All nodes: {all_nodes}") + logging.debug(f"Nodes with shards: {nodes_with_shards}") + logging.debug(f"Master nodes: {master_nodes}") + logging.debug(f"Empty nodes: {empty_nodes}") + + if not empty_nodes: + raise ValueError("No empty nodes (nodes without shards) found") + + if not master_nodes: + raise ValueError("No nodes with master shards found") + + # Return the first available empty node and master node (numeric part only) + empty_node = next(iter(empty_nodes)).split(":")[1] # node:1 -> 1 + target_node = next(iter(master_nodes)).split(":")[1] # node:2 -> 2 + + return target_node, empty_node + + @staticmethod + def find_endpoint_for_bind( + fault_injector: FaultInjectorClient, + endpoint_config: Dict[str, Any], + timeout: int = 60, + ) -> str: + """Find the endpoint ID from cluster status. 
+ + Returns: + str: The endpoint ID (e.g., "1:1") + """ + cluster_info = ClusterOperations.get_cluster_nodes_info( + fault_injector, endpoint_config, timeout + ) + output = cluster_info.get("output", {}).get("output", "") + + if not output: + raise ValueError("No cluster status output found") + + # Parse the ENDPOINTS section to find endpoint ID + lines = output.split("\n") + endpoints_section_started = False + + for line in lines: + line = line.strip() + + # Start of ENDPOINTS section + if line.startswith("ENDPOINTS:"): + endpoints_section_started = True + continue + elif line.startswith("SHARDS:"): + endpoints_section_started = False + break + elif endpoints_section_started and line and not line.startswith("DB:ID"): + # Parse endpoint line: db:1 m-standard endpoint:1:1 node:2 single No + parts = line.split() + if len(parts) >= 3: + endpoint_full = parts[2] # endpoint:1:1 + if endpoint_full.startswith("endpoint:"): + endpoint_id = endpoint_full.replace("endpoint:", "") # 1:1 + return endpoint_id + + raise ValueError("No endpoint ID found in cluster status") + + @staticmethod + def execute_rladmin_migrate( + fault_injector: FaultInjectorClient, + endpoint_config: Dict[str, Any], + target_node: str, + empty_node: str, + ) -> str: + """Execute rladmin migrate command and wait for completion.""" + command = f"migrate node {target_node} all_shards target_node {empty_node}" + + # Get bdb_id from endpoint configuration + bdb_id = endpoint_config.get("bdb_id") + + try: + # Correct parameter format for fault injector + parameters = { + "bdb_id": bdb_id, + "rladmin_command": command, # Just the command without "rladmin" prefix + } + + logging.debug(f"Executing rladmin_command with parameter: {parameters}") + + action = ActionRequest( + action_type=ActionType.EXECUTE_RLADMIN_COMMAND, parameters=parameters + ) + result = fault_injector.trigger_action(action) + + logging.debug(f"Migrate command action result: {result}") + + action_id = result.get("action_id") + + if not action_id: + raise Exception(f"Failed to trigger migrate action: {result}") + return action_id + except Exception as e: + raise Exception(f"Failed to execute rladmin migrate: {e}") + + @staticmethod + def execute_rladmin_bind_endpoint( + fault_injector: FaultInjectorClient, + endpoint_config: Dict[str, Any], + endpoint_id: str, + ) -> str: + """Execute rladmin bind endpoint command and wait for completion.""" + command = f"bind endpoint {endpoint_id} policy single" + + bdb_id = endpoint_config.get("bdb_id") + + try: + parameters = { + "rladmin_command": command, # Just the command without "rladmin" prefix + "bdb_id": bdb_id, + } + + logging.info(f"Executing rladmin_command with parameter: {parameters}") + action = ActionRequest( + action_type=ActionType.EXECUTE_RLADMIN_COMMAND, parameters=parameters + ) + result = fault_injector.trigger_action(action) + logging.info( + f"Migrate command {command} with parameters {parameters} trigger result: {result}" + ) + + action_id = result.get("action_id") + + if not action_id: + raise Exception(f"Failed to trigger bind endpoint action: {result}") + return action_id + except Exception as e: + raise Exception(f"Failed to execute rladmin bind endpoint: {e}") diff --git a/tests/test_scenario/test_hitless_upgrade.py b/tests/test_scenario/test_hitless_upgrade.py new file mode 100644 index 0000000000..6af22defe4 --- /dev/null +++ b/tests/test_scenario/test_hitless_upgrade.py @@ -0,0 +1,796 @@ +"""Tests for Redis Enterprise moving push notifications with real cluster operations.""" + +import logging +from 
queue import Queue +from threading import Thread +import threading +import time +from typing import Any, Dict + +import pytest + +from redis import Redis +from redis.maintenance_events import EndpointType, MaintenanceState +from tests.test_scenario.conftest import ( + CLIENT_TIMEOUT, + RELAX_TIMEOUT, + _get_client_maint_events, +) +from tests.test_scenario.fault_injector_client import ( + FaultInjectorClient, +) +from tests.test_scenario.hitless_upgrade_helpers import ( + ClientValidations, + ClusterOperations, + TaskStatuses, +) + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s %(levelname)s %(message)s", + filemode="w", + filename="./test_hitless_upgrade.log", +) + +BIND_TIMEOUT = 60 +MIGRATE_TIMEOUT = 120 + + +class TestPushNotifications: + """ + Test Redis Enterprise maintenance push notifications with real cluster + operations. + """ + + @pytest.fixture(autouse=True) + def setup_and_cleanup( + self, + client_maint_events: Redis, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + ): + # Initialize cleanup flags first to ensure they exist even if setup fails + self._migration_executed = False + self._bind_executed = False + self.target_node = None + self.empty_node = None + self.endpoint_id = None + + try: + self.target_node, self.empty_node = ( + ClusterOperations.find_target_node_and_empty_node( + fault_injector_client, endpoints_config + ) + ) + logging.info( + f"Using target_node: {self.target_node}, empty_node: {self.empty_node}" + ) + except Exception as e: + pytest.fail(f"Failed to find target and empty nodes: {e}") + + try: + self.endpoint_id = ClusterOperations.find_endpoint_for_bind( + fault_injector_client, endpoints_config + ) + logging.info(f"Using endpoint: {self.endpoint_id}") + except Exception as e: + pytest.fail(f"Failed to find endpoint for bind operation: {e}") + + # Ensure setup completed successfully + if not self.target_node or not self.empty_node: + pytest.fail("Setup failed: target_node or empty_node not available") + if not self.endpoint_id: + pytest.fail("Setup failed: endpoint_id not available") + + # Yield control to the test + yield + + # Cleanup code - this will run even if the test fails + logging.info("Starting cleanup...") + try: + client_maint_events.close() + except Exception as e: + logging.error(f"Failed to close client: {e}") + + # Only attempt cleanup if we have the necessary attributes and they were executed + if self._migration_executed: + try: + if self.target_node and self.empty_node: + self._execute_migration( + fault_injector_client=fault_injector_client, + endpoints_config=endpoints_config, + target_node=self.empty_node, + empty_node=self.target_node, + ) + logging.info("Migration cleanup completed") + except Exception as e: + logging.error(f"Failed to revert migration: {e}") + + if self._bind_executed: + try: + if self.endpoint_id: + self._execute_bind( + fault_injector_client, endpoints_config, self.endpoint_id + ) + logging.info("Bind cleanup completed") + except Exception as e: + logging.error(f"Failed to revert bind endpoint: {e}") + + logging.info("Cleanup finished") + + def _execute_migration( + self, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + target_node: str, + empty_node: str, + ): + migrate_action_id = ClusterOperations.execute_rladmin_migrate( + fault_injector=fault_injector_client, + endpoint_config=endpoints_config, + target_node=target_node, + empty_node=empty_node, + ) + + self._migration_executed = True + + migrate_status, 
migrate_result = ClusterOperations.get_operation_result( + fault_injector_client, migrate_action_id, timeout=MIGRATE_TIMEOUT + ) + if migrate_status != TaskStatuses.SUCCESS: + pytest.fail(f"Failed to execute rladmin migrate: {migrate_result}") + + def _execute_bind( + self, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + endpoint_id: str, + ): + bind_action_id = ClusterOperations.execute_rladmin_bind_endpoint( + fault_injector_client, endpoints_config, endpoint_id + ) + + self._bind_executed = True + + bind_status, bind_result = ClusterOperations.get_operation_result( + fault_injector_client, bind_action_id, timeout=BIND_TIMEOUT + ) + if bind_status != TaskStatuses.SUCCESS: + pytest.fail(f"Failed to execute rladmin bind endpoint: {bind_result}") + + def _execute_migrate_bind_flow( + self, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + target_node: str, + empty_node: str, + endpoint_id: str, + ): + self._execute_migration( + fault_injector_client=fault_injector_client, + endpoints_config=endpoints_config, + target_node=target_node, + empty_node=empty_node, + ) + self._execute_bind( + fault_injector_client=fault_injector_client, + endpoints_config=endpoints_config, + endpoint_id=endpoint_id, + ) + + def _get_all_connections_in_pool(self, client: Redis): + connections = [] + if hasattr(client.connection_pool, "_available_connections"): + for conn in client.connection_pool._available_connections: + connections.append(conn) + if hasattr(client.connection_pool, "_in_use_connections"): + for conn in client.connection_pool._in_use_connections: + connections.append(conn) + if hasattr(client.connection_pool, "_connections"): + # This is the case for BlockingConnectionPool + for conn in client.connection_pool._connections: + connections.append(conn) + return connections + + def _validate_maintenance_state( + self, client: Redis, expected_matching_conns_count: int + ): + """Validate the client connections are in the expected state after migration.""" + matching_conns_count = 0 + connections = self._get_all_connections_in_pool(client) + + for conn in connections: + if ( + conn._sock is not None + and conn._sock.gettimeout() == RELAX_TIMEOUT + and conn.maintenance_state == MaintenanceState.MAINTENANCE + ): + matching_conns_count += 1 + assert matching_conns_count == expected_matching_conns_count + + def _validate_moving_state( + self, + client: Redis, + configured_endpoint_type: EndpointType, + expected_matching_connected_conns_count: int, + expected_matching_disconnected_conns_count: int, + ): + """Validate the client connections are in the expected state after migration.""" + matching_connected_conns_count = 0 + matching_disconnected_conns_count = 0 + connections = self._get_all_connections_in_pool(client) + for conn in connections: + endpoint_configured_correctly = bool( + ( + configured_endpoint_type == EndpointType.NONE + and conn.host == conn.orig_host_address + ) + or ( + configured_endpoint_type != EndpointType.NONE + and conn.host != conn.orig_host_address + ) + ) + if ( + conn._sock is not None + and conn._sock.gettimeout() == RELAX_TIMEOUT + and conn.maintenance_state == MaintenanceState.MOVING + and endpoint_configured_correctly + ): + matching_connected_conns_count += 1 + elif ( + conn._sock is None + and conn.maintenance_state == MaintenanceState.MOVING + and conn.socket_timeout == RELAX_TIMEOUT + and endpoint_configured_correctly + ): + matching_disconnected_conns_count += 1 + else: + pass + assert 
matching_connected_conns_count == expected_matching_connected_conns_count + assert ( + matching_disconnected_conns_count + == expected_matching_disconnected_conns_count + ) + + def _validate_default_state( + self, client: Redis, expected_matching_conns_count: int + ): + """Validate the client connections are in the expected state after migration.""" + matching_conns_count = 0 + connections = self._get_all_connections_in_pool(client) + + for conn in connections: + if conn._sock is None: + if ( + conn.maintenance_state == MaintenanceState.NONE + and conn.socket_timeout == CLIENT_TIMEOUT + and conn.host == conn.orig_host_address + ): + matching_conns_count += 1 + elif ( + conn._sock.gettimeout() == CLIENT_TIMEOUT + and conn.maintenance_state == MaintenanceState.NONE + and conn.host == conn.orig_host_address + ): + matching_conns_count += 1 + assert matching_conns_count == expected_matching_conns_count + + def _validate_default_notif_disabled_state( + self, client: Redis, expected_matching_conns_count: int + ): + """Validate the client connections are in the expected state after migration.""" + matching_conns_count = 0 + connections = self._get_all_connections_in_pool(client) + + for conn in connections: + if conn._sock is None: + if ( + conn.maintenance_state == MaintenanceState.NONE + and conn.socket_timeout == CLIENT_TIMEOUT + and not hasattr(conn, "orig_host_address") + ): + matching_conns_count += 1 + elif ( + conn._sock.gettimeout() == CLIENT_TIMEOUT + and conn.maintenance_state == MaintenanceState.NONE + and not hasattr(conn, "orig_host_address") + ): + matching_conns_count += 1 + assert matching_conns_count == expected_matching_conns_count + + @pytest.mark.timeout(300) # 5 minutes timeout for this test + def test_receive_migrating_and_moving_push_notification( + self, + client_maint_events: Redis, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + ): + """ + Test the push notifications are received when executing cluster operations. 
+ + """ + + logging.info("Executing rladmin migrate command...") + migrate_thread = Thread( + target=self._execute_migration, + name="migrate_thread", + args=( + fault_injector_client, + endpoints_config, + self.target_node, + self.empty_node, + ), + ) + migrate_thread.start() + + logging.info("Waiting for MIGRATING push notifications...") + ClientValidations.wait_push_notification( + client_maint_events, timeout=MIGRATE_TIMEOUT + ) + + logging.info("Validating connection migrating state...") + conn = client_maint_events.connection_pool.get_connection() + assert conn.maintenance_state == MaintenanceState.MAINTENANCE + assert conn._sock.gettimeout() == RELAX_TIMEOUT + client_maint_events.connection_pool.release(conn) + + logging.info("Waiting for MIGRATED push notifications...") + ClientValidations.wait_push_notification( + client_maint_events, timeout=MIGRATE_TIMEOUT + ) + + logging.info("Validating connection states...") + conn = client_maint_events.connection_pool.get_connection() + assert conn.maintenance_state == MaintenanceState.NONE + assert conn._sock.gettimeout() == CLIENT_TIMEOUT + client_maint_events.connection_pool.release(conn) + + migrate_thread.join() + + logging.info("Executing rladmin bind endpoint command...") + + bind_thread = Thread( + target=self._execute_bind, + name="bind_thread", + args=(fault_injector_client, endpoints_config, self.endpoint_id), + ) + bind_thread.start() + + logging.info("Waiting for MOVING push notifications...") + ClientValidations.wait_push_notification( + client_maint_events, timeout=BIND_TIMEOUT + ) + + logging.info("Validating connection states...") + conn = client_maint_events.connection_pool.get_connection() + assert conn.maintenance_state == MaintenanceState.MOVING + assert conn._sock.gettimeout() == RELAX_TIMEOUT + + logging.info("Waiting for moving ttl to expire") + time.sleep(BIND_TIMEOUT) + + logging.info("Validating connection states...") + assert conn.maintenance_state == MaintenanceState.NONE + assert conn.socket_timeout == CLIENT_TIMEOUT + assert conn._sock.gettimeout() == CLIENT_TIMEOUT + client_maint_events.connection_pool.release(conn) + bind_thread.join() + + @pytest.mark.timeout(300) # 5 minutes timeout + @pytest.mark.parametrize( + "endpoint_type", + [ + EndpointType.EXTERNAL_FQDN, + EndpointType.EXTERNAL_IP, + EndpointType.NONE, + ], + ) + def test_timeout_handling_during_migrating_and_moving( + self, + endpoint_type: EndpointType, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + ): + """ + Test the push notifications are received when executing cluster operations. 
+ + """ + logging.info(f"Testing timeout handling for endpoint type: {endpoint_type}") + client = _get_client_maint_events(endpoints_config, endpoint_type) + + # Create three connections in the pool + logging.info("Creating three connections in the pool.") + conns = [] + for _ in range(3): + conns.append(client.connection_pool.get_connection()) + # Release the connections + for conn in conns: + client.connection_pool.release(conn) + + logging.info("Executing rladmin migrate command...") + migrate_thread = Thread( + target=self._execute_migration, + name="migrate_thread", + args=( + fault_injector_client, + endpoints_config, + self.target_node, + self.empty_node, + ), + ) + migrate_thread.start() + + logging.info("Waiting for MIGRATING push notifications...") + # this will consume the notification in one of the connections + ClientValidations.wait_push_notification(client, timeout=MIGRATE_TIMEOUT) + + self._validate_maintenance_state(client, expected_matching_conns_count=1) + self._validate_default_state(client, expected_matching_conns_count=2) + + logging.info("Waiting for MIGRATED push notifications...") + ClientValidations.wait_push_notification(client, timeout=MIGRATE_TIMEOUT) + + logging.info("Validating connection states after MIGRATED ...") + self._validate_default_state(client, expected_matching_conns_count=3) + + migrate_thread.join() + + logging.info("Executing rladmin bind endpoint command...") + + bind_thread = Thread( + target=self._execute_bind, + name="bind_thread", + args=(fault_injector_client, endpoints_config, self.endpoint_id), + ) + bind_thread.start() + + logging.info("Waiting for MOVING push notifications...") + # this will consume the notification in one of the connections + # and will handle the states of the rest + # the consumed connection will be disconnected during + # releasing it back to the pool and as a result we will have + # 3 disconnected connections in the pool + ClientValidations.wait_push_notification(client, timeout=BIND_TIMEOUT) + + if endpoint_type == EndpointType.NONE: + logging.info( + "Waiting for moving ttl/2 to expire to validate proactive reconnection" + ) + time.sleep(8) + + logging.info("Validating connections states...") + self._validate_moving_state( + client, + endpoint_type, + expected_matching_connected_conns_count=0, + expected_matching_disconnected_conns_count=3, + ) + # during get_connection() the connection will be reconnected + # either to the address provided in the moving event or to the original address + # depending on the configured endpoint type + # with this call we test if we are able to connect to the new address + conn = client.connection_pool.get_connection() + self._validate_moving_state( + client, + endpoint_type, + expected_matching_connected_conns_count=1, + expected_matching_disconnected_conns_count=2, + ) + client.connection_pool.release(conn) + + logging.info("Waiting for moving ttl to expire") + time.sleep(BIND_TIMEOUT) + + logging.info("Validating connection states...") + self._validate_default_state(client, expected_matching_conns_count=3) + bind_thread.join() + + @pytest.mark.timeout(300) # 5 minutes timeout + @pytest.mark.parametrize( + "endpoint_type", + [ + EndpointType.EXTERNAL_FQDN, + EndpointType.EXTERNAL_IP, + EndpointType.NONE, + ], + ) + def test_new_connection_handling_during_migrating_and_moving( + self, + endpoint_type: EndpointType, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + ): + logging.info(f"Testing timeout handling for endpoint type: {endpoint_type}") + 
client = _get_client_maint_events(endpoints_config, endpoint_type) + + logging.info("Creating one connection in the pool.") + first_conn = client.connection_pool.get_connection() + + logging.info("Executing rladmin migrate command...") + migrate_thread = Thread( + target=self._execute_migration, + name="migrate_thread", + args=( + fault_injector_client, + endpoints_config, + self.target_node, + self.empty_node, + ), + ) + migrate_thread.start() + + logging.info("Waiting for MIGRATING push notifications...") + # this will consume the notification in the provided connection + ClientValidations.wait_push_notification( + client, timeout=MIGRATE_TIMEOUT, connection=first_conn + ) + + self._validate_maintenance_state(client, expected_matching_conns_count=1) + + # validate that new connections will also receive the moving event + logging.info( + "Creating second connection in the pool" + " and expect it to receive the migrating as well." + ) + + second_connection = client.connection_pool.get_connection() + ClientValidations.wait_push_notification( + client, timeout=MIGRATE_TIMEOUT, connection=second_connection + ) + # second_connection.send_command("PING") + # resp = second_connection.read_response() + # assert resp == b"PONG" + + logging.info( + "Validating connection states after MIGRATING for both connections ..." + ) + self._validate_maintenance_state(client, expected_matching_conns_count=2) + + logging.info("Waiting for MIGRATED push notifications on both connections ...") + ClientValidations.wait_push_notification( + client, timeout=MIGRATE_TIMEOUT, connection=first_conn + ) + ClientValidations.wait_push_notification( + client, timeout=MIGRATE_TIMEOUT, connection=second_connection + ) + + client.connection_pool.release(first_conn) + client.connection_pool.release(second_connection) + + migrate_thread.join() + + logging.info("Executing rladmin bind endpoint command...") + + bind_thread = Thread( + target=self._execute_bind, + name="bind_thread", + args=(fault_injector_client, endpoints_config, self.endpoint_id), + ) + bind_thread.start() + + logging.info("Waiting for MOVING push notifications on random connection ...") + # this will consume the notification in one of the connections + # and will handle the states of the rest + # the consumed connection will be disconnected during + # releasing it back to the pool and as a result we will have + # 3 disconnected connections in the pool + ClientValidations.wait_push_notification(client, timeout=BIND_TIMEOUT) + + if endpoint_type == EndpointType.NONE: + logging.info( + "Waiting for moving ttl/2 to expire to validate proactive reconnection" + ) + time.sleep(8) + + # validate that new connections will also receive the moving event + connections = [] + for _ in range(3): + connections.append(client.connection_pool.get_connection()) + for conn in connections: + client.connection_pool.release(conn) + + logging.info("Validating connections states during MOVING ...") + # during get_connection() the existing connection will be reconnected + # either to the address provided in the moving event or to the original address + # depending on the configured endpoint type + # with this call we test if we are able to connect to the new address + # new connection should also be marked as moving + self._validate_moving_state( + client, + endpoint_type, + expected_matching_connected_conns_count=3, + expected_matching_disconnected_conns_count=0, + ) + + logging.info("Waiting for moving ttl to expire") + time.sleep(BIND_TIMEOUT) + + logging.info("Validating 
connection states after MOVING has expired ...") + self._validate_default_state(client, expected_matching_conns_count=3) + bind_thread.join() + + @pytest.mark.timeout(300) + def test_disabled_handling_during_migrating_and_moving( + self, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + ): + logging.info("Creating client with disabled notifications.") + client = _get_client_maint_events( + endpoints_config, + enable_maintenance_events=False, + ) + + logging.info("Creating one connection in the pool.") + first_conn = client.connection_pool.get_connection() + + logging.info("Executing rladmin migrate command...") + migrate_thread = Thread( + target=self._execute_migration, + name="migrate_thread", + args=( + fault_injector_client, + endpoints_config, + self.target_node, + self.empty_node, + ), + ) + migrate_thread.start() + + logging.info("Waiting for MIGRATING push notifications...") + # this will consume the notification in the provided connection + ClientValidations.wait_push_notification( + client, timeout=5, connection=first_conn + ) + + self._validate_default_notif_disabled_state( + client, expected_matching_conns_count=1 + ) + + # validate that new connections will also receive the moving event + logging.info( + "Creating second connection in the pool" + " and expect it to receive the migrating as well." + ) + + second_connection = client.connection_pool.get_connection() + ClientValidations.wait_push_notification( + client, timeout=5, connection=second_connection + ) + + logging.info( + "Validating connection states after MIGRATING for both connections ..." + ) + self._validate_default_notif_disabled_state( + client, expected_matching_conns_count=2 + ) + + logging.info("Waiting for MIGRATED push notifications on both connections ...") + ClientValidations.wait_push_notification( + client, timeout=5, connection=first_conn + ) + ClientValidations.wait_push_notification( + client, timeout=5, connection=second_connection + ) + + client.connection_pool.release(first_conn) + client.connection_pool.release(second_connection) + + migrate_thread.join() + + logging.info("Executing rladmin bind endpoint command...") + + bind_thread = Thread( + target=self._execute_bind, + name="bind_thread", + args=(fault_injector_client, endpoints_config, self.endpoint_id), + ) + bind_thread.start() + + logging.info("Waiting for MOVING push notifications on random connection ...") + # this will consume the notification in one of the connections + # and will handle the states of the rest + # the consumed connection will be disconnected during + # releasing it back to the pool and as a result we will have + # 3 disconnected connections in the pool + ClientValidations.wait_push_notification(client, timeout=10) + + # validate that new connections will also receive the moving event + connections = [] + for _ in range(3): + connections.append(client.connection_pool.get_connection()) + for conn in connections: + client.connection_pool.release(conn) + + logging.info("Validating connections states during MOVING ...") + self._validate_default_notif_disabled_state( + client, expected_matching_conns_count=3 + ) + + logging.info("Waiting for moving ttl to expire") + time.sleep(30) + + logging.info("Validating connection states after MOVING has expired ...") + self._validate_default_notif_disabled_state( + client, expected_matching_conns_count=3 + ) + bind_thread.join() + + @pytest.mark.timeout(300) + @pytest.mark.parametrize( + "endpoint_type", + [ + EndpointType.EXTERNAL_FQDN, + 
EndpointType.EXTERNAL_IP, + EndpointType.NONE, + ], + ) + def test_command_execution_during_migrating_and_moving( + self, + fault_injector_client: FaultInjectorClient, + endpoints_config: Dict[str, Any], + endpoint_type: EndpointType, + ): + """ + Test command execution during migrating and moving events. + + This test validates that: + 1. Commands can be executed during MIGRATING and MOVING events + 2. Commands are not blocked by the events + 3. Commands are executed successfully + """ + errors = Queue() + execution_duration = 180 + socket_timeout = 0.5 + + client = _get_client_maint_events( + endpoints_config, + endpoint_type=endpoint_type, + disable_retries=True, + socket_timeout=socket_timeout, + enable_maintenance_events=True, + ) + + migrate_and_bind_thread = Thread( + target=self._execute_migrate_bind_flow, + name="migrate_and_bind_thread", + args=( + fault_injector_client, + endpoints_config, + self.target_node, + self.empty_node, + self.endpoint_id, + ), + ) + migrate_and_bind_thread.start() + + def execute_commands(duration: int, errors: Queue): + start = time.time() + while time.time() - start < duration: + try: + client.set("key", "value") + client.get("key") + except Exception as e: + errors.put( + f"Command failed in thread {threading.current_thread().name}: {e}" + ) + + threads = [] + for _ in range(10): + thread = Thread( + target=execute_commands, + name="command_execution_thread", + args=( + execution_duration, + errors, + ), + ) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + migrate_and_bind_thread.join() + + assert errors.empty(), f"Errors occurred in threads: {errors.queue}" From 9fee5ca3d49b1c90945f46ec4f5b676f4a876de5 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Fri, 29 Aug 2025 15:02:17 +0300 Subject: [PATCH 2/7] Excluding scenario tests from standard tests invoking --- tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tasks.py b/tasks.py index 52decf08e7..85f2003f2c 100644 --- a/tasks.py +++ b/tasks.py @@ -58,11 +58,11 @@ def standalone_tests( if uvloop: run( - f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml" + f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=scenario --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml" ) else: run( - f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml" + f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=scenario --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml" ) @@ -74,11 +74,11 @@ def cluster_tests(c, uvloop=False, protocol=2, profile=False): cluster_tls_url = "rediss://localhost:27379/0" if uvloop: run( - f"pytest {profile_arg} --protocol={protocol} --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop" + f"pytest {profile_arg} --protocol={protocol} --ignore=scenario --cov=./ 
--cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop" ) else: run( - f"pytest {profile_arg} --protocol={protocol} --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml" + f"pytest {profile_arg} --protocol={protocol} --ignore=scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml" ) From 4c2c85a3e71be24555e20238ea5b8ac453d729b7 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Fri, 29 Aug 2025 17:50:28 +0300 Subject: [PATCH 3/7] Fixing ignore filter for e2e scenario tests to be skipped during redis-py unit/integration pipeline runs --- tasks.py | 8 ++++---- tests/test_scenario/test_hitless_upgrade.py | 3 --- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/tasks.py b/tasks.py index 85f2003f2c..20f9f245aa 100644 --- a/tasks.py +++ b/tasks.py @@ -58,11 +58,11 @@ def standalone_tests( if uvloop: run( - f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=scenario --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml" + f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml" ) else: run( - f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=scenario --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml" + f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml" ) @@ -74,11 +74,11 @@ def cluster_tests(c, uvloop=False, protocol=2, profile=False): cluster_tls_url = "rediss://localhost:27379/0" if uvloop: run( - f"pytest {profile_arg} --protocol={protocol} --ignore=scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop" + f"pytest {profile_arg} --protocol={protocol} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop" ) else: run( - f"pytest {profile_arg} --protocol={protocol} --ignore=scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml" + f"pytest {profile_arg} --protocol={protocol} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} 
--redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml" ) diff --git a/tests/test_scenario/test_hitless_upgrade.py b/tests/test_scenario/test_hitless_upgrade.py index 6af22defe4..86292f7b28 100644 --- a/tests/test_scenario/test_hitless_upgrade.py +++ b/tests/test_scenario/test_hitless_upgrade.py @@ -537,9 +537,6 @@ def test_new_connection_handling_during_migrating_and_moving( ClientValidations.wait_push_notification( client, timeout=MIGRATE_TIMEOUT, connection=second_connection ) - # second_connection.send_command("PING") - # resp = second_connection.read_response() - # assert resp == b"PONG" logging.info( "Validating connection states after MIGRATING for both connections ..." From dae0ff1583f0a83a3fce50e67b3bdf9dad6dd8e6 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Fri, 29 Aug 2025 17:58:39 +0300 Subject: [PATCH 4/7] Fixing unit test after applying correct expected value for default timeout. --- redis/connection.py | 2 +- tests/test_maintenance_events.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index 4db404ce72..2c1a985638 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -1832,7 +1832,7 @@ def _update_maintenance_events_configs_for_connections( def _update_connection_kwargs_for_maintenance_events(self): """Store original connection parameters for maintenance events.""" if self.connection_kwargs.get("orig_host_address", None) is None: - # if orig_host_address is None it means we haven't + # If orig_host_address is None it means we haven't # configured the original values yet self.connection_kwargs.update( { diff --git a/tests/test_maintenance_events.py b/tests/test_maintenance_events.py index 916c93b9ad..69bdad2947 100644 --- a/tests/test_maintenance_events.py +++ b/tests/test_maintenance_events.py @@ -381,7 +381,7 @@ def test_init_defaults(self): config = MaintenanceEventsConfig() assert config.enabled is True assert config.proactive_reconnect is True - assert config.relax_timeout == 20 + assert config.relax_timeout == 10 def test_init_custom_values(self): """Test MaintenanceEventsConfig initialization with custom values.""" From 209fe7e355f50f876f662146d766ae7b06bbd605 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Mon, 1 Sep 2025 07:47:27 +0300 Subject: [PATCH 5/7] Ignoring e2e tests for validate building and installing pipeline actions --- .github/workflows/install_and_test.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/install_and_test.sh b/.github/workflows/install_and_test.sh index e647126539..88cf27b206 100755 --- a/.github/workflows/install_and_test.sh +++ b/.github/workflows/install_and_test.sh @@ -45,4 +45,6 @@ pytest -m 'not onlycluster' CLUSTER_URL="redis://localhost:16379/0" CLUSTER_SSL_URL="rediss://localhost:27379/0" pytest -m 'not onlynoncluster and not redismod and not ssl' \ - --redis-url="${CLUSTER_URL}" --redis-ssl-url="${CLUSTER_SSL_URL}" + --ignore=tests/test_scenario \ + --redis-url="${CLUSTER_URL}" \ + --redis-ssl-url="${CLUSTER_SSL_URL}" From eaeedc1e006edc6728cbe57c0777afc9866b24b3 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Mon, 1 Sep 2025 08:57:14 +0300 Subject: [PATCH 6/7] Ignoring e2e tests for validate building and installing pipeline actions - second try + some test helpers improvements --- .github/workflows/install_and_test.sh | 2 +- redis/_parsers/base.py | 2 +- tests/test_maintenance_events_handling.py | 4 +--- tests/test_scenario/conftest.py | 9 ++++++++- 
 tests/test_scenario/hitless_upgrade_helpers.py |  5 +++--
 tests/test_scenario/test_hitless_upgrade.py    | 15 ++++++++++-----
 6 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/.github/workflows/install_and_test.sh b/.github/workflows/install_and_test.sh
index 88cf27b206..c90027389c 100755
--- a/.github/workflows/install_and_test.sh
+++ b/.github/workflows/install_and_test.sh
@@ -40,7 +40,7 @@ cd ${TESTDIR}
 # install, run tests
 pip install ${PKG}
 # Redis tests
-pytest -m 'not onlycluster'
+pytest -m 'not onlycluster' --ignore=tests/test_scenario
 # RedisCluster tests
 CLUSTER_URL="redis://localhost:16379/0"
 CLUSTER_SSL_URL="rediss://localhost:27379/0"
diff --git a/redis/_parsers/base.py b/redis/_parsers/base.py
index 9fe77bcd3b..be7b5cf1d0 100644
--- a/redis/_parsers/base.py
+++ b/redis/_parsers/base.py
@@ -191,7 +191,7 @@ def parse_moving_msg(response):
         # Expected message format is: MOVING