Skip to content

Commit 8cc216c

Browse files
petyaslavovaManelCoutinhoSensei
authored andcommitted
Add dynamic_startup_nodes parameter to async RedisCluster (redis#3646)
1 parent a70b0cd commit 8cc216c

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

redis/asyncio/cluster.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
134134
| Enable read from replicas in READONLY mode and defines the load balancing
135135
strategy that will be used for cluster node selection.
136136
The data read from replicas is eventually consistent with the data in primary nodes.
137+
:param dynamic_startup_nodes:
138+
| Set the RedisCluster's startup nodes to all the discovered nodes.
139+
If true (default value), the cluster's discovered nodes will be used to
140+
determine the cluster nodes-slots mapping in the next topology refresh.
141+
It will remove the initial passed startup nodes if their endpoints aren't
142+
listed in the CLUSTER SLOTS output.
143+
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
144+
specific IP addresses, it is best to set it to false.
137145
:param reinitialize_steps:
138146
| Specifies the number of MOVED errors that need to occur before reinitializing
139147
the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -250,6 +258,7 @@ def __init__(
250258
require_full_coverage: bool = True,
251259
read_from_replicas: bool = False,
252260
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
261+
dynamic_startup_nodes: bool = True,
253262
reinitialize_steps: int = 5,
254263
cluster_error_retry_attempts: int = 3,
255264
max_connections: int = 2**31,
@@ -390,6 +399,7 @@ def __init__(
390399
startup_nodes,
391400
require_full_coverage,
392401
kwargs,
402+
dynamic_startup_nodes=dynamic_startup_nodes,
393403
address_remap=address_remap,
394404
event_dispatcher=self._event_dispatcher,
395405
)
@@ -1164,6 +1174,7 @@ async def _mock(self, error: RedisError):
11641174

11651175
class NodesManager:
11661176
__slots__ = (
1177+
"_dynamic_startup_nodes",
11671178
"_moved_exception",
11681179
"_event_dispatcher",
11691180
"connection_kwargs",
@@ -1181,6 +1192,7 @@ def __init__(
11811192
startup_nodes: List["ClusterNode"],
11821193
require_full_coverage: bool,
11831194
connection_kwargs: Dict[str, Any],
1195+
dynamic_startup_nodes: bool = True,
11841196
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
11851197
event_dispatcher: Optional[EventDispatcher] = None,
11861198
) -> None:
@@ -1193,6 +1205,8 @@ def __init__(
11931205
self.nodes_cache: Dict[str, "ClusterNode"] = {}
11941206
self.slots_cache: Dict[int, List["ClusterNode"]] = {}
11951207
self.read_load_balancer = LoadBalancer()
1208+
1209+
self._dynamic_startup_nodes: bool = dynamic_startup_nodes
11961210
self._moved_exception: MovedError = None
11971211
if event_dispatcher is None:
11981212
self._event_dispatcher = EventDispatcher()
@@ -1435,8 +1449,10 @@ async def initialize(self) -> None:
14351449
# Set the tmp variables to the real variables
14361450
self.slots_cache = tmp_slots
14371451
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
1438-
# Populate the startup nodes with all discovered nodes
1439-
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
1452+
1453+
if self._dynamic_startup_nodes:
1454+
# Populate the startup nodes with all discovered nodes
1455+
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
14401456

14411457
# Set the default node
14421458
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]

tests/test_asyncio/test_cluster.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2723,6 +2723,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
27232723
assert rc.get_node(host=default_host, port=7001) is not None
27242724
assert rc.get_node(host=default_host, port=7002) is not None
27252725

2726+
@pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
2727+
async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
2728+
rc = await get_mocked_redis_client(
2729+
2730+
port=7000,
2731+
cluster_slots=default_cluster_slots,
2732+
dynamic_startup_nodes=dynamic_startup_nodes,
2733+
)
2734+
# Nodes are taken from default_cluster_slots
2735+
discovered_nodes = [
2736+
"127.0.0.1:7000",
2737+
"127.0.0.1:7001",
2738+
"127.0.0.1:7002",
2739+
"127.0.0.1:7003",
2740+
]
2741+
startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
2742+
if dynamic_startup_nodes is True:
2743+
assert sorted(startup_nodes) == sorted(discovered_nodes)
2744+
else:
2745+
assert startup_nodes == ["[email protected]:7000"]
2746+
27262747

27272748
class TestClusterPipeline:
27282749
"""Tests for the ClusterPipeline class."""

0 commit comments

Comments
 (0)