Skip to content

Commit f552cd6

Browse files
authored
Merge branch 'feat/active-active' into vv-lag-aware-hc
2 parents 0d88c78 + fb500f6 commit f552cd6

File tree

7 files changed

+58
-19
lines changed

7 files changed

+58
-19
lines changed

redis/multidb/circuit.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,20 @@ def on_state_changed(self, cb: Callable[["CircuitBreaker", State, State], None])
5151
pass
5252

5353
class PBListener(pybreaker.CircuitBreakerListener):
54+
"""Wrapper for callback to be compatible with pybreaker implementation."""
5455
def __init__(
5556
self,
5657
cb: Callable[[CircuitBreaker, State, State], None],
5758
database,
5859
):
59-
"""Wrapper for callback to be compatible with pybreaker implementation."""
60+
"""
61+
Initialize a PBListener instance.
62+
63+
Args:
64+
cb: Callback function that will be called when the circuit breaker state changes.
65+
database: Database instance associated with this circuit breaker.
66+
"""
67+
6068
self._cb = cb
6169
self._database = database
6270

@@ -70,7 +78,15 @@ def state_change(self, cb, old_state, new_state):
7078

7179
class PBCircuitBreakerAdapter(CircuitBreaker):
7280
def __init__(self, cb: pybreaker.CircuitBreaker):
73-
"""Adapter for pybreaker CircuitBreaker."""
81+
"""
82+
Initialize a PBCircuitBreakerAdapter instance.
83+
84+
This adapter wraps pybreaker's CircuitBreaker implementation to make it compatible
85+
with our CircuitBreaker interface.
86+
87+
Args:
88+
cb: A pybreaker CircuitBreaker instance to be adapted.
89+
"""
7490
self._cb = cb
7591
self._state_pb_mapper = {
7692
State.CLOSED: self._cb.close,

redis/multidb/client.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ def _check_databases_health(self, on_error: Callable[[Exception], None] = None):
251251
Runs health checks as a recurring task.
252252
Runs health checks against all databases.
253253
"""
254-
255254
for database, _ in self._databases:
256255
self._check_db_health(database, on_error)
257256

@@ -317,9 +316,11 @@ def pipeline_execute_command(self, *args, **options) -> "Pipeline":
317316
return self
318317

319318
def execute_command(self, *args, **kwargs):
319+
"""Adds a command to the stack"""
320320
return self.pipeline_execute_command(*args, **kwargs)
321321

322322
def execute(self) -> List[Any]:
323+
"""Execute all the commands in the current pipeline"""
323324
if not self._client.initialized:
324325
self._client.initialize()
325326

@@ -330,9 +331,16 @@ def execute(self) -> List[Any]:
330331

331332
class PubSub:
332333
"""
333-
PubSub object for multi database client.
334+
PubSub object for multi-database client.
334335
"""
335336
def __init__(self, client: MultiDBClient, **kwargs):
337+
"""Initialize the PubSub object for a multi-database client.
338+
339+
Args:
340+
client: MultiDBClient instance to use for pub/sub operations
341+
**kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
342+
"""
343+
336344
self._client = client
337345
self._client.command_executor.pubsub(**kwargs)
338346

redis/multidb/command_executor.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,15 @@ def __init__(
103103
auto_fallback_interval: float = DEFAULT_AUTO_FALLBACK_INTERVAL,
104104
):
105105
"""
106-
:param failure_detectors: List of failure detectors.
107-
:param databases: List of databases.
108-
:param failover_strategy: Strategy that defines the failover logic.
109-
:param event_dispatcher: Event dispatcher.
110-
:param auto_fallback_interval: Interval between fallback attempts. Fallback to a new database according to
111-
failover_strategy.
106+
Initialize the DefaultCommandExecutor instance.
107+
108+
Args:
109+
failure_detectors: List of failure detector instances to monitor database health
110+
databases: Collection of available databases to execute commands on
111+
command_retry: Retry policy for failed command execution
112+
failover_strategy: Strategy for handling database failover
113+
event_dispatcher: Interface for dispatching events
114+
auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
112115
"""
113116
for fd in failure_detectors:
114117
fd.set_command_executor(command_executor=self)
@@ -205,6 +208,9 @@ def callback():
205208
return self._execute_with_failure_detection(callback)
206209

207210
def pubsub(self, **kwargs):
211+
"""
212+
Initializes a PubSub object on a currently active database.
213+
"""
208214
def callback():
209215
if self._active_pubsub is None:
210216
self._active_pubsub = self._active_database.client.pubsub(**kwargs)

redis/multidb/database.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,13 @@ def __init__(
7474
state: State = State.DISCONNECTED,
7575
):
7676
"""
77-
param: client: Client instance for communication with the database.
78-
param: circuit: Circuit breaker for the current database.
79-
param: weight: Weight of current database. Database with the highest weight becomes Active.
80-
param: state: State of the current database.
77+
Initialize a new Database instance.
78+
79+
Args:
80+
client: Underlying Redis client instance for database operations
81+
circuit: Circuit breaker for handling database failures
82+
weight: Weight value used for database failover prioritization
83+
state: Initial database state, defaults to DISCONNECTED
8184
"""
8285
self._client = client
8386
self._cb = circuit

redis/multidb/event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def kwargs(self):
3939

4040
class ResubscribeOnActiveDatabaseChanged(EventListenerInterface):
4141
"""
42-
Re-subscribe currently active pub/sub to a new active database.
42+
Re-subscribe the currently active pub / sub to a new active database.
4343
"""
4444
def listen(self, event: ActiveDatabaseChanged):
4545
old_pubsub = event.command_executor.active_pubsub

redis/multidb/failover.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def set_databases(self, databases: Databases) -> None:
2424

2525
class WeightBasedFailoverStrategy(FailoverStrategy):
2626
"""
27-
Choose the active database with the highest weight.
27+
Failover strategy based on database weights.
2828
"""
2929
def __init__(
3030
self,

redis/multidb/failure_detector.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,15 @@ def __init__(
3232
error_types: Optional[List[Type[Exception]]] = None,
3333
) -> None:
3434
"""
35-
:param threshold: Threshold of failed commands over the duration after which database will be marked as failed.
36-
:param duration: Interval in seconds after which database will be marked as failed if threshold was exceeded.
37-
:param error_types: List of exception that has to be registered. By default, all exceptions are registered.
35+
Initialize a new CommandFailureDetector instance.
36+
37+
Args:
38+
threshold: The number of failures that must occur within the duration to trigger failure detection.
39+
duration: The time window in seconds during which failures are counted.
40+
error_types: Optional list of exception types to trigger failover. If None, all exceptions are counted.
41+
42+
The detector tracks command failures within a sliding time window. When the number of failures
43+
exceeds the threshold within the specified duration, it triggers failure detection.
3844
"""
3945
self._command_executor = None
4046
self._threshold = threshold

0 commit comments

Comments
 (0)