Skip to content

Refactored docblocks #3744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions redis/multidb/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,20 @@ def on_state_changed(self, cb: Callable[["CircuitBreaker", State, State], None])
pass

class PBListener(pybreaker.CircuitBreakerListener):
"""Wrapper for callback to be compatible with pybreaker implementation."""
def __init__(
self,
cb: Callable[[CircuitBreaker, State, State], None],
database,
):
"""Wrapper for callback to be compatible with pybreaker implementation."""
"""
Initialize a PBListener instance.

Args:
cb: Callback function that will be called when the circuit breaker state changes.
database: Database instance associated with this circuit breaker.
"""

self._cb = cb
self._database = database

Expand All @@ -70,7 +78,15 @@ def state_change(self, cb, old_state, new_state):

class PBCircuitBreakerAdapter(CircuitBreaker):
def __init__(self, cb: pybreaker.CircuitBreaker):
"""Adapter for pybreaker CircuitBreaker."""
"""
Initialize a PBCircuitBreakerAdapter instance.

This adapter wraps pybreaker's CircuitBreaker implementation to make it compatible
with our CircuitBreaker interface.

Args:
cb: A pybreaker CircuitBreaker instance to be adapted.
"""
self._cb = cb
self._state_pb_mapper = {
State.CLOSED: self._cb.close,
Expand Down
12 changes: 10 additions & 2 deletions redis/multidb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@ def _check_databases_health(self, on_error: Callable[[Exception], None] = None):
Runs health checks as a recurring task.
Runs health checks against all databases.
"""

for database, _ in self._databases:
self._check_db_health(database, on_error)

Expand Down Expand Up @@ -313,9 +312,11 @@ def pipeline_execute_command(self, *args, **options) -> "Pipeline":
return self

def execute_command(self, *args, **kwargs):
"""Adds a command to the stack"""
return self.pipeline_execute_command(*args, **kwargs)

def execute(self) -> List[Any]:
"""Execute all the commands in the current pipeline"""
if not self._client.initialized:
self._client.initialize()

Expand All @@ -326,9 +327,16 @@ def execute(self) -> List[Any]:

class PubSub:
"""
PubSub object for multi database client.
PubSub object for multi-database client.
"""
def __init__(self, client: MultiDBClient, **kwargs):
"""Initialize the PubSub object for a multi-database client.

Args:
client: MultiDBClient instance to use for pub/sub operations
**kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
"""

self._client = client
self._client.command_executor.pubsub(**kwargs)

Expand Down
18 changes: 12 additions & 6 deletions redis/multidb/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ def __init__(
auto_fallback_interval: float = DEFAULT_AUTO_FALLBACK_INTERVAL,
):
"""
:param failure_detectors: List of failure detectors.
:param databases: List of databases.
:param failover_strategy: Strategy that defines the failover logic.
:param event_dispatcher: Event dispatcher.
:param auto_fallback_interval: Interval between fallback attempts. Fallback to a new database according to
failover_strategy.
Initialize the DefaultCommandExecutor instance.

Args:
failure_detectors: List of failure detector instances to monitor database health
databases: Collection of available databases to execute commands on
command_retry: Retry policy for failed command execution
failover_strategy: Strategy for handling database failover
event_dispatcher: Interface for dispatching events
auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
"""
for fd in failure_detectors:
fd.set_command_executor(command_executor=self)
Expand Down Expand Up @@ -205,6 +208,9 @@ def callback():
return self._execute_with_failure_detection(callback)

def pubsub(self, **kwargs):
"""
Initializes a PubSub object on a currently active database.
"""
def callback():
if self._active_pubsub is None:
self._active_pubsub = self._active_database.client.pubsub(**kwargs)
Expand Down
11 changes: 7 additions & 4 deletions redis/multidb/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ def __init__(
state: State = State.DISCONNECTED,
):
"""
param: client: Client instance for communication with the database.
param: circuit: Circuit breaker for the current database.
param: weight: Weight of current database. Database with the highest weight becomes Active.
param: state: State of the current database.
Initialize a new Database instance.

Args:
client: Underlying Redis client instance for database operations
circuit: Circuit breaker for handling database failures
weight: Weight value used for database failover prioritization
state: Initial database state, defaults to DISCONNECTED
"""
self._client = client
self._cb = circuit
Expand Down
2 changes: 1 addition & 1 deletion redis/multidb/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def kwargs(self):

class ResubscribeOnActiveDatabaseChanged(EventListenerInterface):
"""
Re-subscribe currently active pub/sub to a new active database.
Re-subscribe the currently active pub / sub to a new active database.
"""
def listen(self, event: ActiveDatabaseChanged):
old_pubsub = event.command_executor.active_pubsub
Expand Down
2 changes: 1 addition & 1 deletion redis/multidb/failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def set_databases(self, databases: Databases) -> None:

class WeightBasedFailoverStrategy(FailoverStrategy):
"""
Choose the active database with the highest weight.
Failover strategy based on database weights.
"""
def __init__(
self,
Expand Down
12 changes: 9 additions & 3 deletions redis/multidb/failure_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ def __init__(
error_types: Optional[List[Type[Exception]]] = None,
) -> None:
"""
:param threshold: Threshold of failed commands over the duration after which database will be marked as failed.
:param duration: Interval in seconds after which database will be marked as failed if threshold was exceeded.
:param error_types: List of exception that has to be registered. By default, all exceptions are registered.
Initialize a new CommandFailureDetector instance.

Args:
threshold: The number of failures that must occur within the duration to trigger failure detection.
duration: The time window in seconds during which failures are counted.
error_types: Optional list of exception types to trigger failover. If None, all exceptions are counted.

The detector tracks command failures within a sliding time window. When the number of failures
exceeds the threshold within the specified duration, it triggers failure detection.
"""
self._command_executor = None
self._threshold = threshold
Expand Down