Skip to content

Commit 8c09cbe

Browse files
authored
Added support for Pipeline and transactions (#3707)
* Added Database, Healthcheck, CircuitBreaker, FailureDetector * Added DatabaseSelector, exceptions, refactored existing entities * Added MultiDbConfig * Added DatabaseConfig * Added DatabaseConfig test coverage * Renamed DatabaseSelector into FailoverStrategy * Added CommandExecutor * Updated healthcheck to close circuit on success * Added thread-safeness * Added missing thread-safeness * Added missing thread-safenes for dispatcher * Refactored client to keep databases in WeightedList * Added database CRUD operations * Added on-fly configuration * Added background health checks * Added background healthcheck + half-open event * Refactored background scheduling * Added support for Active-Active pipeline * Refactored healthchecks * Added Pipeline testing * Added support for transactions * Removed code repetitions, fixed weight assignment, added loops enhancement, fixed data structure * Added missing doc blocks * Refactored configuration * Refactored failure detector * Refactored retry logic * Added scenario tests * Added pybreaker optional dependency * Added pybreaker to dev dependencies * Rename tests directory * Added scenario tests for Pipeline and Transaction * Added handling of ConnectionRefusedError, added timeouts so cluster could recover * Increased timeouts * Refactored integration tests * Fixed property name * Removed sentinels * Removed unused method
1 parent e6d90e4 commit 8c09cbe

File tree

7 files changed

+655
-51
lines changed

7 files changed

+655
-51
lines changed

redis/event.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -251,21 +251,21 @@ def nodes(self) -> dict:
251251
def credential_provider(self) -> Union[CredentialProvider, None]:
252252
return self._credential_provider
253253

254-
class OnCommandFailEvent:
254+
class OnCommandsFailEvent:
255255
"""
256256
Event fired whenever a command fails during the execution.
257257
"""
258258
def __init__(
259259
self,
260-
command: tuple,
260+
commands: tuple,
261261
exception: Exception,
262262
):
263-
self._command = command
263+
self._commands = commands
264264
self._exception = exception
265265

266266
@property
267-
def command(self) -> tuple:
268-
return self._command
267+
def commands(self) -> tuple:
268+
return self._commands
269269

270270
@property
271271
def exception(self) -> Exception:

redis/multidb/client.py

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import threading
22
import socket
3-
from typing import Callable
3+
from typing import List, Any, Callable
44

55
from redis.background import BackgroundScheduler
66
from redis.exceptions import ConnectionError, TimeoutError
@@ -30,23 +30,22 @@ def __init__(self, config: MultiDbConfig):
3030
self._failover_strategy.set_databases(self._databases)
3131
self._auto_fallback_interval = config.auto_fallback_interval
3232
self._event_dispatcher = config.event_dispatcher
33-
self._command_executor = DefaultCommandExecutor(
33+
self._command_retry = config.command_retry
34+
self._command_retry.update_supported_errors((ConnectionRefusedError,))
35+
self.command_executor = DefaultCommandExecutor(
3436
failure_detectors=self._failure_detectors,
3537
databases=self._databases,
36-
command_retry=config.command_retry,
38+
command_retry=self._command_retry,
3739
failover_strategy=self._failover_strategy,
3840
event_dispatcher=self._event_dispatcher,
3941
auto_fallback_interval=self._auto_fallback_interval,
4042
)
41-
42-
for fd in self._failure_detectors:
43-
fd.set_command_executor(command_executor=self._command_executor)
44-
45-
self._initialized = False
43+
self.initialized = False
4644
self._hc_lock = threading.RLock()
4745
self._bg_scheduler = BackgroundScheduler()
46+
self._config = config
4847

49-
def _initialize(self):
48+
def initialize(self):
5049
"""
5150
Perform initialization of databases to define their initial state.
5251
"""
@@ -72,7 +71,7 @@ def raise_exception_on_failed_hc(error):
7271
# Set states according to a weights and circuit state
7372
if database.circuit.state == CBState.CLOSED and not is_active_db_found:
7473
database.state = DBState.ACTIVE
75-
self._command_executor.active_database = database
74+
self.command_executor.active_database = database
7675
is_active_db_found = True
7776
elif database.circuit.state == CBState.CLOSED and is_active_db_found:
7877
database.state = DBState.PASSIVE
@@ -82,7 +81,7 @@ def raise_exception_on_failed_hc(error):
8281
if not is_active_db_found:
8382
raise NoValidDatabaseException('Initial connection failed - no active database found')
8483

85-
self._initialized = True
84+
self.initialized = True
8685

8786
def get_databases(self) -> Databases:
8887
"""
@@ -110,7 +109,7 @@ def set_active_database(self, database: AbstractDatabase) -> None:
110109
highest_weighted_db, _ = self._databases.get_top_n(1)[0]
111110
highest_weighted_db.state = DBState.PASSIVE
112111
database.state = DBState.ACTIVE
113-
self._command_executor.active_database = database
112+
self.command_executor.active_database = database
114113
return
115114

116115
raise NoValidDatabaseException('Cannot set active database, database is unhealthy')
@@ -132,7 +131,7 @@ def add_database(self, database: AbstractDatabase):
132131
def _change_active_database(self, new_database: AbstractDatabase, highest_weight_database: AbstractDatabase):
133132
if new_database.weight > highest_weight_database.weight and new_database.circuit.state == CBState.CLOSED:
134133
new_database.state = DBState.ACTIVE
135-
self._command_executor.active_database = new_database
134+
self.command_executor.active_database = new_database
136135
highest_weight_database.state = DBState.PASSIVE
137136

138137
def remove_database(self, database: Database):
@@ -144,7 +143,7 @@ def remove_database(self, database: Database):
144143

145144
if highest_weight <= weight and highest_weighted_db.circuit.state == CBState.CLOSED:
146145
highest_weighted_db.state = DBState.ACTIVE
147-
self._command_executor.active_database = highest_weighted_db
146+
self.command_executor.active_database = highest_weighted_db
148147

149148
def update_database_weight(self, database: AbstractDatabase, weight: float):
150149
"""
@@ -182,10 +181,25 @@ def execute_command(self, *args, **options):
182181
"""
183182
Executes a single command and return its result.
184183
"""
185-
if not self._initialized:
186-
self._initialize()
184+
if not self.initialized:
185+
self.initialize()
186+
187+
return self.command_executor.execute_command(*args, **options)
188+
189+
def pipeline(self):
190+
"""
191+
Enters into pipeline mode of the client.
192+
"""
193+
return Pipeline(self)
187194

188-
return self._command_executor.execute_command(*args, **options)
195+
def transaction(self, func: Callable[["Pipeline"], None], *watches, **options):
196+
"""
197+
Executes callable as transaction.
198+
"""
199+
if not self.initialized:
200+
self.initialize()
201+
202+
return self.command_executor.execute_transaction(func, *watches, *options)
189203

190204
def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Exception], None] = None) -> None:
191205
"""
@@ -207,7 +221,7 @@ def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Excep
207221
database.circuit.state = CBState.OPEN
208222
elif is_healthy and database.circuit.state != CBState.CLOSED:
209223
database.circuit.state = CBState.CLOSED
210-
except (ConnectionError, TimeoutError, socket.timeout) as e:
224+
except (ConnectionError, TimeoutError, socket.timeout, ConnectionRefusedError) as e:
211225
if database.circuit.state != CBState.OPEN:
212226
database.circuit.state = CBState.OPEN
213227
is_healthy = False
@@ -219,7 +233,9 @@ def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Excep
219233
def _check_databases_health(self, on_error: Callable[[Exception], None] = None):
220234
"""
221235
Runs health checks as a recurring task.
236+
Runs health checks against all databases.
222237
"""
238+
223239
for database, _ in self._databases:
224240
self._check_db_health(database, on_error)
225241

@@ -232,4 +248,66 @@ def _on_circuit_state_change_callback(self, circuit: CircuitBreaker, old_state:
232248
self._bg_scheduler.run_once(DEFAULT_GRACE_PERIOD, _half_open_circuit, circuit)
233249

234250
def _half_open_circuit(circuit: CircuitBreaker):
235-
circuit.state = CBState.HALF_OPEN
251+
circuit.state = CBState.HALF_OPEN
252+
253+
254+
class Pipeline(RedisModuleCommands, CoreCommands):
255+
"""
256+
Pipeline implementation for multiple logical Redis databases.
257+
"""
258+
def __init__(self, client: MultiDBClient):
259+
self._command_stack = []
260+
self._client = client
261+
262+
def __enter__(self) -> "Pipeline":
263+
return self
264+
265+
def __exit__(self, exc_type, exc_value, traceback):
266+
self.reset()
267+
268+
def __del__(self):
269+
try:
270+
self.reset()
271+
except Exception:
272+
pass
273+
274+
def __len__(self) -> int:
275+
return len(self._command_stack)
276+
277+
def __bool__(self) -> bool:
278+
"""Pipeline instances should always evaluate to True"""
279+
return True
280+
281+
def reset(self) -> None:
282+
self._command_stack = []
283+
284+
def close(self) -> None:
285+
"""Close the pipeline"""
286+
self.reset()
287+
288+
def pipeline_execute_command(self, *args, **options) -> "Pipeline":
289+
"""
290+
Stage a command to be executed when execute() is next called
291+
292+
Returns the current Pipeline object back so commands can be
293+
chained together, such as:
294+
295+
pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')
296+
297+
At some other point, you can then run: pipe.execute(),
298+
which will execute all commands queued in the pipe.
299+
"""
300+
self._command_stack.append((args, options))
301+
return self
302+
303+
def execute_command(self, *args, **kwargs):
304+
return self.pipeline_execute_command(*args, **kwargs)
305+
306+
def execute(self) -> List[Any]:
307+
if not self._client.initialized:
308+
self._client.initialize()
309+
310+
try:
311+
return self._client.command_executor.execute_pipeline(tuple(self._command_stack))
312+
finally:
313+
self.reset()

redis/multidb/command_executor.py

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from abc import ABC, abstractmethod
22
from datetime import datetime, timedelta
3-
from typing import List, Union, Optional
3+
from typing import List, Union, Optional, Callable
44

5-
from redis.event import EventDispatcherInterface, OnCommandFailEvent
5+
from redis.client import Pipeline
6+
from redis.event import EventDispatcherInterface, OnCommandsFailEvent
67
from redis.multidb.config import DEFAULT_AUTO_FALLBACK_INTERVAL
78
from redis.multidb.database import Database, AbstractDatabase, Databases
89
from redis.multidb.circuit import State as CBState
@@ -92,6 +93,9 @@ def __init__(
9293
:param auto_fallback_interval: Interval between fallback attempts. Fallback to a new database according to
9394
failover_strategy.
9495
"""
96+
for fd in failure_detectors:
97+
fd.set_command_executor(command_executor=self)
98+
9599
self._failure_detectors = failure_detectors
96100
self._databases = databases
97101
self._command_retry = command_retry
@@ -139,19 +143,49 @@ def auto_fallback_interval(self, auto_fallback_interval: int) -> None:
139143
self._auto_fallback_interval = auto_fallback_interval
140144

141145
def execute_command(self, *args, **options):
142-
self._check_active_database()
146+
def callback():
147+
return self._active_database.client.execute_command(*args, **options)
148+
149+
return self._execute_with_failure_detection(callback, args)
150+
151+
def execute_pipeline(self, command_stack: tuple):
152+
"""
153+
Executes a stack of commands in pipeline.
154+
"""
155+
def callback():
156+
with self._active_database.client.pipeline() as pipe:
157+
for command, options in command_stack:
158+
pipe.execute_command(*command, **options)
159+
160+
return pipe.execute()
161+
162+
return self._execute_with_failure_detection(callback, command_stack)
163+
164+
def execute_transaction(self, transaction: Callable[[Pipeline], None], *watches, **options):
165+
"""
166+
Executes a transaction block wrapped in callback.
167+
"""
168+
def callback():
169+
return self._active_database.client.transaction(transaction, *watches, **options)
170+
171+
return self._execute_with_failure_detection(callback)
172+
173+
def _execute_with_failure_detection(self, callback: Callable, cmds: tuple = ()):
174+
"""
175+
Execute a commands execution callback with failure detection.
176+
"""
177+
def wrapper():
178+
# On each retry we need to check active database as it might change.
179+
self._check_active_database()
180+
return callback()
143181

144182
return self._command_retry.call_with_retry(
145-
lambda: self._execute_command(*args, **options),
146-
lambda error: self._on_command_fail(error, *args),
183+
lambda: wrapper(),
184+
lambda error: self._on_command_fail(error, *cmds),
147185
)
148186

149-
def _execute_command(self, *args, **options):
150-
self._check_active_database()
151-
return self._active_database.client.execute_command(*args, **options)
152-
153187
def _on_command_fail(self, error, *args):
154-
self._event_dispatcher.dispatch(OnCommandFailEvent(args, error))
188+
self._event_dispatcher.dispatch(OnCommandsFailEvent(args, error))
155189

156190
def _check_active_database(self):
157191
"""
@@ -180,5 +214,5 @@ def _setup_event_dispatcher(self):
180214
"""
181215
event_listener = RegisterCommandFailure(self._failure_detectors)
182216
self._event_dispatcher.register_listeners({
183-
OnCommandFailEvent: [event_listener],
217+
OnCommandsFailEvent: [event_listener],
184218
})

redis/multidb/event.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import List
22

3-
from redis.event import EventListenerInterface, OnCommandFailEvent
3+
from redis.event import EventListenerInterface, OnCommandsFailEvent
44
from redis.multidb.failure_detector import FailureDetector
55

66

@@ -11,6 +11,6 @@ class RegisterCommandFailure(EventListenerInterface):
1111
def __init__(self, failure_detectors: List[FailureDetector]):
1212
self._failure_detectors = failure_detectors
1313

14-
def listen(self, event: OnCommandFailEvent) -> None:
14+
def listen(self, event: OnCommandsFailEvent) -> None:
1515
for failure_detector in self._failure_detectors:
16-
failure_detector.register_failure(event.exception, event.command)
16+
failure_detector.register_failure(event.exception, event.commands)

tests/test_multidb/test_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pybreaker
55
import pytest
66

7-
from redis.event import EventDispatcher, OnCommandFailEvent
7+
from redis.event import EventDispatcher, OnCommandsFailEvent
88
from redis.multidb.circuit import State as CBState, PBCircuitBreakerAdapter
99
from redis.multidb.config import DEFAULT_HEALTH_CHECK_RETRIES, DEFAULT_HEALTH_CHECK_BACKOFF, DEFAULT_FAILOVER_RETRIES, \
1010
DEFAULT_FAILOVER_BACKOFF
@@ -455,8 +455,8 @@ def test_add_new_failure_detector(
455455
mock_fd = mock_multi_db_config.failure_detectors[0]
456456

457457
# Event fired if command against mock_db1 would fail
458-
command_fail_event = OnCommandFailEvent(
459-
command=('SET', 'key', 'value'),
458+
command_fail_event = OnCommandsFailEvent(
459+
commands=('SET', 'key', 'value'),
460460
exception=Exception(),
461461
)
462462

0 commit comments

Comments
 (0)