Added support for Pub/Sub mode in MultiDbClient #3722


Merged
merged 52 commits into from Aug 13, 2025

Changes from 50 commits (52 commits total)
ac86280
Added Database, Healthcheck, CircuitBreaker, FailureDetector
vladvildanov Jun 13, 2025
4f4a53c
Added DatabaseSelector, exceptions, refactored existing entities
vladvildanov Jun 17, 2025
acc68ef
Added MultiDbConfig
vladvildanov Jun 17, 2025
255bb0e
Added DatabaseConfig
vladvildanov Jun 17, 2025
79db257
Added DatabaseConfig test coverage
vladvildanov Jun 17, 2025
8790db1
Renamed DatabaseSelector into FailoverStrategy
vladvildanov Jun 18, 2025
b3ad8da
Added CommandExecutor
vladvildanov Jun 18, 2025
3a1dc9c
Updated healthcheck to close circuit on success
vladvildanov Jun 18, 2025
9bb9235
Added thread-safeness
vladvildanov Jun 19, 2025
3218e36
Added missing thread-safeness
vladvildanov Jun 19, 2025
4cdb6f4
Added missing thread-safenes for dispatcher
vladvildanov Jun 19, 2025
6914467
Refactored client to keep databases in WeightedList
vladvildanov Jun 19, 2025
5b94757
Added database CRUD operations
vladvildanov Jun 26, 2025
daba501
Added on-fly configuration
vladvildanov Jun 26, 2025
061e518
Added background health checks
vladvildanov Jun 27, 2025
a562774
Added background healthcheck + half-open event
vladvildanov Jul 2, 2025
3ab1367
Refactored background scheduling
vladvildanov Jul 3, 2025
3a55dcd
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov Jul 4, 2025
46afaea
Added support for Active-Active pipeline
vladvildanov Jul 4, 2025
badef0e
Refactored healthchecks
vladvildanov Jul 7, 2025
0cdeebf
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 7, 2025
f16b646
Added Pipeline testing
vladvildanov Jul 7, 2025
7e43b40
Added support for transactions
vladvildanov Jul 14, 2025
fcc6035
Removed code repetitions, fixed weight assignment, added loops enhanc…
vladvildanov Jul 15, 2025
7e815ad
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 15, 2025
0563024
Added missing doc blocks
vladvildanov Jul 15, 2025
f64e10d
Added support for Pub/Sub in MultiDBClient
vladvildanov Jul 17, 2025
d5dc65c
Refactored configuration
vladvildanov Jul 17, 2025
7086822
Refactored failure detector
vladvildanov Jul 18, 2025
2561d6f
Refactored retry logic
vladvildanov Jul 18, 2025
6b0689a
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 18, 2025
e15a38b
Merge branch 'vv-active-active-pipeline' of github.com:redis/redis-py…
vladvildanov Jul 18, 2025
a0af5b3
Added scenario tests
vladvildanov Jul 24, 2025
aaed8d7
Added pybreaker optional dependency
vladvildanov Jul 24, 2025
0551618
Added pybreaker to dev dependencies
vladvildanov Jul 24, 2025
1d288e6
Rename tests directory
vladvildanov Jul 24, 2025
0c644f2
Merge branch 'vv-multi-db-client' of github.com:redis/redis-py into v…
vladvildanov Jul 24, 2025
8922aa8
Added scenario tests for Pipeline and Transaction
vladvildanov Jul 24, 2025
94eff21
Added handling of ConnectionRefusedError, added timeouts so cluster c…
vladvildanov Jul 24, 2025
7fa7c07
Increased timeouts
vladvildanov Jul 24, 2025
2cb8cac
Refactored integration tests
vladvildanov Jul 25, 2025
e76aea3
Merge branch 'vv-active-active-pipeline' of github.com:redis/redis-py…
vladvildanov Jul 29, 2025
2de5d09
Added scenario tests for Pub/Sub
vladvildanov Jul 30, 2025
a7f03c0
Updated healthcheck retry
vladvildanov Jul 30, 2025
0505e0a
Increased timeout to avoid unprepared state before tests
vladvildanov Jul 30, 2025
a7a7b6d
Added backoff retry and changed timeouts
vladvildanov Jul 31, 2025
faa18ae
Added retry for healthchecks to avoid fluctuations
vladvildanov Jul 31, 2025
7d5e957
Changed retry configuration for healthchecks
vladvildanov Jul 31, 2025
ed93cfc
Fixed property name
vladvildanov Jul 31, 2025
bee15d9
Merge branch 'vv-active-active-pipeline' of github.com:redis/redis-py…
vladvildanov Jul 31, 2025
b9d727e
Merge branch 'feat/active-active' of github.com:redis/redis-py into v…
vladvildanov Aug 11, 2025
897dadb
Added check for thread results
vladvildanov Aug 11, 2025
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -14,3 +14,4 @@ uvloop
vulture>=2.3.0
numpy>=1.24.0
redis-entraid==1.0.0
pybreaker>=1.4.0
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -42,6 +42,9 @@ ocsp = [
jwt = [
"PyJWT>=2.9.0",
]
circuit_breaker = [
"pybreaker>=1.4.0"
]

[project.urls]
Changes = "https://github.com/redis/redis-py/releases"
89 changes: 89 additions & 0 deletions redis/background.py
@@ -0,0 +1,89 @@
import asyncio
import threading
from typing import Callable

class BackgroundScheduler:
"""
Schedules background task execution either in a separate thread or in the running event loop.
"""
def __init__(self):
self._next_timer = None

def __del__(self):
if self._next_timer:
self._next_timer.cancel()

def run_once(self, delay: float, callback: Callable, *args):
"""
Runs a callable task once after the given delay in seconds.
"""
# Run loop in a separate thread to unblock main thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(
target=_start_event_loop_in_thread,
args=(loop, self._call_later, delay, callback, *args),
daemon=True
)
thread.start()

def run_recurring(
self,
interval: float,
callback: Callable,
*args
):
"""
Runs a recurring callable task with the given interval in seconds.
"""
# Run loop in a separate thread to unblock main thread.
loop = asyncio.new_event_loop()

thread = threading.Thread(
target=_start_event_loop_in_thread,
args=(loop, self._call_later_recurring, interval, callback, *args),
daemon=True
)
thread.start()

def _call_later(self, loop: asyncio.AbstractEventLoop, delay: float, callback: Callable, *args):
self._next_timer = loop.call_later(delay, callback, *args)

def _call_later_recurring(
self,
loop: asyncio.AbstractEventLoop,
interval: float,
callback: Callable,
*args
):
self._call_later(
loop, interval, self._execute_recurring, loop, interval, callback, *args
)

def _execute_recurring(
self,
loop: asyncio.AbstractEventLoop,
interval: float,
callback: Callable,
*args
):
"""
Executes a recurring callable task with the given interval in seconds.
"""
callback(*args)

self._call_later(
loop, interval, self._execute_recurring, loop, interval, callback, *args
)


def _start_event_loop_in_thread(event_loop: asyncio.AbstractEventLoop, call_soon_cb: Callable, *args):
"""
Starts event loop in a thread and schedule callback as soon as event loop is ready.
Used to be able to schedule tasks using loop.call_later.

:param event_loop:
:return:
"""
asyncio.set_event_loop(event_loop)
event_loop.call_soon(call_soon_cb, event_loop, *args)
event_loop.run_forever()
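The scheduling pattern above — an event loop running in a daemon thread, with `loop.call_later` re-arming itself for recurring work — can be sketched in isolation. A minimal sketch with illustrative names, not the PR's API:

```python
import asyncio
import threading
import time

def start_loop(loop: asyncio.AbstractEventLoop) -> None:
    # Run the loop forever in the current (daemon) thread.
    asyncio.set_event_loop(loop)
    loop.run_forever()

def schedule_recurring(loop: asyncio.AbstractEventLoop, interval: float, callback, *args) -> None:
    def tick() -> None:
        callback(*args)
        loop.call_later(interval, tick)  # re-arm for the next run
    # call_soon_threadsafe is required when scheduling from another thread
    loop.call_soon_threadsafe(loop.call_later, interval, tick)

ticks = []
loop = asyncio.new_event_loop()
threading.Thread(target=start_loop, args=(loop,), daemon=True).start()
schedule_recurring(loop, 0.05, ticks.append, None)
time.sleep(0.4)          # the main thread stays unblocked meanwhile
loop.call_soon_threadsafe(loop.stop)
print(len(ticks) >= 2)
```

The daemon flag mirrors the PR's threads: the background loop never prevents interpreter shutdown.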
8 changes: 5 additions & 3 deletions redis/client.py
@@ -603,7 +603,7 @@ def _send_command_parse_response(self, conn, command_name, *args, **options):
conn.send_command(*args, **options)
return self.parse_response(conn, command_name, **options)

- def _close_connection(self, conn) -> None:
+ def _close_connection(self, conn, error, *args) -> None:
"""
Close the connection before retrying.

@@ -633,7 +633,7 @@ def _execute_command(self, *args, **options):
lambda: self._send_command_parse_response(
conn, command_name, *args, **options
),
- lambda _: self._close_connection(conn),
+ lambda error: self._close_connection(conn, error, *args),
)
finally:
if self._single_connection_client:
@@ -1217,6 +1217,7 @@ def run_in_thread(
sleep_time: float = 0.0,
daemon: bool = False,
exception_handler: Optional[Callable] = None,
pubsub = None
) -> "PubSubWorkerThread":
for channel, handler in self.channels.items():
if handler is None:
@@ -1230,8 +1231,9 @@ def run_in_thread(
f"Shard Channel: '{s_channel}' has no handler registered"
)

pubsub = self if pubsub is None else pubsub
thread = PubSubWorkerThread(
- self, sleep_time, daemon=daemon, exception_handler=exception_handler
+ pubsub, sleep_time, daemon=daemon, exception_handler=exception_handler
)
thread.start()
return thread
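The new `pubsub` parameter lets `run_in_thread` poll an object other than `self` — for instance a multi-database wrapper. The worker-thread loop can be sketched with a hypothetical stand-in (these names are illustrative, not the redis-py classes):

```python
import threading
import time

class FakePubSub:
    """Hypothetical stand-in for whatever object the worker thread polls."""
    def __init__(self, messages):
        self._messages = list(messages)
        self.handled = []

    def get_message(self):
        return self._messages.pop(0) if self._messages else None

def worker(pubsub: FakePubSub, stop: threading.Event, sleep_time: float) -> None:
    # Mirrors the worker-thread idea: poll whichever pubsub object was supplied.
    while not stop.is_set():
        msg = pubsub.get_message()
        if msg is not None:
            pubsub.handled.append(msg)
        time.sleep(sleep_time)

pubsub = FakePubSub(["m1", "m2"])
stop = threading.Event()
t = threading.Thread(target=worker, args=(pubsub, stop, 0.01), daemon=True)
t.start()
time.sleep(0.2)
stop.set()
t.join()
print(pubsub.handled)  # ['m1', 'm2']
```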
75 changes: 75 additions & 0 deletions redis/data_structure.py
@@ -0,0 +1,75 @@
import threading
from typing import List, Any, TypeVar, Generic, Union

from redis.typing import Number

T = TypeVar('T')

class WeightedList(Generic[T]):
"""
Thread-safe weighted list.
"""
def __init__(self):
self._items: List[tuple[Any, Number]] = []
self._lock = threading.RLock()

def add(self, item: Any, weight: float) -> None:
"""Add item with weight, maintaining sorted order"""
with self._lock:
# Find insertion point using binary search
left, right = 0, len(self._items)
while left < right:
mid = (left + right) // 2
if self._items[mid][1] < weight:
right = mid
else:
left = mid + 1

self._items.insert(left, (item, weight))

def remove(self, item):
"""Remove first occurrence of item"""
with self._lock:
for i, (stored_item, weight) in enumerate(self._items):
if stored_item == item:
self._items.pop(i)
return weight
raise ValueError("Item not found")

def get_by_weight_range(self, min_weight: float, max_weight: float) -> List[tuple[Any, Number]]:
"""Get all items within weight range"""
with self._lock:
result = []
for item, weight in self._items:
if min_weight <= weight <= max_weight:
result.append((item, weight))
return result

def get_top_n(self, n: int) -> List[tuple[Any, Number]]:
"""Get top N the highest weighted items"""
with self._lock:
return [(item, weight) for item, weight in self._items[:n]]

def update_weight(self, item, new_weight: float):
"""Update the weight of an item"""
with self._lock:
old_weight = self.remove(item)
self.add(item, new_weight)
return old_weight

def __iter__(self):
"""Iterate in descending weight order"""
with self._lock:
items_copy = self._items.copy()  # snapshot, since the lock is released after each 'yield'

for item, weight in items_copy:
yield item, weight

def __len__(self):
with self._lock:
return len(self._items)

def __getitem__(self, index) -> tuple[Any, Number]:
with self._lock:
item, weight = self._items[index]
return item, weight
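The descending-order insert that `WeightedList.add` performs with its manual binary search can be sketched with `bisect` on a negated key — a standalone illustration of the same invariant, not the PR's code:

```python
import bisect

# List of (item, weight) kept sorted by weight, highest first —
# the same invariant WeightedList.add maintains.
items = []

def add(item, weight):
    # bisect over the negated weights so the list stays descending
    keys = [-w for _, w in items]
    idx = bisect.bisect_right(keys, -weight)
    items.insert(idx, (item, weight))

add("db-a", 0.5)
add("db-b", 1.0)
add("db-c", 0.7)
print([name for name, _ in items])  # highest-weighted database first
```

Iterating the list then yields failover candidates from the highest weight down, which is what the client's active-active selection relies on.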
65 changes: 55 additions & 10 deletions redis/event.py
@@ -2,7 +2,7 @@
import threading
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Optional, Union
from typing import List, Optional, Union, Dict, Type

from redis.auth.token import TokenInterface
from redis.credentials import CredentialProvider, StreamingCredentialProvider
@@ -42,6 +42,11 @@ def dispatch(self, event: object):
async def dispatch_async(self, event: object):
pass

@abstractmethod
def register_listeners(self, mappings: Dict[Type[object], List[EventListenerInterface]]):
"""Register additional listeners."""
pass


class EventException(Exception):
"""
@@ -56,11 +61,14 @@ def __init__(self, exception: Exception, event: object):

class EventDispatcher(EventDispatcherInterface):
# TODO: Make dispatcher to accept external mappings.
- def __init__(self):
+ def __init__(
+     self,
+     event_listeners: Optional[Dict[Type[object], List[EventListenerInterface]]] = None,
+ ):
"""
Mapping should be extended for any new events or listeners to be added.
Dispatcher that dispatches events to listeners associated with given event.
"""
self._event_listeners_mapping = {
self._event_listeners_mapping: Dict[Type[object], List[EventListenerInterface]] = {
AfterConnectionReleasedEvent: [
ReAuthConnectionListener(),
],
@@ -77,17 +85,35 @@ def __init__(self):
],
}

self._lock = threading.Lock()
self._async_lock = asyncio.Lock()

if event_listeners:
self.register_listeners(event_listeners)

def dispatch(self, event: object):
-     listeners = self._event_listeners_mapping.get(type(event))
+     with self._lock:
+         listeners = self._event_listeners_mapping.get(type(event), [])

      for listener in listeners:
          listener.listen(event)

async def dispatch_async(self, event: object):
-     listeners = self._event_listeners_mapping.get(type(event))
+     async with self._async_lock:
+         listeners = self._event_listeners_mapping.get(type(event), [])

      for listener in listeners:
          await listener.listen(event)

def register_listeners(self, event_listeners: Dict[Type[object], List[EventListenerInterface]]):
with self._lock:
for event_type in event_listeners:
if event_type in self._event_listeners_mapping:
self._event_listeners_mapping[event_type] = list(
set(self._event_listeners_mapping[event_type] + event_listeners[event_type])
)
else:
self._event_listeners_mapping[event_type] = event_listeners[event_type]


class AfterConnectionReleasedEvent:
@@ -225,6 +251,25 @@ def nodes(self) -> dict:
def credential_provider(self) -> Union[CredentialProvider, None]:
return self._credential_provider

class OnCommandsFailEvent:
"""
Event fired whenever a command fails during execution.
"""
def __init__(
self,
commands: tuple,
exception: Exception,
):
self._commands = commands
self._exception = exception

@property
def commands(self) -> tuple:
return self._commands

@property
def exception(self) -> Exception:
return self._exception
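The registration-plus-dispatch flow these changes enable — merge extra listeners into the mapping, then notify them when a command fails — can be sketched with minimal stand-ins (the class names mirror the diff, but the bodies here are illustrative, not the redis-py implementation):

```python
from typing import Callable, Dict, List, Type

class OnCommandsFailEvent:
    def __init__(self, commands: tuple, exception: Exception):
        self.commands = commands
        self.exception = exception

class Dispatcher:
    def __init__(self):
        self._mapping: Dict[Type[object], List[Callable]] = {}

    def register_listeners(self, mappings: Dict[Type[object], List[Callable]]) -> None:
        for event_type, listeners in mappings.items():
            existing = self._mapping.setdefault(event_type, [])
            # extend without duplicating already-registered listeners
            existing.extend(l for l in listeners if l not in existing)

    def dispatch(self, event: object) -> None:
        # Notify every listener registered for this event type.
        for listener in self._mapping.get(type(event), []):
            listener(event)

seen = []
dispatcher = Dispatcher()
dispatcher.register_listeners({OnCommandsFailEvent: [lambda e: seen.append(e.commands)]})
dispatcher.dispatch(OnCommandsFailEvent(("SET", "key", "value"), ConnectionError("node down")))
print(seen)  # [('SET', 'key', 'value')]
```

In the PR, a failure detector would presumably subscribe this way so that repeated `OnCommandsFailEvent`s can trip the circuit breaker for a database.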

class ReAuthConnectionListener(EventListenerInterface):
"""
Empty file added redis/multidb/__init__.py