Skip to content

Commit 6c47c64

Browse files
committed
Updated NodesManager to create shared cache between all nodes
1 parent b434efd commit 6c47c64

File tree

3 files changed

+26
-21
lines changed

3 files changed

+26
-21
lines changed

redis/cluster.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from redis._parsers import CommandsParser, Encoder
1010
from redis._parsers.helpers import parse_scan
1111
from redis.backoff import default_backoff
12-
from redis.cache import CacheConfig, CacheInterface
12+
from redis.cache import CacheConfig, CacheInterface, CacheFactoryInterface, CacheFactory
1313
from redis.client import CaseInsensitiveDict, PubSub, Redis
1414
from redis.commands import READ_COMMANDS, RedisClusterCommands
1515
from redis.commands.helpers import list_or_args
@@ -1327,6 +1327,7 @@ def __init__(
13271327
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
13281328
cache: Optional[CacheInterface] = None,
13291329
cache_config: Optional[CacheConfig] = None,
1330+
cache_factory: Optional[CacheFactoryInterface] = None,
13301331
**kwargs,
13311332
):
13321333
self.nodes_cache = {}
@@ -1339,8 +1340,9 @@ def __init__(
13391340
self._dynamic_startup_nodes = dynamic_startup_nodes
13401341
self.connection_pool_class = connection_pool_class
13411342
self.address_remap = address_remap
1342-
self.cache = cache
1343-
self.cache_config = cache_config
1343+
self._cache = cache
1344+
self._cache_config = cache_config
1345+
self._cache_factory = cache_factory
13441346
self._moved_exception = None
13451347
self.connection_kwargs = kwargs
13461348
self.read_load_balancer = LoadBalancer()
@@ -1484,15 +1486,13 @@ def create_redis_node(self, host, port, **kwargs):
14841486
# Create a redis node with a costumed connection pool
14851487
kwargs.update({"host": host})
14861488
kwargs.update({"port": port})
1487-
kwargs.update({"cache": self.cache})
1488-
kwargs.update({"cache_config": self.cache_config})
1489+
kwargs.update({"cache": self._cache})
14891490
r = Redis(connection_pool=self.connection_pool_class(**kwargs))
14901491
else:
14911492
r = Redis(
14921493
host=host,
14931494
port=port,
1494-
cache=self.cache,
1495-
cache_config=self.cache_config,
1495+
cache=self._cache,
14961496
**kwargs,
14971497
)
14981498
return r
@@ -1624,6 +1624,12 @@ def initialize(self):
16241624
f"one reachable node: {str(exception)}"
16251625
) from exception
16261626

1627+
if self._cache is None and self._cache_config is not None:
1628+
if self._cache_factory is None:
1629+
self._cache = CacheFactory(self._cache_config).get_cache()
1630+
else:
1631+
self._cache = self._cache_factory.get_cache()
1632+
16271633
# Create Redis connections to all nodes
16281634
self.create_redis_connections(list(tmp_nodes_cache.values()))
16291635

redis/connection.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,13 +1468,7 @@ def disconnect(self, inuse_connections: bool = True) -> None:
14681468
for connection in connections:
14691469
connection.disconnect()
14701470

1471-
# Send an event to stop scheduled healthcheck execution.
1472-
if self._hc_cancel_event is not None and not self._hc_cancel_event.is_set():
1473-
self._hc_cancel_event.set()
1474-
1475-
# Joins healthcheck thread on disconnect.
1476-
if self._hc_thread is not None and not self._hc_thread.is_alive():
1477-
self._hc_thread.join()
1471+
self.stop_scheduled_healthcheck()
14781472

14791473
def close(self) -> None:
14801474
"""Close the pool, disconnecting all connections"""
@@ -1496,6 +1490,15 @@ def run_scheduled_healthcheck(self) -> None:
14961490
self._perform_health_check, hc_interval, self._hc_cancel_event
14971491
)
14981492

1493+
def stop_scheduled_healthcheck(self) -> None:
1494+
# Send an event to stop scheduled healthcheck execution.
1495+
if self._hc_cancel_event is not None and not self._hc_cancel_event.is_set():
1496+
self._hc_cancel_event.set()
1497+
1498+
# Joins healthcheck thread on disconnect.
1499+
if self._hc_thread is not None and not self._hc_thread.is_alive():
1500+
self._hc_thread.join()
1501+
14991502
def _perform_health_check(self, done: threading.Event) -> None:
15001503
self._checkpid()
15011504
with self._lock:
@@ -1670,10 +1673,4 @@ def disconnect(self):
16701673
for connection in self._connections:
16711674
connection.disconnect()
16721675

1673-
# Send an event to stop scheduled healthcheck execution.
1674-
if self._hc_cancel_event is not None and not self._hc_cancel_event.is_set():
1675-
self._hc_cancel_event.set()
1676-
1677-
# Joins healthcheck thread on disconnect.
1678-
if self._hc_thread is not None and not self._hc_thread.is_alive():
1679-
self._hc_thread.join()
1676+
self.stop_scheduled_healthcheck()

tests/test_cache.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,8 @@ def test_get_from_cache(self, r):
374374
cache.get(CacheKey(command="GET", redis_keys=("foo",))).cache_value
375375
== b"barbar"
376376
)
377+
# Make sure that cache is shared between nodes.
378+
assert cache == r.nodes_manager.get_node_from_slot(1).redis_connection.get_cache()
377379

378380
@pytest.mark.parametrize(
379381
"r",

0 commit comments

Comments
 (0)