Skip to content

Commit 4a40ee4

Browse files
authored
Added support for Pub/Sub mode in MultiDbClient (#3722)
* Added Database, Healthcheck, CircuitBreaker, FailureDetector * Added DatabaseSelector, exceptions, refactored existing entities * Added MultiDbConfig * Added DatabaseConfig * Added DatabaseConfig test coverage * Renamed DatabaseSelector into FailoverStrategy * Added CommandExecutor * Updated healthcheck to close circuit on success * Added thread-safeness * Added missing thread-safeness * Added missing thread-safenes for dispatcher * Refactored client to keep databases in WeightedList * Added database CRUD operations * Added on-fly configuration * Added background health checks * Added background healthcheck + half-open event * Refactored background scheduling * Added support for Active-Active pipeline * Refactored healthchecks * Added Pipeline testing * Added support for transactions * Removed code repetitions, fixed weight assignment, added loops enhancement, fixed data structure * Added missing doc blocks * Added support for Pub/Sub in MultiDBClient * Refactored configuration * Refactored failure detector * Refactored retry logic * Added scenario tests * Added pybreaker optional dependency * Added pybreaker to dev dependencies * Rename tests directory * Added scenario tests for Pipeline and Transaction * Added handling of ConnectionRefusedError, added timeouts so cluster could recover * Increased timeouts * Refactored integration tests * Added scenario tests for Pub/Sub * Updated healthcheck retry * Increased timeout to avoid unprepared state before tests * Added backoff retry and changed timeouts * Added retry for healthchecks to avoid fluctuations * Changed retry configuration for healthchecks * Fixed property name * Added check for thread results
1 parent 8c09cbe commit 4a40ee4

File tree

7 files changed

+392
-46
lines changed

7 files changed

+392
-46
lines changed

redis/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1217,6 +1217,7 @@ def run_in_thread(
12171217
sleep_time: float = 0.0,
12181218
daemon: bool = False,
12191219
exception_handler: Optional[Callable] = None,
1220+
pubsub = None
12201221
) -> "PubSubWorkerThread":
12211222
for channel, handler in self.channels.items():
12221223
if handler is None:
@@ -1230,8 +1231,9 @@ def run_in_thread(
12301231
f"Shard Channel: '{s_channel}' has no handler registered"
12311232
)
12321233

1234+
pubsub = self if pubsub is None else pubsub
12331235
thread = PubSubWorkerThread(
1234-
self, sleep_time, daemon=daemon, exception_handler=exception_handler
1236+
pubsub, sleep_time, daemon=daemon, exception_handler=exception_handler
12351237
)
12361238
thread.start()
12371239
return thread

redis/multidb/client.py

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import threading
22
import socket
3-
from typing import List, Any, Callable
3+
from typing import List, Any, Callable, Optional
44

55
from redis.background import BackgroundScheduler
6+
from redis.client import PubSubWorkerThread
67
from redis.exceptions import ConnectionError, TimeoutError
78
from redis.commands import RedisModuleCommands, CoreCommands
89
from redis.multidb.command_executor import DefaultCommandExecutor
@@ -201,6 +202,17 @@ def transaction(self, func: Callable[["Pipeline"], None], *watches, **options):
201202

202203
return self.command_executor.execute_transaction(func, *watches, *options)
203204

205+
def pubsub(self, **kwargs):
206+
"""
207+
Return a Publish/Subscribe object. With this object, you can
208+
subscribe to channels and listen for messages that get published to
209+
them.
210+
"""
211+
if not self.initialized:
212+
self.initialize()
213+
214+
return PubSub(self, **kwargs)
215+
204216
def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Exception], None] = None) -> None:
205217
"""
206218
Runs health checks on the given database until first failure.
@@ -311,3 +323,118 @@ def execute(self) -> List[Any]:
311323
return self._client.command_executor.execute_pipeline(tuple(self._command_stack))
312324
finally:
313325
self.reset()
326+
327+
class PubSub:
328+
"""
329+
PubSub object for multi database client.
330+
"""
331+
def __init__(self, client: MultiDBClient, **kwargs):
332+
self._client = client
333+
self._client.command_executor.pubsub(**kwargs)
334+
335+
def __enter__(self) -> "PubSub":
336+
return self
337+
338+
def __exit__(self, exc_type, exc_value, traceback) -> None:
339+
self.reset()
340+
341+
def __del__(self) -> None:
342+
try:
343+
# if this object went out of scope prior to shutting down
344+
# subscriptions, close the connection manually before
345+
# returning it to the connection pool
346+
self.reset()
347+
except Exception:
348+
pass
349+
350+
def reset(self) -> None:
351+
pass
352+
353+
def close(self) -> None:
354+
self.reset()
355+
356+
@property
357+
def subscribed(self) -> bool:
358+
return self._client.command_executor.active_pubsub.subscribed
359+
360+
def psubscribe(self, *args, **kwargs):
361+
"""
362+
Subscribe to channel patterns. Patterns supplied as keyword arguments
363+
expect a pattern name as the key and a callable as the value. A
364+
pattern's callable will be invoked automatically when a message is
365+
received on that pattern rather than producing a message via
366+
``listen()``.
367+
"""
368+
return self._client.command_executor.execute_pubsub_method('psubscribe', *args, **kwargs)
369+
370+
def punsubscribe(self, *args):
371+
"""
372+
Unsubscribe from the supplied patterns. If empty, unsubscribe from
373+
all patterns.
374+
"""
375+
return self._client.command_executor.execute_pubsub_method('punsubscribe', *args)
376+
377+
def subscribe(self, *args, **kwargs):
378+
"""
379+
Subscribe to channels. Channels supplied as keyword arguments expect
380+
a channel name as the key and a callable as the value. A channel's
381+
callable will be invoked automatically when a message is received on
382+
that channel rather than producing a message via ``listen()`` or
383+
``get_message()``.
384+
"""
385+
return self._client.command_executor.execute_pubsub_method('subscribe', *args, **kwargs)
386+
387+
def unsubscribe(self, *args):
388+
"""
389+
Unsubscribe from the supplied channels. If empty, unsubscribe from
390+
all channels
391+
"""
392+
return self._client.command_executor.execute_pubsub_method('unsubscribe', *args)
393+
394+
def ssubscribe(self, *args, **kwargs):
395+
"""
396+
Subscribes the client to the specified shard channels.
397+
Channels supplied as keyword arguments expect a channel name as the key
398+
and a callable as the value. A channel's callable will be invoked automatically
399+
when a message is received on that channel rather than producing a message via
400+
``listen()`` or ``get_sharded_message()``.
401+
"""
402+
return self._client.command_executor.execute_pubsub_method('ssubscribe', *args, **kwargs)
403+
404+
def sunsubscribe(self, *args):
405+
"""
406+
Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
407+
all shard_channels
408+
"""
409+
return self._client.command_executor.execute_pubsub_method('sunsubscribe', *args)
410+
411+
def get_message(
412+
self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
413+
):
414+
"""
415+
Get the next message if one is available, otherwise None.
416+
417+
If timeout is specified, the system will wait for `timeout` seconds
418+
before returning. Timeout should be specified as a floating point
419+
number, or None, to wait indefinitely.
420+
"""
421+
return self._client.command_executor.execute_pubsub_method(
422+
'get_message',
423+
ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
424+
)
425+
426+
get_sharded_message = get_message
427+
428+
def run_in_thread(
429+
self,
430+
sleep_time: float = 0.0,
431+
daemon: bool = False,
432+
exception_handler: Optional[Callable] = None,
433+
) -> "PubSubWorkerThread":
434+
return self._client.command_executor.execute_pubsub_run_in_thread(
435+
sleep_time=sleep_time,
436+
daemon=daemon,
437+
exception_handler=exception_handler,
438+
pubsub=self
439+
)
440+

redis/multidb/command_executor.py

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
from abc import ABC, abstractmethod
22
from datetime import datetime, timedelta
3-
from typing import List, Union, Optional, Callable
3+
from typing import List, Optional, Callable
44

5-
from redis.client import Pipeline
5+
from redis.client import Pipeline, PubSub, PubSubWorkerThread
66
from redis.event import EventDispatcherInterface, OnCommandsFailEvent
77
from redis.multidb.config import DEFAULT_AUTO_FALLBACK_INTERVAL
88
from redis.multidb.database import Database, AbstractDatabase, Databases
99
from redis.multidb.circuit import State as CBState
10-
from redis.multidb.event import RegisterCommandFailure
10+
from redis.multidb.event import RegisterCommandFailure, ActiveDatabaseChanged, ResubscribeOnActiveDatabaseChanged
1111
from redis.multidb.failover import FailoverStrategy
1212
from redis.multidb.failure_detector import FailureDetector
1313
from redis.retry import Retry
@@ -34,7 +34,7 @@ def databases(self) -> Databases:
3434

3535
@property
3636
@abstractmethod
37-
def active_database(self) -> Union[Database, None]:
37+
def active_database(self) -> Optional[Database]:
3838
"""Returns currently active database."""
3939
pass
4040

@@ -44,6 +44,23 @@ def active_database(self, database: AbstractDatabase) -> None:
4444
"""Sets currently active database."""
4545
pass
4646

47+
@abstractmethod
48+
def pubsub(self, **kwargs):
49+
"""Initializes a PubSub object on a currently active database"""
50+
pass
51+
52+
@property
53+
@abstractmethod
54+
def active_pubsub(self) -> Optional[PubSub]:
55+
"""Returns currently active pubsub."""
56+
pass
57+
58+
@active_pubsub.setter
59+
@abstractmethod
60+
def active_pubsub(self, pubsub: PubSub) -> None:
61+
"""Sets currently active pubsub."""
62+
pass
63+
4764
@property
4865
@abstractmethod
4966
def failover_strategy(self) -> FailoverStrategy:
@@ -103,7 +120,9 @@ def __init__(
103120
self._event_dispatcher = event_dispatcher
104121
self._auto_fallback_interval = auto_fallback_interval
105122
self._next_fallback_attempt: datetime
106-
self._active_database: Union[Database, None] = None
123+
self._active_database: Optional[Database] = None
124+
self._active_pubsub: Optional[PubSub] = None
125+
self._active_pubsub_kwargs = {}
107126
self._setup_event_dispatcher()
108127
self._schedule_next_fallback()
109128

@@ -128,8 +147,22 @@ def active_database(self) -> Optional[AbstractDatabase]:
128147

129148
@active_database.setter
130149
def active_database(self, database: AbstractDatabase) -> None:
150+
old_active = self._active_database
131151
self._active_database = database
132152

153+
if old_active is not None and old_active is not database:
154+
self._event_dispatcher.dispatch(
155+
ActiveDatabaseChanged(old_active, self._active_database, self, **self._active_pubsub_kwargs)
156+
)
157+
158+
@property
159+
def active_pubsub(self) -> Optional[PubSub]:
160+
return self._active_pubsub
161+
162+
@active_pubsub.setter
163+
def active_pubsub(self, pubsub: PubSub) -> None:
164+
self._active_pubsub = pubsub
165+
133166
@property
134167
def failover_strategy(self) -> FailoverStrategy:
135168
return self._failover_strategy
@@ -143,6 +176,7 @@ def auto_fallback_interval(self, auto_fallback_interval: int) -> None:
143176
self._auto_fallback_interval = auto_fallback_interval
144177

145178
def execute_command(self, *args, **options):
179+
"""Executes a command and returns the result."""
146180
def callback():
147181
return self._active_database.client.execute_command(*args, **options)
148182

@@ -170,6 +204,39 @@ def callback():
170204

171205
return self._execute_with_failure_detection(callback)
172206

207+
def pubsub(self, **kwargs):
208+
def callback():
209+
if self._active_pubsub is None:
210+
self._active_pubsub = self._active_database.client.pubsub(**kwargs)
211+
self._active_pubsub_kwargs = kwargs
212+
return None
213+
214+
return self._execute_with_failure_detection(callback)
215+
216+
def execute_pubsub_method(self, method_name: str, *args, **kwargs):
217+
"""
218+
Executes given method on active pub/sub.
219+
"""
220+
def callback():
221+
method = getattr(self.active_pubsub, method_name)
222+
return method(*args, **kwargs)
223+
224+
return self._execute_with_failure_detection(callback, *args)
225+
226+
def execute_pubsub_run_in_thread(
227+
self,
228+
pubsub,
229+
sleep_time: float = 0.0,
230+
daemon: bool = False,
231+
exception_handler: Optional[Callable] = None,
232+
) -> "PubSubWorkerThread":
233+
def callback():
234+
return self._active_pubsub.run_in_thread(
235+
sleep_time, daemon=daemon, exception_handler=exception_handler, pubsub=pubsub
236+
)
237+
238+
return self._execute_with_failure_detection(callback)
239+
173240
def _execute_with_failure_detection(self, callback: Callable, cmds: tuple = ()):
174241
"""
175242
Execute a commands execution callback with failure detection.
@@ -199,7 +266,7 @@ def _check_active_database(self):
199266
and self._next_fallback_attempt <= datetime.now()
200267
)
201268
):
202-
self._active_database = self._failover_strategy.database
269+
self.active_database = self._failover_strategy.database
203270
self._schedule_next_fallback()
204271

205272
def _schedule_next_fallback(self) -> None:
@@ -210,9 +277,11 @@ def _schedule_next_fallback(self) -> None:
210277

211278
def _setup_event_dispatcher(self):
212279
"""
213-
Registers command failure event listener.
280+
Registers necessary listeners.
214281
"""
215-
event_listener = RegisterCommandFailure(self._failure_detectors)
282+
failure_listener = RegisterCommandFailure(self._failure_detectors)
283+
resubscribe_listener = ResubscribeOnActiveDatabaseChanged()
216284
self._event_dispatcher.register_listeners({
217-
OnCommandsFailEvent: [event_listener],
285+
OnCommandsFailEvent: [failure_listener],
286+
ActiveDatabaseChanged: [resubscribe_listener],
218287
})

redis/multidb/event.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,58 @@
11
from typing import List
22

33
from redis.event import EventListenerInterface, OnCommandsFailEvent
4+
from redis.multidb.config import Databases
5+
from redis.multidb.database import AbstractDatabase
46
from redis.multidb.failure_detector import FailureDetector
57

8+
class ActiveDatabaseChanged:
9+
"""
10+
Event fired when an active database has been changed.
11+
"""
12+
def __init__(
13+
self,
14+
old_database: AbstractDatabase,
15+
new_database: AbstractDatabase,
16+
command_executor,
17+
**kwargs
18+
):
19+
self._old_database = old_database
20+
self._new_database = new_database
21+
self._command_executor = command_executor
22+
self._kwargs = kwargs
23+
24+
@property
25+
def old_database(self) -> AbstractDatabase:
26+
return self._old_database
27+
28+
@property
29+
def new_database(self) -> AbstractDatabase:
30+
return self._new_database
31+
32+
@property
33+
def command_executor(self):
34+
return self._command_executor
35+
36+
@property
37+
def kwargs(self):
38+
return self._kwargs
39+
40+
class ResubscribeOnActiveDatabaseChanged(EventListenerInterface):
41+
"""
42+
Re-subscribe currently active pub/sub to a new active database.
43+
"""
44+
def listen(self, event: ActiveDatabaseChanged):
45+
old_pubsub = event.command_executor.active_pubsub
46+
47+
if old_pubsub is not None:
48+
# Re-assign old channels and patterns so they will be automatically subscribed on connection.
49+
new_pubsub = event.new_database.client.pubsub(**event.kwargs)
50+
new_pubsub.channels = old_pubsub.channels
51+
new_pubsub.patterns = old_pubsub.patterns
52+
new_pubsub.shard_channels = old_pubsub.shard_channels
53+
new_pubsub.on_connect(None)
54+
event.command_executor.active_pubsub = new_pubsub
55+
old_pubsub.close()
656

757
class RegisterCommandFailure(EventListenerInterface):
858
"""

redis/multidb/healthcheck.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def __init__(
2121
retry: Retry,
2222
) -> None:
2323
self._retry = retry
24+
self._retry.update_supported_errors([ConnectionRefusedError])
2425

2526
@property
2627
def retry(self) -> Retry:

0 commit comments

Comments
 (0)