Skip to content

Commit 1018692

Browse files
committed
Adding e2e scenario tests for maintenance push notifications handling.
1 parent 4cdf082 commit 1018692

File tree

5 files changed

+1268
-19
lines changed

5 files changed

+1268
-19
lines changed

redis/connection.py

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,7 +1720,7 @@ def __init__(
17201720
self._cache_factory = cache_factory
17211721

17221722
if connection_kwargs.get("cache_config") or connection_kwargs.get("cache"):
1723-
if connection_kwargs.get("protocol") not in [3, "3"]:
1723+
if self.connection_kwargs.get("protocol") not in [3, "3"]:
17241724
raise RedisError("Client caching is only supported with RESP version 3")
17251725

17261726
cache = self.connection_kwargs.get("cache")
@@ -1741,31 +1741,21 @@ def __init__(
17411741
connection_kwargs.pop("cache", None)
17421742
connection_kwargs.pop("cache_config", None)
17431743

1744-
if connection_kwargs.get(
1744+
if self.connection_kwargs.get(
17451745
"maintenance_events_pool_handler"
1746-
) or connection_kwargs.get("maintenance_events_config"):
1747-
if connection_kwargs.get("protocol") not in [3, "3"]:
1746+
) or self.connection_kwargs.get("maintenance_events_config"):
1747+
if self.connection_kwargs.get("protocol") not in [3, "3"]:
17481748
raise RedisError(
17491749
"Push handlers on connection are only supported with RESP version 3"
17501750
)
1751-
config = connection_kwargs.get("maintenance_events_config", None) or (
1752-
connection_kwargs.get("maintenance_events_pool_handler").config
1753-
if connection_kwargs.get("maintenance_events_pool_handler")
1751+
config = self.connection_kwargs.get("maintenance_events_config", None) or (
1752+
self.connection_kwargs.get("maintenance_events_pool_handler").config
1753+
if self.connection_kwargs.get("maintenance_events_pool_handler")
17541754
else None
17551755
)
17561756

17571757
if config and config.enabled:
1758-
connection_kwargs.update(
1759-
{
1760-
"orig_host_address": connection_kwargs.get("host"),
1761-
"orig_socket_timeout": connection_kwargs.get(
1762-
"socket_timeout", None
1763-
),
1764-
"orig_socket_connect_timeout": connection_kwargs.get(
1765-
"socket_connect_timeout", None
1766-
),
1767-
}
1768-
)
1758+
self._update_connection_kwargs_for_maintenance_events()
17691759

17701760
self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
17711761
if self._event_dispatcher is None:
@@ -1821,6 +1811,7 @@ def set_maintenance_events_pool_handler(
18211811
"maintenance_events_config": maintenance_events_pool_handler.config,
18221812
}
18231813
)
1814+
self._update_connection_kwargs_for_maintenance_events()
18241815

18251816
self._update_maintenance_events_configs_for_connections(
18261817
maintenance_events_pool_handler
@@ -1838,6 +1829,23 @@ def _update_maintenance_events_configs_for_connections(
18381829
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
18391830
conn.maintenance_events_config = maintenance_events_pool_handler.config
18401831

1832+
def _update_connection_kwargs_for_maintenance_events(self):
1833+
"""Store original connection parameters for maintenance events."""
1834+
if self.connection_kwargs.get("orig_host_address", None) is None:
1835+
# if orig_host_address is None it means we haven't
1836+
# configured the original values yet
1837+
self.connection_kwargs.update(
1838+
{
1839+
"orig_host_address": self.connection_kwargs.get("host"),
1840+
"orig_socket_timeout": self.connection_kwargs.get(
1841+
"socket_timeout", None
1842+
),
1843+
"orig_socket_connect_timeout": self.connection_kwargs.get(
1844+
"socket_connect_timeout", None
1845+
),
1846+
}
1847+
)
1848+
18411849
def reset(self) -> None:
18421850
self._created_connections = 0
18431851
self._available_connections = []

redis/maintenance_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ def __init__(
449449
self,
450450
enabled: bool = True,
451451
proactive_reconnect: bool = True,
452-
relax_timeout: Optional[Number] = 20,
452+
relax_timeout: Optional[Number] = 10,
453453
endpoint_type: Optional[EndpointType] = None,
454454
):
455455
"""

tests/test_scenario/conftest.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import json
2+
import logging
3+
import os
4+
from typing import Optional
5+
from urllib.parse import urlparse
6+
import pytest
7+
8+
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
9+
from redis.client import Redis
10+
from redis.maintenance_events import EndpointType, MaintenanceEventsConfig
11+
from redis.retry import Retry
12+
from tests.test_scenario.fault_injector_client import FaultInjectorClient
13+
14+
RELAX_TIMEOUT = 30
15+
CLIENT_TIMEOUT = 5
16+
17+
18+
@pytest.fixture()
19+
def endpoint_name(request):
20+
return request.config.getoption("--endpoint-name") or os.getenv(
21+
"REDIS_ENDPOINT_NAME", "m-standard"
22+
)
23+
24+
25+
@pytest.fixture()
26+
def endpoints_config(endpoint_name: str):
27+
endpoints_config = os.getenv("REDIS_ENDPOINTS_CONFIG_PATH", None)
28+
29+
if not (endpoints_config and os.path.exists(endpoints_config)):
30+
raise FileNotFoundError(f"Endpoints config file not found: {endpoints_config}")
31+
32+
try:
33+
with open(endpoints_config, "r") as f:
34+
data = json.load(f)
35+
db = data[endpoint_name]
36+
return db
37+
except Exception as e:
38+
raise ValueError(
39+
f"Failed to load endpoints config file: {endpoints_config}"
40+
) from e
41+
42+
43+
@pytest.fixture()
44+
def fault_injector_client():
45+
url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324")
46+
return FaultInjectorClient(url)
47+
48+
49+
@pytest.fixture()
50+
def client_maint_events(endpoints_config):
51+
return _get_client_maint_events(endpoints_config)
52+
53+
54+
def _get_client_maint_events(
55+
endpoints_config,
56+
enable_maintenance_events: bool = True,
57+
endpoint_type: Optional[EndpointType] = None,
58+
enable_relax_timeout: bool = True,
59+
enable_proactive_reconnect: bool = True,
60+
disable_retries: bool = False,
61+
socket_timeout: Optional[float] = None,
62+
):
63+
"""Create Redis client with maintenance events enabled."""
64+
65+
# Get credentials from the configuration
66+
username = endpoints_config.get("username")
67+
password = endpoints_config.get("password")
68+
69+
# Parse host and port from endpoints URL
70+
endpoints = endpoints_config.get("endpoints", [])
71+
if not endpoints:
72+
raise ValueError("No endpoints found in configuration")
73+
74+
parsed = urlparse(endpoints[0])
75+
host = parsed.hostname
76+
port = parsed.port
77+
78+
if not host:
79+
raise ValueError(f"Could not parse host from endpoint URL: {endpoints[0]}")
80+
81+
logging.info(f"Connecting to Redis Enterprise: {host}:{port} with user: {username}")
82+
83+
# Configure maintenance events
84+
maintenance_config = MaintenanceEventsConfig(
85+
enabled=enable_maintenance_events,
86+
proactive_reconnect=enable_proactive_reconnect,
87+
relax_timeout=RELAX_TIMEOUT if enable_relax_timeout else -1,
88+
endpoint_type=endpoint_type,
89+
)
90+
91+
# Create Redis client with maintenance events config
92+
# This will automatically create the MaintenanceEventPoolHandler
93+
if disable_retries:
94+
retry = Retry(NoBackoff(), 0)
95+
else:
96+
retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3)
97+
98+
client = Redis(
99+
host=host,
100+
port=port,
101+
socket_timeout=CLIENT_TIMEOUT if socket_timeout is None else socket_timeout,
102+
username=username,
103+
password=password,
104+
protocol=3, # RESP3 required for push notifications
105+
maintenance_events_config=maintenance_config,
106+
retry=retry,
107+
)
108+
logging.info("Redis client created with maintenance events enabled")
109+
logging.info(f"Client uses Protocol: {client.connection_pool.get_protocol()}")
110+
maintenance_handler_exists = client.maintenance_events_pool_handler is not None
111+
logging.info(f"Maintenance events pool handler: {maintenance_handler_exists}")
112+
113+
return client

0 commit comments

Comments
 (0)