Skip to content

Commit e6d90e4

Browse files
authored
MultiDbClient implementation (#3696)
* Added Database, Healthcheck, CircuitBreaker, FailureDetector * Added DatabaseSelector, exceptions, refactored existing entities * Added MultiDbConfig * Added DatabaseConfig * Added DatabaseConfig test coverage * Renamed DatabaseSelector into FailoverStrategy * Added CommandExecutor * Updated healthcheck to close circuit on success * Added thread-safeness * Added missing thread-safeness * Added missing thread-safenes for dispatcher * Refactored client to keep databases in WeightedList * Added database CRUD operations * Added on-fly configuration * Added background health checks * Added background healthcheck + half-open event * Refactored background scheduling * Refactored healthchecks * Removed code repetitions, fixed weight assignment, added loops enhancement, fixed data structure * Refactored configuration * Refactored failure detector * Refactored retry logic * Added scenario tests * Added pybreaker optional dependency * Added pybreaker to dev dependencies * Rename tests directory * Remove redundant checks * Handle retries if default is not set * Removed all Sentinel related
1 parent 513c8d0 commit e6d90e4

35 files changed

+2995
-16
lines changed

dev_requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ uvloop
1414
vulture>=2.3.0
1515
numpy>=1.24.0
1616
redis-entraid==1.0.0
17+
pybreaker>=1.4.0

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ ocsp = [
4242
jwt = [
4343
"PyJWT>=2.9.0",
4444
]
45+
circuit_breaker = [
46+
"pybreaker>=1.4.0"
47+
]
4548

4649
[project.urls]
4750
Changes = "https://github.com/redis/redis-py/releases"

redis/background.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import asyncio
2+
import threading
3+
from typing import Callable
4+
5+
class BackgroundScheduler:
6+
"""
7+
Schedules background tasks execution either in separate thread or in the running event loop.
8+
"""
9+
def __init__(self):
10+
self._next_timer = None
11+
12+
def __del__(self):
13+
if self._next_timer:
14+
self._next_timer.cancel()
15+
16+
def run_once(self, delay: float, callback: Callable, *args):
17+
"""
18+
Runs callable task once after certain delay in seconds.
19+
"""
20+
# Run loop in a separate thread to unblock main thread.
21+
loop = asyncio.new_event_loop()
22+
thread = threading.Thread(
23+
target=_start_event_loop_in_thread,
24+
args=(loop, self._call_later, delay, callback, *args),
25+
daemon=True
26+
)
27+
thread.start()
28+
29+
def run_recurring(
30+
self,
31+
interval: float,
32+
callback: Callable,
33+
*args
34+
):
35+
"""
36+
Runs recurring callable task with given interval in seconds.
37+
"""
38+
# Run loop in a separate thread to unblock main thread.
39+
loop = asyncio.new_event_loop()
40+
41+
thread = threading.Thread(
42+
target=_start_event_loop_in_thread,
43+
args=(loop, self._call_later_recurring, interval, callback, *args),
44+
daemon=True
45+
)
46+
thread.start()
47+
48+
def _call_later(self, loop: asyncio.AbstractEventLoop, delay: float, callback: Callable, *args):
49+
self._next_timer = loop.call_later(delay, callback, *args)
50+
51+
def _call_later_recurring(
52+
self,
53+
loop: asyncio.AbstractEventLoop,
54+
interval: float,
55+
callback: Callable,
56+
*args
57+
):
58+
self._call_later(
59+
loop, interval, self._execute_recurring, loop, interval, callback, *args
60+
)
61+
62+
def _execute_recurring(
63+
self,
64+
loop: asyncio.AbstractEventLoop,
65+
interval: float,
66+
callback: Callable,
67+
*args
68+
):
69+
"""
70+
Executes recurring callable task with given interval in seconds.
71+
"""
72+
callback(*args)
73+
74+
self._call_later(
75+
loop, interval, self._execute_recurring, loop, interval, callback, *args
76+
)
77+
78+
79+
def _start_event_loop_in_thread(event_loop: asyncio.AbstractEventLoop, call_soon_cb: Callable, *args):
80+
"""
81+
Starts event loop in a thread and schedule callback as soon as event loop is ready.
82+
Used to be able to schedule tasks using loop.call_later.
83+
84+
:param event_loop:
85+
:return:
86+
"""
87+
asyncio.set_event_loop(event_loop)
88+
event_loop.call_soon(call_soon_cb, event_loop, *args)
89+
event_loop.run_forever()

redis/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ def _send_command_parse_response(self, conn, command_name, *args, **options):
603603
conn.send_command(*args, **options)
604604
return self.parse_response(conn, command_name, **options)
605605

606-
def _close_connection(self, conn) -> None:
606+
def _close_connection(self, conn, error, *args) -> None:
607607
"""
608608
Close the connection before retrying.
609609
@@ -633,7 +633,7 @@ def _execute_command(self, *args, **options):
633633
lambda: self._send_command_parse_response(
634634
conn, command_name, *args, **options
635635
),
636-
lambda _: self._close_connection(conn),
636+
lambda error: self._close_connection(conn, error, *args),
637637
)
638638
finally:
639639
if self._single_connection_client:

redis/data_structure.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import threading
2+
from typing import List, Any, TypeVar, Generic, Union
3+
4+
from redis.typing import Number
5+
6+
T = TypeVar('T')
7+
8+
class WeightedList(Generic[T]):
9+
"""
10+
Thread-safe weighted list.
11+
"""
12+
def __init__(self):
13+
self._items: List[tuple[Any, Number]] = []
14+
self._lock = threading.RLock()
15+
16+
def add(self, item: Any, weight: float) -> None:
17+
"""Add item with weight, maintaining sorted order"""
18+
with self._lock:
19+
# Find insertion point using binary search
20+
left, right = 0, len(self._items)
21+
while left < right:
22+
mid = (left + right) // 2
23+
if self._items[mid][1] < weight:
24+
right = mid
25+
else:
26+
left = mid + 1
27+
28+
self._items.insert(left, (item, weight))
29+
30+
def remove(self, item):
31+
"""Remove first occurrence of item"""
32+
with self._lock:
33+
for i, (stored_item, weight) in enumerate(self._items):
34+
if stored_item == item:
35+
self._items.pop(i)
36+
return weight
37+
raise ValueError("Item not found")
38+
39+
def get_by_weight_range(self, min_weight: float, max_weight: float) -> List[tuple[Any, Number]]:
40+
"""Get all items within weight range"""
41+
with self._lock:
42+
result = []
43+
for item, weight in self._items:
44+
if min_weight <= weight <= max_weight:
45+
result.append((item, weight))
46+
return result
47+
48+
def get_top_n(self, n: int) -> List[tuple[Any, Number]]:
49+
"""Get top N the highest weighted items"""
50+
with self._lock:
51+
return [(item, weight) for item, weight in self._items[:n]]
52+
53+
def update_weight(self, item, new_weight: float):
54+
with self._lock:
55+
"""Update weight of an item"""
56+
old_weight = self.remove(item)
57+
self.add(item, new_weight)
58+
return old_weight
59+
60+
def __iter__(self):
61+
"""Iterate in descending weight order"""
62+
with self._lock:
63+
items_copy = self._items.copy() # Create snapshot as lock released after each 'yield'
64+
65+
for item, weight in items_copy:
66+
yield item, weight
67+
68+
def __len__(self):
69+
with self._lock:
70+
return len(self._items)
71+
72+
def __getitem__(self, index) -> tuple[Any, Number]:
73+
with self._lock:
74+
item, weight = self._items[index]
75+
return item, weight

redis/event.py

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import threading
33
from abc import ABC, abstractmethod
44
from enum import Enum
5-
from typing import List, Optional, Union
5+
from typing import List, Optional, Union, Dict, Type
66

77
from redis.auth.token import TokenInterface
88
from redis.credentials import CredentialProvider, StreamingCredentialProvider
@@ -42,6 +42,11 @@ def dispatch(self, event: object):
4242
async def dispatch_async(self, event: object):
4343
pass
4444

45+
@abstractmethod
46+
def register_listeners(self, mappings: Dict[Type[object], List[EventListenerInterface]]):
47+
"""Register additional listeners."""
48+
pass
49+
4550

4651
class EventException(Exception):
4752
"""
@@ -56,11 +61,14 @@ def __init__(self, exception: Exception, event: object):
5661

5762
class EventDispatcher(EventDispatcherInterface):
5863
# TODO: Make dispatcher to accept external mappings.
59-
def __init__(self):
64+
def __init__(
65+
self,
66+
event_listeners: Optional[Dict[Type[object], List[EventListenerInterface]]] = None,
67+
):
6068
"""
61-
Mapping should be extended for any new events or listeners to be added.
69+
Dispatcher that dispatches events to listeners associated with given event.
6270
"""
63-
self._event_listeners_mapping = {
71+
self._event_listeners_mapping: Dict[Type[object], List[EventListenerInterface]]= {
6472
AfterConnectionReleasedEvent: [
6573
ReAuthConnectionListener(),
6674
],
@@ -77,17 +85,35 @@ def __init__(self):
7785
],
7886
}
7987

88+
self._lock = threading.Lock()
89+
self._async_lock = asyncio.Lock()
90+
91+
if event_listeners:
92+
self.register_listeners(event_listeners)
93+
8094
def dispatch(self, event: object):
81-
listeners = self._event_listeners_mapping.get(type(event))
95+
with self._lock:
96+
listeners = self._event_listeners_mapping.get(type(event), [])
8297

83-
for listener in listeners:
84-
listener.listen(event)
98+
for listener in listeners:
99+
listener.listen(event)
85100

86101
async def dispatch_async(self, event: object):
87-
listeners = self._event_listeners_mapping.get(type(event))
102+
with self._async_lock:
103+
listeners = self._event_listeners_mapping.get(type(event), [])
104+
105+
for listener in listeners:
106+
await listener.listen(event)
88107

89-
for listener in listeners:
90-
await listener.listen(event)
108+
def register_listeners(self, event_listeners: Dict[Type[object], List[EventListenerInterface]]):
109+
with self._lock:
110+
for event_type in event_listeners:
111+
if event_type in self._event_listeners_mapping:
112+
self._event_listeners_mapping[event_type] = list(
113+
set(self._event_listeners_mapping[event_type] + event_listeners[event_type])
114+
)
115+
else:
116+
self._event_listeners_mapping[event_type] = event_listeners[event_type]
91117

92118

93119
class AfterConnectionReleasedEvent:
@@ -225,6 +251,25 @@ def nodes(self) -> dict:
225251
def credential_provider(self) -> Union[CredentialProvider, None]:
226252
return self._credential_provider
227253

254+
class OnCommandFailEvent:
255+
"""
256+
Event fired whenever a command fails during the execution.
257+
"""
258+
def __init__(
259+
self,
260+
command: tuple,
261+
exception: Exception,
262+
):
263+
self._command = command
264+
self._exception = exception
265+
266+
@property
267+
def command(self) -> tuple:
268+
return self._command
269+
270+
@property
271+
def exception(self) -> Exception:
272+
return self._exception
228273

229274
class ReAuthConnectionListener(EventListenerInterface):
230275
"""

redis/multidb/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)