diff --git a/redis/multidb/circuit.py b/redis/multidb/circuit.py index 9211173c83..79c8a5f379 100644 --- a/redis/multidb/circuit.py +++ b/redis/multidb/circuit.py @@ -51,12 +51,20 @@ def on_state_changed(self, cb: Callable[["CircuitBreaker", State, State], None]) pass class PBListener(pybreaker.CircuitBreakerListener): + """Wrapper for callback to be compatible with pybreaker implementation.""" def __init__( self, cb: Callable[[CircuitBreaker, State, State], None], database, ): - """Wrapper for callback to be compatible with pybreaker implementation.""" + """ + Initialize a PBListener instance. + + Args: + cb: Callback function that will be called when the circuit breaker state changes. + database: Database instance associated with this circuit breaker. + """ + self._cb = cb self._database = database @@ -70,7 +78,15 @@ def state_change(self, cb, old_state, new_state): class PBCircuitBreakerAdapter(CircuitBreaker): def __init__(self, cb: pybreaker.CircuitBreaker): - """Adapter for pybreaker CircuitBreaker.""" + """ + Initialize a PBCircuitBreakerAdapter instance. + + This adapter wraps pybreaker's CircuitBreaker implementation to make it compatible + with our CircuitBreaker interface. + + Args: + cb: A pybreaker CircuitBreaker instance to be adapted. + """ self._cb = cb self._state_pb_mapper = { State.CLOSED: self._cb.close, diff --git a/redis/multidb/client.py b/redis/multidb/client.py index 172017f036..1073ea8168 100644 --- a/redis/multidb/client.py +++ b/redis/multidb/client.py @@ -247,7 +247,6 @@ def _check_databases_health(self, on_error: Callable[[Exception], None] = None): Runs health checks as a recurring task. Runs health checks against all databases. """ - for database, _ in self._databases: self._check_db_health(database, on_error) @@ -313,9 +312,11 @@ def pipeline_execute_command(self, *args, **options) -> "Pipeline": return self def execute_command(self, *args, **kwargs): + """Adds a command to the stack""" return self.pipeline_execute_command(*args, **kwargs) def execute(self) -> List[Any]: + """Execute all the commands in the current pipeline""" if not self._client.initialized: self._client.initialize() @@ -326,9 +327,16 @@ def execute(self) -> List[Any]: class PubSub: """ - PubSub object for multi database client. + PubSub object for multi-database client. """ def __init__(self, client: MultiDBClient, **kwargs): + """Initialize the PubSub object for a multi-database client. + + Args: + client: MultiDBClient instance to use for pub/sub operations + **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation + """ + self._client = client self._client.command_executor.pubsub(**kwargs) diff --git a/redis/multidb/command_executor.py b/redis/multidb/command_executor.py index 795ef8f8b1..40370c2e18 100644 --- a/redis/multidb/command_executor.py +++ b/redis/multidb/command_executor.py @@ -103,12 +103,15 @@ def __init__( auto_fallback_interval: float = DEFAULT_AUTO_FALLBACK_INTERVAL, ): """ - :param failure_detectors: List of failure detectors. - :param databases: List of databases. - :param failover_strategy: Strategy that defines the failover logic. - :param event_dispatcher: Event dispatcher. - :param auto_fallback_interval: Interval between fallback attempts. Fallback to a new database according to - failover_strategy. + Initialize the DefaultCommandExecutor instance. + + Args: + failure_detectors: List of failure detector instances to monitor database health + databases: Collection of available databases to execute commands on + command_retry: Retry policy for failed command execution + failover_strategy: Strategy for handling database failover + event_dispatcher: Interface for dispatching events + auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database """ for fd in failure_detectors: fd.set_command_executor(command_executor=self) @@ -205,6 +208,9 @@ def callback(): return self._execute_with_failure_detection(callback) def pubsub(self, **kwargs): + """ + Initializes a PubSub object on a currently active database. + """ def callback(): if self._active_pubsub is None: self._active_pubsub = self._active_database.client.pubsub(**kwargs) diff --git a/redis/multidb/database.py b/redis/multidb/database.py index 15db52e909..204b7c91f3 100644 --- a/redis/multidb/database.py +++ b/redis/multidb/database.py @@ -74,10 +74,13 @@ def __init__( state: State = State.DISCONNECTED, ): """ - param: client: Client instance for communication with the database. - param: circuit: Circuit breaker for the current database. - param: weight: Weight of current database. Database with the highest weight becomes Active. - param: state: State of the current database. + Initialize a new Database instance. + + Args: + client: Underlying Redis client instance for database operations + circuit: Circuit breaker for handling database failures + weight: Weight value used for database failover prioritization + state: Initial database state, defaults to DISCONNECTED """ self._client = client self._cb = circuit diff --git a/redis/multidb/event.py b/redis/multidb/event.py index 2598bc4d06..7b16d4ba88 100644 --- a/redis/multidb/event.py +++ b/redis/multidb/event.py @@ -39,7 +39,7 @@ def kwargs(self): class ResubscribeOnActiveDatabaseChanged(EventListenerInterface): """ - Re-subscribe currently active pub/sub to a new active database. + Re-subscribe the currently active pub / sub to a new active database. """ def listen(self, event: ActiveDatabaseChanged): old_pubsub = event.command_executor.active_pubsub diff --git a/redis/multidb/failover.py b/redis/multidb/failover.py index a4c825aac1..541f3413dc 100644 --- a/redis/multidb/failover.py +++ b/redis/multidb/failover.py @@ -23,7 +23,7 @@ def set_databases(self, databases: Databases) -> None: class WeightBasedFailoverStrategy(FailoverStrategy): """ - Choose the active database with the highest weight. + Failover strategy based on database weights. """ def __init__( self, diff --git a/redis/multidb/failure_detector.py b/redis/multidb/failure_detector.py index 50f1c839bd..3280fa6c32 100644 --- a/redis/multidb/failure_detector.py +++ b/redis/multidb/failure_detector.py @@ -32,9 +32,15 @@ def __init__( error_types: Optional[List[Type[Exception]]] = None, ) -> None: """ - :param threshold: Threshold of failed commands over the duration after which database will be marked as failed. - :param duration: Interval in seconds after which database will be marked as failed if threshold was exceeded. - :param error_types: List of exception that has to be registered. By default, all exceptions are registered. + Initialize a new CommandFailureDetector instance. + + Args: + threshold: The number of failures that must occur within the duration to trigger failure detection. + duration: The time window in seconds during which failures are counted. + error_types: Optional list of exception types to trigger failover. If None, all exceptions are counted. + + The detector tracks command failures within a sliding time window. When the number of failures + exceeds the threshold within the specified duration, it triggers failure detection. """ self._command_executor = None self._threshold = threshold