Skip to content

Added LagAwareHealthCheck for MultiDBClient #3737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 22, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added redis/http/__init__.py
Empty file.
412 changes: 412 additions & 0 deletions redis/http/http_client.py

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions redis/multidb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ class MultiDBClient(RedisModuleCommands, CoreCommands):
"""
def __init__(self, config: MultiDbConfig):
self._databases = config.databases()
self._health_checks = config.default_health_checks() if config.health_checks is None else config.health_checks
self._health_checks = config.default_health_checks()

if config.additional_health_checks is not None:
self._health_checks.extend(config.additional_health_checks)

self._health_check_interval = config.health_check_interval
self._failure_detectors = config.default_failure_detectors() \
if config.failure_detectors is None else config.failure_detectors
Expand Down Expand Up @@ -233,7 +237,7 @@ def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Excep
database.circuit.state = CBState.OPEN
elif is_healthy and database.circuit.state != CBState.CLOSED:
database.circuit.state = CBState.CLOSED
except (ConnectionError, TimeoutError, socket.timeout, ConnectionRefusedError) as e:
except Exception as e:
if database.circuit.state != CBState.OPEN:
database.circuit.state = CBState.OPEN
is_healthy = False
Expand Down
9 changes: 4 additions & 5 deletions redis/multidb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter
from redis.multidb.database import Database, Databases
from redis.multidb.failure_detector import FailureDetector, CommandFailureDetector
from redis.multidb.healthcheck import HealthCheck, EchoHealthCheck
from redis.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_RETRIES, \
DEFAULT_HEALTH_CHECK_BACKOFF
from redis.multidb.failover import FailoverStrategy, WeightBasedFailoverStrategy
from redis.retry import Retry

DEFAULT_GRACE_PERIOD = 5.0
DEFAULT_HEALTH_CHECK_INTERVAL = 5
DEFAULT_HEALTH_CHECK_RETRIES = 3
DEFAULT_HEALTH_CHECK_BACKOFF = ExponentialWithJitterBackoff(cap=10)
DEFAULT_FAILURES_THRESHOLD = 3
DEFAULT_FAILURES_DURATION = 2
DEFAULT_FAILOVER_RETRIES = 3
Expand Down Expand Up @@ -54,7 +53,7 @@ class MultiDbConfig:
failure_detectors: Optional list of failure detectors for monitoring database failures.
failure_threshold: Threshold for determining database failure.
failures_interval: Time interval for tracking database failures.
health_checks: Optional list of health checks performed on databases.
additional_health_checks: Optional list of health checks performed on databases.
health_check_interval: Time interval for executing health checks.
health_check_retries: Number of retry attempts for performing health checks.
health_check_backoff: Backoff strategy for health check retries.
Expand Down Expand Up @@ -89,7 +88,7 @@ class MultiDbConfig:
failure_detectors: Optional[List[FailureDetector]] = None
failure_threshold: int = DEFAULT_FAILURES_THRESHOLD
failures_interval: float = DEFAULT_FAILURES_DURATION
health_checks: Optional[List[HealthCheck]] = None
additional_health_checks: Optional[List[HealthCheck]] = None
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
health_check_retries: int = DEFAULT_HEALTH_CHECK_RETRIES
health_check_backoff: AbstractBackoff = DEFAULT_HEALTH_CHECK_BACKOFF
Expand Down
6 changes: 2 additions & 4 deletions redis/multidb/failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from redis.multidb.circuit import State as CBState
from redis.multidb.exception import NoValidDatabaseException
from redis.retry import Retry
from redis.utils import dummy_fail


class FailoverStrategy(ABC):
Expand Down Expand Up @@ -37,7 +38,7 @@ def __init__(
def database(self) -> AbstractDatabase:
return self._retry.call_with_retry(
lambda: self._get_active_database(),
lambda _: self._dummy_fail()
lambda _: dummy_fail()
)

def set_databases(self, databases: Databases) -> None:
Expand All @@ -49,6 +50,3 @@ def _get_active_database(self) -> AbstractDatabase:
return database

raise NoValidDatabaseException('No valid database available for communication')

def _dummy_fail(self):
pass
106 changes: 100 additions & 6 deletions redis/multidb/healthcheck.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import logging
from abc import abstractmethod, ABC
from typing import Optional, Tuple, Union

from redis.backoff import ExponentialWithJitterBackoff
from redis.http.http_client import DEFAULT_TIMEOUT, HttpClient
from redis.retry import Retry
from redis.utils import dummy_fail

DEFAULT_HEALTH_CHECK_RETRIES = 3
DEFAULT_HEALTH_CHECK_BACKOFF = ExponentialWithJitterBackoff(cap=10)

logger = logging.getLogger(__name__)

class HealthCheck(ABC):

Expand All @@ -18,7 +28,7 @@ def check_health(self, database) -> bool:
class AbstractHealthCheck(HealthCheck):
def __init__(
self,
retry: Retry,
retry=Retry(retries=DEFAULT_HEALTH_CHECK_RETRIES, backoff=DEFAULT_HEALTH_CHECK_BACKOFF)
) -> None:
self._retry = retry
self._retry.update_supported_errors([ConnectionRefusedError])
Expand All @@ -34,8 +44,8 @@ def check_health(self, database) -> bool:

class EchoHealthCheck(AbstractHealthCheck):
def __init__(
self,
retry: Retry,
self,
retry=Retry(retries=DEFAULT_HEALTH_CHECK_RETRIES, backoff=DEFAULT_HEALTH_CHECK_BACKOFF),
) -> None:
"""
Check database healthiness by sending an echo request.
Expand All @@ -46,13 +56,97 @@ def __init__(
def check_health(self, database) -> bool:
return self._retry.call_with_retry(
lambda: self._returns_echoed_message(database),
lambda _: self._dummy_fail()
lambda _: dummy_fail()
)

def _returns_echoed_message(self, database) -> bool:
expected_message = ["healthcheck", b"healthcheck"]
actual_message = database.client.execute_command('ECHO', "healthcheck")
return actual_message in expected_message

def _dummy_fail(self):
pass
class LagAwareHealthCheck(AbstractHealthCheck):
    """
    Health check available for Redis Enterprise deployments.
    Verify via REST API that the database is healthy based on different lags.
    """
    def __init__(
        self,
        retry: Optional[Retry] = None,
        rest_api_port: int = 9443,
        availability_lag_tolerance: int = 100,
        timeout: float = DEFAULT_TIMEOUT,
        auth_basic: Optional[Tuple[str, str]] = None,
        verify_tls: bool = True,
        # TLS verification (server) options
        ca_file: Optional[str] = None,
        ca_path: Optional[str] = None,
        ca_data: Optional[Union[str, bytes]] = None,
        # Mutual TLS (client cert) options
        client_cert_file: Optional[str] = None,
        client_key_file: Optional[str] = None,
        client_key_password: Optional[str] = None,
    ):
        """
        Initialize LagAwareHealthCheck with the specified parameters.

        Args:
            retry: Retry configuration for health checks. When omitted, a new
                Retry built from DEFAULT_HEALTH_CHECK_RETRIES and
                DEFAULT_HEALTH_CHECK_BACKOFF is created for this instance.
            rest_api_port: Port number for Redis Enterprise REST API (default: 9443)
            availability_lag_tolerance: Maximum acceptable lag in milliseconds (default: 100)
            timeout: Request timeout in seconds (default: DEFAULT_TIMEOUT)
            auth_basic: Tuple of (username, password) for basic authentication
            verify_tls: Whether to verify TLS certificates (default: True)
            ca_file: Path to CA certificate file for TLS verification
            ca_path: Path to CA certificates directory for TLS verification
            ca_data: CA certificate data as string or bytes
            client_cert_file: Path to client certificate file for mutual TLS
            client_key_file: Path to client private key file for mutual TLS
            client_key_password: Password for encrypted client private key
        """
        # A Retry instance used as a default argument value would be created
        # once and shared by every health check; the parent constructor mutates
        # it (update_supported_errors), so build a fresh one per instance.
        if retry is None:
            retry = Retry(
                retries=DEFAULT_HEALTH_CHECK_RETRIES,
                backoff=DEFAULT_HEALTH_CHECK_BACKOFF,
            )

        super().__init__(
            retry=retry,
        )
        self._http_client = HttpClient(
            timeout=timeout,
            auth_basic=auth_basic,
            retry=self.retry,
            verify_tls=verify_tls,
            ca_file=ca_file,
            ca_path=ca_path,
            ca_data=ca_data,
            client_cert_file=client_cert_file,
            client_key_file=client_key_file,
            client_key_password=client_key_password
        )
        self._rest_api_port = rest_api_port
        self._availability_lag_tolerance = availability_lag_tolerance

    def check_health(self, database) -> bool:
        """
        Query the REST API availability endpoint for the given database.

        Args:
            database: Object whose ``client.get_connection_kwargs()['host']``
                gives the host used both as the REST API host and as the key
                for locating the matching bdb.

        Returns:
            True when the availability request completes without raising.

        Raises:
            ValueError: If no bdb endpoint matches the database host.
        """
        client = database.client
        db_host = client.get_connection_kwargs()['host']
        # The REST API is addressed on the same host as the database itself.
        self._http_client.base_url = f"https://{db_host}:{self._rest_api_port}"

        matching_bdb = self._find_matching_bdb(db_host)

        if matching_bdb is None:
            logger.warning("LagAwareHealthCheck failed: Couldn't find a matching bdb")
            raise ValueError("Could not find a matching bdb")

        url = (
            f"/v1/bdbs/{matching_bdb['uid']}/availability"
            f"?availability_lag_tolerance_ms={self._availability_lag_tolerance}"
        )
        self._http_client.get(url, expect_json=False)

        # Status checked in an http client, otherwise HttpError will be raised
        return True

    def _find_matching_bdb(self, db_host):
        """
        Return the first bdb with an endpoint matching db_host, or None.

        Returning on the first hit stops the scan right away; a plain `break`
        inside the nested loops would only leave the innermost loop and let a
        later bdb overwrite the match.
        """
        for bdb in self._http_client.get("/v1/bdbs"):
            for endpoint in bdb["endpoints"]:
                if endpoint['dns_name'] == db_host:
                    return bdb

                # Covers the case where the host was configured as a public IP
                if any(addr == db_host for addr in endpoint['addr']):
                    return bdb

        return None
5 changes: 4 additions & 1 deletion redis/retry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import socket
from time import sleep
from typing import TYPE_CHECKING, Any, Callable, Iterable, Tuple, Type, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Iterable, Tuple, Type, TypeVar, Optional

from redis.exceptions import ConnectionError, TimeoutError

Expand Down Expand Up @@ -73,6 +73,7 @@ def call_with_retry(
self,
do: Callable[[], T],
fail: Callable[[Exception], Any],
is_retryable: Optional[Callable[[Exception], bool]] = None
) -> T:
"""
Execute an operation that might fail and returns its result, or
Expand All @@ -86,6 +87,8 @@ def call_with_retry(
try:
return do()
except self._supported_errors as error:
if is_retryable and not is_retryable(error):
raise
failures += 1
fail(error)
if self._retries >= 0 and failures > self._retries:
Expand Down
6 changes: 6 additions & 0 deletions redis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,9 @@ def truncate_text(txt, max_length=100):
return textwrap.shorten(
text=txt, width=max_length, placeholder="...", break_long_words=True
)

def dummy_fail():
    """
    No-op failure callback for a Retry object.

    Use it when individual failed attempts do not need any handling.
    Takes no arguments and always returns None.
    """
    return None
Empty file added tests/test_http/__init__.py
Empty file.
Loading