From 1b6837e0a9516e65c76fd571ef9b1e26fc61b120 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Mon, 22 Sep 2025 18:01:54 -0700 Subject: [PATCH 01/19] allocation: create separate limit for frozen tier concurrent rebalance The concurrent rebalance limit prevents too many shard relocations from happening at once. This works well in limiting active shards on the hot tier, so compute for indexing is reserved. Given that the frozen tier is an archive, compute separation is less of a concern and more rebalancing activity can occur without degradation. This change creates a separate variable for managing this. --- .../cluster/routing/RoutingNode.java | 9 ++ .../ConcurrentRebalanceAllocationDecider.java | 73 ++++++++++++-- .../common/settings/ClusterSettings.java | 1 + .../ConcurrentRebalanceRoutingTests.java | 95 ++++++++++++++----- 4 files changed, 143 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 2b5806724c75f..06f743685bd4b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -245,6 +245,15 @@ public Stream shardsWithState(ShardRoutingState state) { return internalGetShardsWithState(state).stream(); } + /** + * Determine the number of shards with a specific state + * @param state ShardRoutingState which should be listed + * @return number of shards + */ + public int shardCountsWithState(ShardRoutingState state) { + return internalGetShardsWithState(state).size(); + } + /** * Determine the shards of an index with a specific state * @param index id of the index diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index deb3e4440f4ab..cd68840bb1810 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -20,13 +21,15 @@ /** * Similar to the {@link ClusterRebalanceAllocationDecider} this * {@link AllocationDecider} controls the number of currently in-progress - * re-balance (relocation) operations and restricts node allocations if the - * configured threshold is reached. The default number of concurrent rebalance - * operations is set to {@code 2} + * re-balance (shard relocation) operations and restricts node allocations + * if the configured threshold is reached. Frozen and non-frozen shards are + * considered separately. The default number of concurrent rebalance operations + * is set to {@code 2} for non-frozen shards, and {@code 10} for frozen shards. *

* Re-balance operations can be controlled in real-time via the cluster update API using - * {@code cluster.routing.allocation.cluster_concurrent_rebalance}. Iff this - * setting is set to {@code -1} the number of concurrent re-balance operations + * {@code cluster.routing.allocation.cluster_concurrent_rebalance} and + * {@code cluster.routing.allocation.cluster_concurrent_frozen_rebalance}. + * Iff either setting is set to {@code -1} the number of concurrent re-balance operations * are unlimited. */ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { @@ -44,18 +47,38 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { ); private volatile int clusterConcurrentRebalance; + /** + * Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards + */ + public static final Setting CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting( + "cluster.routing.allocation.cluster_concurrent_frozen_rebalance", + 10, + -1, + Property.Dynamic, + Property.NodeScope + ); + private volatile int clusterConcurrentFrozenRebalance; + public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) { clusterSettings.initializeAndWatch( CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance ); - logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); + clusterSettings.initializeAndWatch( + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING, + this::setClusterConcurrentFrozenRebalance + ); + logger.debug("using [cluster_concurrent_rebalance] with [concurrent_rebalance=%d, concurrent_frozen_rebalance=%d]", clusterConcurrentRebalance, clusterConcurrentFrozenRebalance); } private void setClusterConcurrentRebalance(int concurrentRebalance) { clusterConcurrentRebalance = concurrentRebalance; } + private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) { + clusterConcurrentFrozenRebalance = concurrentFrozenRebalance; + } + @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { return canRebalance(allocation); @@ -68,18 +91,25 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca */ @Override public Decision canRebalance(RoutingAllocation allocation) { + int relocatingFrozenShards = countRelocatingFrozenShards(allocation); int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); if (allocation.isSimulating() && relocatingShards >= 2) { // BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2). // (See https://github.com/elastic/elasticsearch/issues/87279) // Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation // it is possible to artificially set above setting to 2 to avoid unnecessary moves in desired balance. + // Separately: keep overall limit in simulation to two including frozen shards return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating"); } - if (clusterConcurrentRebalance == -1) { + boolean normalRelocationUnlimited = clusterConcurrentRebalance == -1; + boolean frozenRelocationUnlimited = clusterConcurrentFrozenRebalance == -1; + if (normalRelocationUnlimited == true && frozenRelocationUnlimited == true) { return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); } - if (relocatingShards >= clusterConcurrentRebalance) { + + // separate into frozen/non-frozen counts + relocatingShards = relocatingShards - relocatingFrozenShards; + if (normalRelocationUnlimited == false && relocatingShards >= clusterConcurrentRebalance) { return allocation.decision( Decision.THROTTLE, NAME, @@ -89,12 +119,35 @@ public Decision canRebalance(RoutingAllocation allocation) { clusterConcurrentRebalance ); } + if (frozenRelocationUnlimited == false && relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { + return allocation.decision( + Decision.THROTTLE, + NAME, + "reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]", + relocatingFrozenShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), + clusterConcurrentFrozenRebalance + ); + } return allocation.decision( Decision.YES, NAME, - "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", + "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " + + "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", clusterConcurrentRebalance, - relocatingShards + relocatingShards, + clusterConcurrentFrozenRebalance, + relocatingFrozenShards ); } + + private int countRelocatingFrozenShards(RoutingAllocation allocation) { + int frozenRelocations = 0; + for (var routingNode : allocation.routingNodes()) { + if (routingNode.node().isDedicatedFrozenNode()) { + frozenRelocations += routingNode.shardCountsWithState(ShardRoutingState.RELOCATING); + } + } + return frozenRelocations; + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 9c2d6fab10368..939229e14f919 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -238,6 +238,7 @@ public void apply(Settings value, Settings current, Settings previous) { BreakerSettings.CIRCUIT_BREAKER_TYPE, ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, + ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index 472df77f1e06e..fa1c3a3cf5c76 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -17,10 +17,15 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; +import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_FROZEN; +import java.util.Collections; + import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -32,18 +37,36 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { public void testClusterConcurrentRebalance() { - AllocationService strategy = createAllocationService( - Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) - .build() - ); + boolean testFrozen = randomBoolean(); + AllocationService strategy; + if (testFrozen) { + strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 3) + .build() + ); + } else { + strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) + .build() + ); + } logger.info("Building initial routing table"); - Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(5).numberOfReplicas(1)) - .build(); + Metadata metadata; + if (testFrozen) { + metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)).numberOfShards(5).numberOfReplicas(1)) + .build(); + } else { + metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(5).numberOfReplicas(1)) + .build(); + } RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) .addAsNew(metadata.getProject().index("test")) @@ -61,9 +84,15 @@ public void testClusterConcurrentRebalance() { } logger.info("start two nodes and fully start the shards"); - clusterState = ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) - .build(); + if (testFrozen) { + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))).add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE)))) + .build(); + } else { + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + } clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { @@ -82,19 +111,35 @@ public void testClusterConcurrentRebalance() { } logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); - clusterState = ClusterState.builder(clusterState) - .nodes( - DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3")) - .add(newNode("node4")) - .add(newNode("node5")) - .add(newNode("node6")) - .add(newNode("node7")) - .add(newNode("node8")) - .add(newNode("node9")) - .add(newNode("node10")) - ) - .build(); + if (testFrozen) { + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + ) + .build(); + } else { + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node3")) + .add(newNode("node4")) + .add(newNode("node5")) + .add(newNode("node6")) + .add(newNode("node7")) + .add(newNode("node8")) + .add(newNode("node9")) + .add(newNode("node10")) + ) + .build(); + } clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { From 7f683ba5c6ce344033b8b719f9ccc9586400206e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 23 Sep 2025 01:12:30 +0000 Subject: [PATCH 02/19] [CI] Update transport version definitions --- server/src/main/resources/transport/upper_bounds/8.18.csv | 2 +- server/src/main/resources/transport/upper_bounds/8.19.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.0.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.1.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.2.csv | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index 4eb5140004ea6..ffc592e1809ee 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_6,8840008 +initial_elasticsearch_8_18_8,8840010 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 476468b203875..3cc6f439c5ea5 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_3,8841067 +initial_elasticsearch_8_19_5,8841069 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv index f8f50cc6d7839..8ad2ed1a4cacf 100644 --- a/server/src/main/resources/transport/upper_bounds/9.0.csv +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -1 +1 @@ -initial_elasticsearch_9_0_6,9000015 +initial_elasticsearch_9_0_8,9000017 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 5a65f2e578156..1cea5dc4d929b 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_elasticsearch_9_1_4,9112007 +initial_elasticsearch_9_1_5,9112008 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index e24f914a1d1ca..6e7d51d3d3020 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ml_inference_endpoint_cache,9157000 +security_stats_endpoint,9168000 From 56b76c8edd5709413bc9640f7d925c0482dc9d5f Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 23 Sep 2025 01:20:25 +0000 Subject: [PATCH 03/19] [CI] Auto commit changes from spotless --- .../ConcurrentRebalanceAllocationDecider.java | 8 ++++++-- .../ConcurrentRebalanceRoutingTests.java | 19 +++++++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index cd68840bb1810..83deed568488c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -68,7 +68,11 @@ public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) { CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING, this::setClusterConcurrentFrozenRebalance ); - logger.debug("using [cluster_concurrent_rebalance] with [concurrent_rebalance=%d, concurrent_frozen_rebalance=%d]", clusterConcurrentRebalance, clusterConcurrentFrozenRebalance); + logger.debug( + "using [cluster_concurrent_rebalance] with [concurrent_rebalance=%d, concurrent_frozen_rebalance=%d]", + clusterConcurrentRebalance, + clusterConcurrentFrozenRebalance + ); } private void setClusterConcurrentRebalance(int concurrentRebalance) { @@ -133,7 +137,7 @@ public Decision canRebalance(RoutingAllocation allocation) { Decision.YES, NAME, "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " - + "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", + + "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", clusterConcurrentRebalance, relocatingShards, clusterConcurrentFrozenRebalance, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index fa1c3a3cf5c76..c468dcd4e1618 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -16,14 +16,12 @@ import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.cluster.routing.allocation.DataTier; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; -import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_FROZEN; import java.util.Collections; import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState; @@ -52,7 +50,7 @@ public void testClusterConcurrentRebalance() { .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) .build() - ); + ); } logger.info("Building initial routing table"); @@ -60,7 +58,12 @@ public void testClusterConcurrentRebalance() { Metadata metadata; if (testFrozen) { metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)).numberOfShards(5).numberOfReplicas(1)) + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) + .numberOfShards(5) + .numberOfReplicas(1) + ) .build(); } else { metadata = Metadata.builder() @@ -86,7 +89,11 @@ public void testClusterConcurrentRebalance() { logger.info("start two nodes and fully start the shards"); if (testFrozen) { clusterState = ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder().add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))).add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE)))) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + ) .build(); } else { clusterState = ClusterState.builder(clusterState) From 2ffa4d3bd677d7a174fae0e494acf2d0cba93be1 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 23 Sep 2025 13:28:55 -0700 Subject: [PATCH 04/19] Formatting fixes --- .../decider/ConcurrentRebalanceAllocationDecider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 83deed568488c..f990de6165afd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -69,7 +69,7 @@ public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) { this::setClusterConcurrentFrozenRebalance ); logger.debug( - "using [cluster_concurrent_rebalance] with [concurrent_rebalance=%d, concurrent_frozen_rebalance=%d]", + "using [cluster_concurrent_rebalance] with [concurrent_rebalance={}, concurrent_frozen_rebalance={}]", clusterConcurrentRebalance, clusterConcurrentFrozenRebalance ); @@ -107,7 +107,7 @@ public Decision canRebalance(RoutingAllocation allocation) { } boolean normalRelocationUnlimited = clusterConcurrentRebalance == -1; boolean frozenRelocationUnlimited = clusterConcurrentFrozenRebalance == -1; - if (normalRelocationUnlimited == true && frozenRelocationUnlimited == true) { + if (normalRelocationUnlimited && frozenRelocationUnlimited) { return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); } From c8810f231d3c68079c0b36dbd893e29c9cf83d9a Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 23 Sep 2025 15:15:15 -0700 Subject: [PATCH 05/19] Adding unlimited test --- .../ConcurrentRebalanceRoutingTests.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index c468dcd4e1618..d02c188f71678 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -183,4 +183,76 @@ public void testClusterConcurrentRebalance() { assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(10)); assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(0)); } + + public void testClusterConcurrentRebalanceFrozenUnlimited() { + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) + .build() + ); + + logger.info("Building initial routing table"); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) + .addAsNew(metadata.getProject().index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); + + assertThat(clusterState.routingTable().index("test").size(), equalTo(5)); + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test").shard(i).shard(0).state(), equalTo(UNASSIGNED)); + assertThat(clusterState.routingTable().index("test").shard(i).shard(1).state(), equalTo(UNASSIGNED)); + assertThat(clusterState.routingTable().index("test").shard(i).shard(0).currentNodeId(), nullValue()); + assertThat(clusterState.routingTable().index("test").shard(i).shard(1).currentNodeId(), nullValue()); + } + + logger.info("start two nodes and fully start the shards"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + + logger.info("start all the primary shards, replicas will start initializing"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + ) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + + logger.info("start the replica shards, rebalancing should start, but without a limit 8 should be rebalancing"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + // we only allow any number of relocations at a time + assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(2)); + assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(8)); + } } From 9638803b7c094642a6ada0b921913e671b866826 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 23 Sep 2025 19:49:58 -0700 Subject: [PATCH 06/19] Addressed or vs. and logic in the context of the allocation algorithm - per-shard decision has been implemented, using the cluster-wide logic from before - cluster-wide decision has been changed to allow progress whenever there is any budget --- .../ConcurrentRebalanceAllocationDecider.java | 89 ++++++++++++++++--- 1 file changed, 77 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index f990de6165afd..2182b8c24c873 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -85,7 +85,51 @@ private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { - return canRebalance(allocation); + if (isFrozenShard(allocation, shardRouting)) { + int relocatingFrozenShards = countRelocatingFrozenShards(allocation); + if (clusterConcurrentFrozenRebalance == -1) { + return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed"); + } + if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { + return allocation.decision( + Decision.THROTTLE, + NAME, + "reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]", + relocatingFrozenShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), + clusterConcurrentFrozenRebalance + ); + } + return allocation.decision( + Decision.YES, + NAME, + "below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", + clusterConcurrentFrozenRebalance, + relocatingFrozenShards + ); + } else { + int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - countRelocatingFrozenShards(allocation); + if (clusterConcurrentRebalance == -1) { + return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); + } + if (relocatingShards >= clusterConcurrentRebalance) { + return allocation.decision( + Decision.THROTTLE, + NAME, + "reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]", + relocatingShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), + clusterConcurrentRebalance + ); + } + return allocation.decision( + Decision.YES, + NAME, + "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", + clusterConcurrentRebalance, + relocatingShards + ); + } } /** @@ -105,15 +149,30 @@ public Decision canRebalance(RoutingAllocation allocation) { // Separately: keep overall limit in simulation to two including frozen shards return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating"); } - boolean normalRelocationUnlimited = clusterConcurrentRebalance == -1; - boolean frozenRelocationUnlimited = clusterConcurrentFrozenRebalance == -1; - if (normalRelocationUnlimited && frozenRelocationUnlimited) { + if (clusterConcurrentRebalance == -1) { return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); } + if (clusterConcurrentFrozenRebalance == -1) { + return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed"); + } // separate into frozen/non-frozen counts relocatingShards = relocatingShards - relocatingFrozenShards; - if (normalRelocationUnlimited == false && relocatingShards >= clusterConcurrentRebalance) { + + // either frozen or non-frozen having some allowance before the combined limit means the allocator has room to rebalance + if (relocatingShards + relocatingFrozenShards < clusterConcurrentRebalance + clusterConcurrentFrozenRebalance) { + return allocation.decision( + Decision.YES, + NAME, + "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " + + "or threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", + clusterConcurrentRebalance, + relocatingShards, + clusterConcurrentFrozenRebalance, + relocatingFrozenShards + ); + } + if (relocatingShards >= clusterConcurrentRebalance) { return allocation.decision( Decision.THROTTLE, NAME, @@ -123,7 +182,7 @@ public Decision canRebalance(RoutingAllocation allocation) { clusterConcurrentRebalance ); } - if (frozenRelocationUnlimited == false && relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { + if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { return allocation.decision( Decision.THROTTLE, NAME, @@ -133,18 +192,24 @@ public Decision canRebalance(RoutingAllocation allocation) { clusterConcurrentFrozenRebalance ); } + + assert false : "this should logically never be reached, as one of frozen or non-frozen is exceeding the already tested limit"; return allocation.decision( Decision.YES, NAME, - "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " - + "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", - clusterConcurrentRebalance, - relocatingShards, - clusterConcurrentFrozenRebalance, - relocatingFrozenShards + "unreachable yes" ); } + private boolean isFrozenShard(RoutingAllocation allocation, ShardRouting shard) { + for (var routingNode : allocation.routingNodes()) { + if (routingNode.node().isDedicatedFrozenNode() && routingNode.getByShardId(shard.shardId()) != null) { + return true; + } + } + return false; + } + private int countRelocatingFrozenShards(RoutingAllocation allocation) { int frozenRelocations = 0; for (var routingNode : allocation.routingNodes()) { From 9d111e944517964fd702a0ccc6d2aacc8f7568cf Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 24 Sep 2025 02:58:28 +0000 Subject: [PATCH 07/19] [CI] Auto commit changes from spotless --- .../decider/ConcurrentRebalanceAllocationDecider.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 2182b8c24c873..3cd1e7a93ff03 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -194,11 +194,7 @@ public Decision canRebalance(RoutingAllocation allocation) { } assert false : "this should logically never be reached, as one of frozen or non-frozen is exceeding the already tested limit"; - return allocation.decision( - Decision.YES, - NAME, - "unreachable yes" - ); + return allocation.decision(Decision.YES, NAME, "unreachable yes"); } private boolean isFrozenShard(RoutingAllocation allocation, ShardRouting shard) { From 3478f43231ac650c50c81af8b15326c4d785028e Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 23 Sep 2025 21:17:57 -0700 Subject: [PATCH 08/19] - Fixed up cluster-wide algorithm for applying frozen tier limits - Implemented count of relocatingShards inside RoutingNodes --- .../cluster/routing/RoutingNodes.java | 26 ++++++++ .../ConcurrentRebalanceAllocationDecider.java | 66 +++++++------------ 2 files changed, 48 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index ea61081d02ebe..c3fc3492b88c3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -88,6 +88,8 @@ public class RoutingNodes implements Iterable { private int relocatingShards = 0; + private int relocatingFrozenShards = 0; + private final Map> attributeValuesByAttribute; private final Map recoveriesPerNode; @@ -152,6 +154,9 @@ private RoutingNodes(GlobalRoutingTable routingTable, DiscoveryNodes discoveryNo assignedShardsAdd(shard); if (shard.relocating()) { relocatingShards++; + if (isFrozenNode(shard.currentNodeId())) { + relocatingFrozenShards++; + } ShardRouting targetShardRouting = shard.getTargetRelocatingShard(); addInitialRecovery(targetShardRouting, indexShard.primary); // LinkedHashMap to preserve order. @@ -192,6 +197,7 @@ private RoutingNodes(RoutingNodes routingNodes) { this.inactivePrimaryCount = routingNodes.inactivePrimaryCount; this.inactiveShardCount = routingNodes.inactiveShardCount; this.relocatingShards = routingNodes.relocatingShards; + this.relocatingFrozenShards = routingNodes.relocatingFrozenShards; this.attributeValuesByAttribute = Collections.synchronizedMap(Maps.copyOf(routingNodes.attributeValuesByAttribute, HashSet::new)); this.recoveriesPerNode = Maps.copyOf(routingNodes.recoveriesPerNode, Recoveries::copy); } @@ -343,6 +349,18 @@ public int getRelocatingShardCount() { return relocatingShards; } + private boolean isFrozenNode(String nodeId) { + RoutingNode node = nodesToShards.get(nodeId); + if (node != null && node.node().isDedicatedFrozenNode()) { + return true; + } + return false; + } + + public int getRelocatingFrozenShardCount() { + return relocatingFrozenShards; + } + /** * Returns all shards that are not in the state UNASSIGNED with the same shard * ID as the given shard. @@ -478,6 +496,9 @@ public Tuple relocateShard( ) { ensureMutable(); relocatingShards++; + if (isFrozenNode(nodeId)) { + relocatingFrozenShards++; + } ShardRouting source = startedShard.relocate(nodeId, expectedShardSize); ShardRouting target = source.getTargetRelocatingShard(); updateAssigned(startedShard, source); @@ -726,6 +747,9 @@ private ShardRouting started(ShardRouting shard, long expectedShardSize) { */ private ShardRouting cancelRelocation(ShardRouting shard) { relocatingShards--; + if (isFrozenNode(shard.currentNodeId())) { + relocatingFrozenShards--; + } ShardRouting cancelledShard = shard.cancelRelocation(); updateAssigned(shard, cancelledShard); return cancelledShard; @@ -881,6 +905,7 @@ public boolean equals(Object o) { && inactivePrimaryCount == that.inactivePrimaryCount && inactiveShardCount == that.inactiveShardCount && relocatingShards == that.relocatingShards + && relocatingFrozenShards == that.relocatingFrozenShards && nodesToShards.equals(that.nodesToShards) && unassignedShards.equals(that.unassignedShards) && assignedShards.equals(that.assignedShards) @@ -898,6 +923,7 @@ public int hashCode() { inactivePrimaryCount, inactiveShardCount, relocatingShards, + relocatingFrozenShards, attributeValuesByAttribute, recoveriesPerNode ); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 3cd1e7a93ff03..8f55df5df9b4e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -85,8 +85,8 @@ private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { + int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount(); if (isFrozenShard(allocation, shardRouting)) { - int relocatingFrozenShards = countRelocatingFrozenShards(allocation); if (clusterConcurrentFrozenRebalance == -1) { return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed"); } @@ -108,7 +108,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca relocatingFrozenShards ); } else { - int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - countRelocatingFrozenShards(allocation); + int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - relocatingFrozenShards; if (clusterConcurrentRebalance == -1) { return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); } @@ -139,7 +139,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca */ @Override public Decision canRebalance(RoutingAllocation allocation) { - int relocatingFrozenShards = countRelocatingFrozenShards(allocation); + int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount(); int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); if (allocation.isSimulating() && relocatingShards >= 2) { // BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2). @@ -149,70 +149,48 @@ public Decision canRebalance(RoutingAllocation allocation) { // Separately: keep overall limit in simulation to two including frozen shards return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating"); } - if (clusterConcurrentRebalance == -1) { - return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); - } - if (clusterConcurrentFrozenRebalance == -1) { - return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed"); - } // separate into frozen/non-frozen counts relocatingShards = relocatingShards - relocatingFrozenShards; - // either frozen or non-frozen having some allowance before the combined limit means the allocator has room to rebalance - if (relocatingShards + relocatingFrozenShards < clusterConcurrentRebalance + clusterConcurrentFrozenRebalance) { + // either frozen or non-frozen having some allowance before their limit means the allocator has room to rebalance + if (clusterConcurrentRebalance == -1 || relocatingShards < clusterConcurrentRebalance) { return allocation.decision( Decision.YES, NAME, - "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " - + "or threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", - clusterConcurrentRebalance, - relocatingShards, - clusterConcurrentFrozenRebalance, - relocatingFrozenShards - ); - } - if (relocatingShards >= clusterConcurrentRebalance) { - return allocation.decision( - Decision.THROTTLE, - NAME, - "reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]", + "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", relocatingShards, CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), clusterConcurrentRebalance ); } - if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { + if (clusterConcurrentFrozenRebalance == -1 || relocatingFrozenShards < clusterConcurrentFrozenRebalance) { return allocation.decision( - Decision.THROTTLE, + Decision.YES, NAME, - "reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]", + "below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", relocatingFrozenShards, CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), clusterConcurrentFrozenRebalance ); } - - assert false : "this should logically never be reached, as one of frozen or non-frozen is exceeding the already tested limit"; - return allocation.decision(Decision.YES, NAME, "unreachable yes"); + return allocation.decision( + Decision.THROTTLE, + NAME, + "above threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " + + "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", + clusterConcurrentRebalance, + relocatingShards, + clusterConcurrentFrozenRebalance, + relocatingFrozenShards + ); } private boolean isFrozenShard(RoutingAllocation allocation, ShardRouting shard) { - for (var routingNode : allocation.routingNodes()) { - if (routingNode.node().isDedicatedFrozenNode() && routingNode.getByShardId(shard.shardId()) != null) { - return true; - } + String nodeId = shard.currentNodeId(); + if (nodeId != null && allocation.routingNodes().node(nodeId).node().isDedicatedFrozenNode()) { + return true; } return false; } - - private int countRelocatingFrozenShards(RoutingAllocation allocation) { - int frozenRelocations = 0; - for (var routingNode : allocation.routingNodes()) { - if (routingNode.node().isDedicatedFrozenNode()) { - frozenRelocations += routingNode.shardCountsWithState(ShardRoutingState.RELOCATING); - } - } - return frozenRelocations; - } } From ce8accd78d18ba460d2db8cbb14b56d83a844054 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 23 Sep 2025 21:51:13 -0700 Subject: [PATCH 09/19] Adding separate test for frozen --- .../allocation/ConcurrentRebalanceRoutingTests.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index d02c188f71678..9ddc5a1214e46 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -35,7 +35,14 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { public void testClusterConcurrentRebalance() { - boolean testFrozen = randomBoolean(); + testClusterConcurrentInternal(false); + } + + public void testClusterConcurrentRebalanceFrozen() { + testClusterConcurrentInternal(true); + } + + void testClusterConcurrentInternal(boolean testFrozen) { AllocationService strategy; if (testFrozen) { strategy = createAllocationService( From 1ffda8e87f3ed128fa480373014369fa4e40d800 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 24 Sep 2025 04:58:34 +0000 Subject: [PATCH 10/19] [CI] Auto commit changes from spotless --- .../allocation/decider/ConcurrentRebalanceAllocationDecider.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 8f55df5df9b4e..5814889db5224 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; From cb754db47e32c798d5e17dfa0f81ce2205b72af2 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Wed, 24 Sep 2025 15:57:29 -0700 Subject: [PATCH 11/19] Check RoutingNode.node() for null in frozenNode test --- .../java/org/elasticsearch/cluster/routing/RoutingNodes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index c3fc3492b88c3..30effb1d1d173 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -351,7 +351,7 @@ public int getRelocatingShardCount() { private boolean isFrozenNode(String nodeId) { RoutingNode node = nodesToShards.get(nodeId); - if (node != null && node.node().isDedicatedFrozenNode()) { + if (node != null && node.node() != null && node.node().isDedicatedFrozenNode()) { return true; } return false; From ddf7eeeb3151f678d314d4fddcfaee20636a49e7 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Wed, 24 Sep 2025 21:28:31 -0700 Subject: [PATCH 12/19] Addressed review/extra feedback - removed extraneous count method in RoutingNode - renamed RoutingNodes.isFrozenNode -> isDedicatedFrozenNode - centralized frozen node check: decider had own - reuse original "reached the limit" log message langauge; this got mangled --- .../cluster/routing/RoutingNode.java | 9 -------- .../cluster/routing/RoutingNodes.java | 8 +++---- .../ConcurrentRebalanceAllocationDecider.java | 22 +++++++------------ 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 06f743685bd4b..2b5806724c75f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -245,15 +245,6 @@ public Stream shardsWithState(ShardRoutingState state) { return internalGetShardsWithState(state).stream(); } - /** - * Determine the number of shards with a specific state - * @param state ShardRoutingState which should be listed - * @return number of shards - */ - public int shardCountsWithState(ShardRoutingState state) { - return internalGetShardsWithState(state).size(); - } - /** * Determine the shards of an index with a specific state * @param index id of the index diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 30effb1d1d173..12e10fc5ae04e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -154,7 +154,7 @@ private RoutingNodes(GlobalRoutingTable routingTable, DiscoveryNodes discoveryNo assignedShardsAdd(shard); if (shard.relocating()) { relocatingShards++; - if (isFrozenNode(shard.currentNodeId())) { + if (isDedicatedFrozenNode(shard.currentNodeId())) { relocatingFrozenShards++; } ShardRouting targetShardRouting = shard.getTargetRelocatingShard(); @@ -349,7 +349,7 @@ public int getRelocatingShardCount() { return relocatingShards; } - private boolean isFrozenNode(String nodeId) { + public boolean isDedicatedFrozenNode(String nodeId) { RoutingNode node = nodesToShards.get(nodeId); if (node != null && node.node() != null && node.node().isDedicatedFrozenNode()) { return true; @@ -496,7 +496,7 @@ public Tuple relocateShard( ) { ensureMutable(); relocatingShards++; - if (isFrozenNode(nodeId)) { + if (isDedicatedFrozenNode(nodeId)) { relocatingFrozenShards++; } ShardRouting source = startedShard.relocate(nodeId, expectedShardSize); @@ -747,7 +747,7 @@ private ShardRouting started(ShardRouting shard, long expectedShardSize) { */ private ShardRouting cancelRelocation(ShardRouting shard) { relocatingShards--; - if (isFrozenNode(shard.currentNodeId())) { + if (isDedicatedFrozenNode(shard.currentNodeId())) { relocatingFrozenShards--; } ShardRouting cancelledShard = shard.cancelRelocation(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 5814889db5224..635267c762bb6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -85,7 +85,7 @@ private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount(); - if (isFrozenShard(allocation, shardRouting)) { + if (allocation.routingNodes().isDedicatedFrozenNode(shardRouting.currentNodeId())) { if (clusterConcurrentFrozenRebalance == -1) { return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed"); } @@ -176,20 +176,14 @@ public Decision canRebalance(RoutingAllocation allocation) { return allocation.decision( Decision.THROTTLE, NAME, - "above threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " - + "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", - clusterConcurrentRebalance, + "reached the limit of concurrently rebalancing shards [%d] for concurrent rebalances, cluster setting [%s=%d], " + + "and [%d] for concurrent frozen rebalances, frozen cluster setting [%s=%d]", relocatingShards, - clusterConcurrentFrozenRebalance, - relocatingFrozenShards + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), + clusterConcurrentRebalance, + relocatingFrozenShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), + clusterConcurrentFrozenRebalance ); } - - private boolean isFrozenShard(RoutingAllocation allocation, ShardRouting shard) { - String nodeId = shard.currentNodeId(); - if (nodeId != null && allocation.routingNodes().node(nodeId).node().isDedicatedFrozenNode()) { - return true; - } - return false; - } } From 3dde0a6550b7f5e8d4572057eadf6aefa1cff27d Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Thu, 25 Sep 2025 21:48:35 -0700 Subject: [PATCH 13/19] Adding in mixed frozen and hot test --- .../cluster/ESAllocationTestCase.java | 23 ++- .../DataTierAllocationDeciderTests.java | 174 ++++++++++++++++++ 2 files changed, 195 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 7d12d0e77cb99..fd54abf7cbb24 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -51,6 +51,7 @@ import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.snapshots.SnapshotsInfoService; import org.elasticsearch.test.ClusterServiceUtils; @@ -138,9 +139,19 @@ public static MockAllocationService createAllocationService( GatewayAllocator gatewayAllocator, ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService + ) { + return createAllocationService(settings, gatewayAllocator, clusterInfoService, snapshotsInfoService, Collections.emptyList()); + } + + public static MockAllocationService createAllocationService( + Settings settings, + GatewayAllocator gatewayAllocator, + ClusterInfoService clusterInfoService, + SnapshotsInfoService snapshotsInfoService, + List clusterPlugins ) { return new MockAllocationService( - randomAllocationDeciders(settings, createBuiltInClusterSettings(settings)), + randomAllocationDeciders(settings, createBuiltInClusterSettings(settings), clusterPlugins), gatewayAllocator, createShardsAllocator(settings), clusterInfoService, @@ -149,8 +160,16 @@ public static MockAllocationService createAllocationService( } public static AllocationDeciders randomAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + return randomAllocationDeciders(settings, clusterSettings, Collections.emptyList()); + } + + public static AllocationDeciders randomAllocationDeciders( + Settings settings, + ClusterSettings clusterSettings, + List clusterPlugins + ) { List deciders = new ArrayList<>( - ClusterModule.createAllocationDeciders(settings, clusterSettings, Collections.emptyList()) + ClusterModule.createAllocationDeciders(settings, clusterSettings, clusterPlugins) ); Collections.shuffle(deciders, random()); return new AllocationDeciders(deciders); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java index 787690d6f55e4..62c7f1d8fcc60 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java @@ -9,9 +9,12 @@ import joptsimple.internal.Strings; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.metadata.DesiredNode; import org.elasticsearch.cluster.metadata.DesiredNodeWithStatus; import org.elasticsearch.cluster.metadata.DesiredNodes; @@ -30,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -45,6 +49,8 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; +import org.elasticsearch.test.gateway.TestGatewayAllocator; +import org.elasticsearch.xpack.core.XPackPlugin; import java.util.ArrayList; import java.util.Arrays; @@ -57,6 +63,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState; +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_COLD; import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_FROZEN; import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; @@ -64,6 +75,7 @@ import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class DataTierAllocationDeciderTests extends ESAllocationTestCase { @@ -1027,4 +1039,166 @@ private NodesShutdownMetadata randomRestartInCluster(Set currentNodes) { ); } + public void testClusterConcurrentRebalanceIndependentLimits() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 10) + .build(); + + AllocationService strategy = createAllocationService( + settings, + new TestGatewayAllocator(), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES, + Arrays.asList(new XPackPlugin(settings)) + ); + + logger.info("Building initial routing table"); + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_HOT)) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .put( + IndexMetadata.builder("test_frozen") + .settings( + settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE) + .put(SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY, true) + ) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) + .addAsNew(metadata.getProject().index("test")) + .addAsNew(metadata.getProject().index("test_frozen")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); + + assertThat(clusterState.routingTable().index("test").size(), equalTo(5)); + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test").shard(i).shard(0).state(), equalTo(UNASSIGNED)); + assertThat(clusterState.routingTable().index("test").shard(i).shard(1).state(), equalTo(UNASSIGNED)); + assertThat(clusterState.routingTable().index("test").shard(i).shard(0).currentNodeId(), nullValue()); + assertThat(clusterState.routingTable().index("test").shard(i).shard(1).currentNodeId(), nullValue()); + } + + assertThat(clusterState.routingTable().index("test_frozen").size(), equalTo(5)); + for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { + assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(0).state(), equalTo(UNASSIGNED)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(1).state(), equalTo(UNASSIGNED)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(0).currentNodeId(), nullValue()); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(1).currentNodeId(), nullValue()); + } + + logger.info("start two nodes and fully start the shards"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node1_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node2_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { + assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("start all the primary shards, replicas will start initializing"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { + assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("now, start 8 more frozen nodes, and check that no rebalancing/relocation have happened"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node3_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node4_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node5_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node6_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node7_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node8_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node9_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node10_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + ) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { + assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(clusterState.routingTable().index("test_frozen").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("start the replica shards, rebalancing should start, but only for frozen nodes"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", STARTED).size(), equalTo(10)); + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", RELOCATING).size(), equalTo(0)); + + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", STARTED).size(), equalTo(2)); + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", RELOCATING).size(), equalTo(8)); + + logger.info("now, start 8 more hot nodes"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + .add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) + ) + .build(); + + logger.info("start the replica shards, rebalancing should start, but only for hot nodes"); + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", STARTED).size(), equalTo(7)); + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", RELOCATING).size(), equalTo(3)); + + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", STARTED).size(), equalTo(10)); + assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", RELOCATING).size(), equalTo(0)); + } } From 66817507ea617b3cecbd07754141d63cbdebca8a Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 7 Oct 2025 23:52:27 -0700 Subject: [PATCH 14/19] Test fixes --- .../ConcurrentRebalanceRoutingTests.java | 271 +++++++++--------- .../DataTierAllocationDeciderTests.java | 149 +++++----- 2 files changed, 209 insertions(+), 211 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index 9ddc5a1214e46..f5887083dd530 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -16,13 +16,19 @@ import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; import java.util.Collections; +import java.util.Set; +import java.util.function.Supplier; import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -35,163 +41,169 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { public void testClusterConcurrentRebalance() { - testClusterConcurrentInternal(false); + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 1) + .build() + ); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(5).numberOfReplicas(1)) + .build(); + + Supplier nodeFactory = new Supplier() { + int count = 1; + + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++); + } + }; + + testClusterConcurrentInternal(strategy, metadata, nodeFactory); } public void testClusterConcurrentRebalanceFrozen() { - testClusterConcurrentInternal(true); - } + AllocationService strategy = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 3) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 1) + .build() + ); - void testClusterConcurrentInternal(boolean testFrozen) { - AllocationService strategy; - if (testFrozen) { - strategy = createAllocationService( - Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 3) - .build() - ); - } else { - strategy = createAllocationService( - Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) - .build() - ); - } + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .build(); - logger.info("Building initial routing table"); + Supplier nodeFactory = new Supplier() { + int count = 1; + Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); - Metadata metadata; - if (testFrozen) { - metadata = Metadata.builder() - .put( - IndexMetadata.builder("test") - .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) - .numberOfShards(5) - .numberOfReplicas(1) - ) - .build(); - } else { - metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(5).numberOfReplicas(1)) - .build(); - } + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++, frozenRole); + } + }; + + testClusterConcurrentInternal(strategy, metadata, nodeFactory); + } + /** + * Run a series of concurrent rebalance checks on an index as nodes are created and the index changes state. + * Index must be named "test" + */ + void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata, Supplier nodeFactory) { RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) .addAsNew(metadata.getProject().index("test")) .build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); - assertThat(clusterState.routingTable().index("test").size(), equalTo(5)); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).currentNodeId(), nullValue()); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).currentNodeId(), nullValue()); + IndexRoutingTable indexRouting = clusterState.routingTable().index("test"); + assertThat(indexRouting.size(), equalTo(5)); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); } logger.info("start two nodes and fully start the shards"); - if (testFrozen) { - clusterState = ClusterState.builder(clusterState) - .nodes( - DiscoveryNodes.builder() - .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - ) - .build(); - } else { - clusterState = ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) - .build(); - } + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(nodeFactory.get()).add(nodeFactory.get())) + .build(); clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + indexRouting = clusterState.routingTable().index("test"); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(INITIALIZING)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(UNASSIGNED)); } logger.info("start all the primary shards, replicas will start initializing"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + indexRouting = clusterState.routingTable().index("test"); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); } logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); - if (testFrozen) { - clusterState = ClusterState.builder(clusterState) - .nodes( - DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - ) - .build(); - } else { - clusterState = ClusterState.builder(clusterState) - .nodes( - DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3")) - .add(newNode("node4")) - .add(newNode("node5")) - .add(newNode("node6")) - .add(newNode("node7")) - .add(newNode("node8")) - .add(newNode("node9")) - .add(newNode("node10")) - ) - .build(); - } + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + indexRouting = clusterState.routingTable().index("test"); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); } logger.info("start the replica shards, rebalancing should start, but, only 3 should be rebalancing"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(7)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(3)); + assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(7)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(3)); logger.info("finalize this session relocation, 3 more should relocate now"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); + routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(7)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(3)); + assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(7)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(3)); logger.info("finalize this session relocation, 2 more should relocate now"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); + routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(8)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(2)); + assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(8)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(2)); logger.info("finalize this session relocation, no more relocation"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); + routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(10)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(0)); + assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(10)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(0)); } public void testClusterConcurrentRebalanceFrozenUnlimited() { + Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + AllocationService strategy = createAllocationService( Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -216,46 +228,43 @@ public void testClusterConcurrentRebalanceFrozenUnlimited() { ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); - assertThat(clusterState.routingTable().index("test").size(), equalTo(5)); + IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test"); + assertThat(indexRoutingTable.size(), equalTo(5)); for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).currentNodeId(), nullValue()); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).currentNodeId(), nullValue()); + IndexShardRoutingTable shardRouting = indexRoutingTable.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); } logger.info("start two nodes and fully start the shards"); clusterState = ClusterState.builder(clusterState) - .nodes( - DiscoveryNodes.builder() - .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - ) + .nodes(DiscoveryNodes.builder().add(newNode("node1", frozenRole)).add(newNode("node2", frozenRole))) .build(); clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); logger.info("start all the primary shards, replicas will start initializing"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); - logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); - clusterState = ClusterState.builder(clusterState) - .nodes( - DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - ) - .build(); + var clusterStateBuilder = ClusterState.builder(clusterState); + + int nodeCount = randomIntBetween(8, 20); + var nodeBuilder = DiscoveryNodes.builder(clusterStateBuilder.nodes()); + + logger.info("now, start " + nodeCount + " more nodes, and check that no rebalancing/relocation have happened"); + + for (int i = 0; i < nodeCount; i++) { + int nodeId = 3 + i; + nodeBuilder = nodeBuilder.add(newNode("node" + nodeId, frozenRole)); + } + + clusterState = clusterStateBuilder.nodes(nodeBuilder).build(); clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); - logger.info("start the replica shards, rebalancing should start, but without a limit 8 should be rebalancing"); + logger.info("start the replica shards, rebalancing should start, but with a limit " + nodeCount + " should be rebalancing"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); // we only allow any number of relocations at a time diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java index 62c7f1d8fcc60..3caffe6ccb6b3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java @@ -28,7 +28,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GlobalRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodesHelper; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -1040,10 +1042,13 @@ private NodesShutdownMetadata randomRestartInCluster(Set currentNodes) { } public void testClusterConcurrentRebalanceIndependentLimits() { + final Set hotRole = Collections.singleton(DiscoveryNodeRole.DATA_HOT_NODE_ROLE); + final Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + Settings settings = Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) - .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 10) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 7) .build(); AllocationService strategy = createAllocationService( @@ -1081,124 +1086,108 @@ public void testClusterConcurrentRebalanceIndependentLimits() { ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); - assertThat(clusterState.routingTable().index("test").size(), equalTo(5)); + IndexRoutingTable index = clusterState.routingTable().index("test"); + assertThat(index.size(), equalTo(5)); for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).currentNodeId(), nullValue()); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).currentNodeId(), nullValue()); + IndexShardRoutingTable shardRouting = index.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); } - assertThat(clusterState.routingTable().index("test_frozen").size(), equalTo(5)); + IndexRoutingTable frozenIndex = clusterState.routingTable().index("test_frozen"); + assertThat(frozenIndex.size(), equalTo(5)); for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { - assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(0).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(1).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(0).currentNodeId(), nullValue()); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).shard(1).currentNodeId(), nullValue()); + IndexShardRoutingTable shardRouting = frozenIndex.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); } logger.info("start two nodes and fully start the shards"); clusterState = ClusterState.builder(clusterState) .nodes( DiscoveryNodes.builder() - .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node1_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node2_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node1", hotRole)) + .add(newNode("node2", hotRole)) + .add(newNode("node1_frozen", frozenRole)) + .add(newNode("node2_frozen", frozenRole)) ) .build(); clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + index = clusterState.routingTable().index("test"); for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + IndexShardRoutingTable shardRouting = index.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(INITIALIZING)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(UNASSIGNED)); } + frozenIndex = clusterState.routingTable().index("test_frozen"); for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { - assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + IndexShardRoutingTable shardRouting = frozenIndex.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(INITIALIZING)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(UNASSIGNED)); } logger.info("start all the primary shards, replicas will start initializing"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); + index = clusterState.routingTable().index("test"); for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + IndexShardRoutingTable shardRouting = index.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); } + frozenIndex = clusterState.routingTable().index("test_frozen"); for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { - assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + IndexShardRoutingTable shardRouting = frozenIndex.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); } - logger.info("now, start 8 more frozen nodes, and check that no rebalancing/relocation have happened"); + logger.info("now, start 8 more frozen and hot nodes"); clusterState = ClusterState.builder(clusterState) .nodes( DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node4_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node5_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node6_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node7_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node8_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node9_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) - .add(newNode("node10_frozen", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) + .add(newNode("node3", hotRole)) + .add(newNode("node4", hotRole)) + .add(newNode("node5", hotRole)) + .add(newNode("node6", hotRole)) + .add(newNode("node7", hotRole)) + .add(newNode("node8", hotRole)) + .add(newNode("node9", hotRole)) + .add(newNode("node10", hotRole)) + .add(newNode("node3_frozen", frozenRole)) + .add(newNode("node4_frozen", frozenRole)) + .add(newNode("node5_frozen", frozenRole)) + .add(newNode("node6_frozen", frozenRole)) + .add(newNode("node7_frozen", frozenRole)) + .add(newNode("node8_frozen", frozenRole)) + .add(newNode("node9_frozen", frozenRole)) + .add(newNode("node10_frozen", frozenRole)) ) .build(); clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); - - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); - } - - for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { - assertThat(clusterState.routingTable().index("test_frozen").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test_frozen").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); - } - - logger.info("start the replica shards, rebalancing should start, but only for frozen nodes"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", STARTED).size(), equalTo(10)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", RELOCATING).size(), equalTo(0)); - - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", STARTED).size(), equalTo(2)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", RELOCATING).size(), equalTo(8)); - - logger.info("now, start 8 more hot nodes"); - clusterState = ClusterState.builder(clusterState) - .nodes( - DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - .add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_ROLE))) - ) - .build(); - - logger.info("start the replica shards, rebalancing should start, but only for hot nodes"); - clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + logger.info("Hot should be able to relocate its max of 3 shards, and frozen its max of 7 shards"); - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", STARTED).size(), equalTo(7)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test", RELOCATING).size(), equalTo(3)); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + assertThat(shardsWithState(routingNodes, "test", STARTED).size(), equalTo(7)); + assertThat(shardsWithState(routingNodes, "test", RELOCATING).size(), equalTo(3)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", STARTED).size(), equalTo(10)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), "test_frozen", RELOCATING).size(), equalTo(0)); + assertThat(shardsWithState(routingNodes, "test_frozen", STARTED).size(), equalTo(3)); + assertThat(shardsWithState(routingNodes, "test_frozen", RELOCATING).size(), equalTo(7)); } } From 03fbb83cdb30f9241c8feb20b4ad7d84c250fb81 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Wed, 8 Oct 2025 17:25:03 -0700 Subject: [PATCH 15/19] Set default concurrent rebalance for frozen to 2 --- .../decider/ConcurrentRebalanceAllocationDecider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 635267c762bb6..b392716ffbeb1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -51,7 +51,7 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { */ public static final Setting CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting( "cluster.routing.allocation.cluster_concurrent_frozen_rebalance", - 10, + 2, -1, Property.Dynamic, Property.NodeScope From 05cbcf2587deb0bdec2b700fd11019c438a64c12 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Wed, 8 Oct 2025 17:41:33 -0700 Subject: [PATCH 16/19] Default to normal setting --- .../decider/ConcurrentRebalanceAllocationDecider.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index b392716ffbeb1..bbdba44c6b61f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -48,10 +48,12 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { /** * Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards + * + * Defaults to the same value as normal concurrent rebalance, if unspecified */ public static final Setting CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting( "cluster.routing.allocation.cluster_concurrent_frozen_rebalance", - 2, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, -1, Property.Dynamic, Property.NodeScope From 84bc7260fa8292ba9b8987d1f4aa9459fa09feec Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Mon, 13 Oct 2025 00:27:42 -0700 Subject: [PATCH 17/19] Renaming strategy to allocationService --- .../ConcurrentRebalanceRoutingTests.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index f5887083dd530..9547aeb7516e3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -41,7 +41,7 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { public void testClusterConcurrentRebalance() { - AllocationService strategy = createAllocationService( + AllocationService allocationService = createAllocationService( Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) @@ -62,11 +62,11 @@ public DiscoveryNode get() { } }; - testClusterConcurrentInternal(strategy, metadata, nodeFactory); + testClusterConcurrentInternal(allocationService, metadata, nodeFactory); } public void testClusterConcurrentRebalanceFrozen() { - AllocationService strategy = createAllocationService( + AllocationService allocationService = createAllocationService( Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 3) @@ -93,14 +93,14 @@ public DiscoveryNode get() { } }; - testClusterConcurrentInternal(strategy, metadata, nodeFactory); + testClusterConcurrentInternal(allocationService, metadata, nodeFactory); } /** * Run a series of concurrent rebalance checks on an index as nodes are created and the index changes state. * Index must be named "test" */ - void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata, Supplier nodeFactory) { + void testClusterConcurrentInternal(AllocationService allocationService, Metadata metadata, Supplier nodeFactory) { RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) .addAsNew(metadata.getProject().index("test")) .build(); @@ -122,7 +122,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata clusterState = ClusterState.builder(clusterState) .nodes(DiscoveryNodes.builder().add(nodeFactory.get()).add(nodeFactory.get())) .build(); - clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); indexRouting = clusterState.routingTable().index("test"); for (int i = 0; i < indexRouting.size(); i++) { @@ -133,7 +133,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata } logger.info("start all the primary shards, replicas will start initializing"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); indexRouting = clusterState.routingTable().index("test"); for (int i = 0; i < indexRouting.size(); i++) { @@ -158,7 +158,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata ) .build(); - clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); indexRouting = clusterState.routingTable().index("test"); for (int i = 0; i < indexRouting.size(); i++) { @@ -169,7 +169,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata } logger.info("start the replica shards, rebalancing should start, but, only 3 should be rebalancing"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); RoutingNodes routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time @@ -177,7 +177,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(3)); logger.info("finalize this session relocation, 3 more should relocate now"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time @@ -185,7 +185,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(3)); logger.info("finalize this session relocation, 2 more should relocate now"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time @@ -193,7 +193,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(2)); logger.info("finalize this session relocation, no more relocation"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time @@ -204,7 +204,7 @@ void testClusterConcurrentInternal(AllocationService strategy, Metadata metadata public void testClusterConcurrentRebalanceFrozenUnlimited() { Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); - AllocationService strategy = createAllocationService( + AllocationService allocationService = createAllocationService( Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) @@ -243,10 +243,10 @@ public void testClusterConcurrentRebalanceFrozenUnlimited() { clusterState = ClusterState.builder(clusterState) .nodes(DiscoveryNodes.builder().add(newNode("node1", frozenRole)).add(newNode("node2", frozenRole))) .build(); - clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); logger.info("start all the primary shards, replicas will start initializing"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); var clusterStateBuilder = ClusterState.builder(clusterState); @@ -262,10 +262,10 @@ public void testClusterConcurrentRebalanceFrozenUnlimited() { clusterState = clusterStateBuilder.nodes(nodeBuilder).build(); - clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); logger.info("start the replica shards, rebalancing should start, but with a limit " + nodeCount + " should be rebalancing"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); // we only allow any number of relocations at a time assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(2)); From c798ad8c3b53578cd15b8a597864d336a1943384 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Mon, 13 Oct 2025 00:48:19 -0700 Subject: [PATCH 18/19] Fixed up decision message formatting, and added test --- .../ConcurrentRebalanceAllocationDecider.java | 12 +- ...urrentRebalanceAllocationDeciderTests.java | 330 ++++++++++++++++++ 2 files changed, 336 insertions(+), 6 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index bbdba44c6b61f..cbc515cb82a3b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -159,20 +159,20 @@ public Decision canRebalance(RoutingAllocation allocation) { return allocation.decision( Decision.YES, NAME, - "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", - relocatingShards, + "below threshold [%s=%d] for concurrent rebalances, current rebalance shard count [%d]", CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), - clusterConcurrentRebalance + clusterConcurrentRebalance, + relocatingShards ); } if (clusterConcurrentFrozenRebalance == -1 || relocatingFrozenShards < clusterConcurrentFrozenRebalance) { return allocation.decision( Decision.YES, NAME, - "below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", - relocatingFrozenShards, + "below threshold [%s=%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), - clusterConcurrentFrozenRebalance + clusterConcurrentFrozenRebalance, + relocatingFrozenShards ); } return allocation.decision( diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java new file mode 100644 index 0000000000000..fb843375a71b6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java @@ -0,0 +1,330 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +/** + * A series of tests that set up cluster state with a specified number of relocations in progress. + * + * Then sets and checks the ConcurrentRebalanceAllocationDecider canRebalance logic, at both the cluster- + * and shard-level. + */ +public class ConcurrentRebalanceAllocationDeciderTests extends ESAllocationTestCase { + + public void testConcurrentUnlimited() { + ClusterState clusterState = setupConcurrentRelocations(initializeMetadata(5), nodeFactory(), 5); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 0) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_rebalance=-1] for concurrent rebalances, " + + "current rebalance shard count [5]" + ); + assertShardAllocationDecision(clusterState, settings, Decision.Type.YES, "unlimited concurrent rebalances are allowed"); + + settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 6) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 0) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_rebalance=6] for concurrent rebalances, " + + "current rebalance shard count [5]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [6] for concurrent rebalances, current rebalance shard count [5]" + ); + } + + public void testFrozenConcurrentUnlimited() { + ClusterState clusterState = setupConcurrentRelocations(initializeFrozenMetadata(5), frozenNodeFactory(), 5); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=-1] for concurrent frozen rebalances, " + + "current frozen rebalance shard count [5]" + ); + assertShardAllocationDecision(clusterState, settings, Decision.Type.YES, "unlimited concurrent frozen rebalances are allowed"); + + settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 6) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=6] for concurrent frozen rebalances, " + + "current frozen rebalance shard count [5]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [6] for concurrent frozen rebalances, current frozen rebalance shard count [5]" + ); + } + + public void testThrottleDecision() { + ClusterState clusterState = setupConcurrentRelocations(initializeMetadata(2), nodeFactory(), 2); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 2) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 0) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing shards [2] for concurrent rebalances, " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_rebalance=2], " + + "and [0] for concurrent frozen rebalances, frozen cluster setting " + + "[cluster.routing.allocation.cluster_concurrent_frozen_rebalance=0]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing shards [2], " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_rebalance=2]" + ); + } + + public void testFrozenThrottleDecision() { + ClusterState clusterState = setupConcurrentRelocations(initializeFrozenMetadata(2), frozenNodeFactory(), 2); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 2) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing shards [0] for concurrent rebalances, " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_rebalance=0], " + + "and [2] for concurrent frozen rebalances, " + + "frozen cluster setting [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=2]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing frozen shards [2], " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=2]" + ); + } + + private void assertShardAllocationDecision( + ClusterState clusterState, + Settings settings, + Decision.Type decisionType, + String explanation + ) { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); + + ConcurrentRebalanceAllocationDecider decider = new ConcurrentRebalanceAllocationDecider(clusterSettings); + + RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(Arrays.asList(decider)), + null, + clusterState, + null, + null, + 0 + ); + allocation.debugDecision(true); + + ShardRouting shardRouting = findStartedShard(clusterState); + Decision decision = decider.canRebalance(shardRouting, allocation); + + assertThat(decision.type(), equalTo(decisionType)); + assertThat(decision.getExplanation(), containsString(explanation)); + } + + private ShardRouting findStartedShard(ClusterState clusterState) { + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + Map> assignedShards = routingNodes.getAssignedShards(); + for (Map.Entry> entry : assignedShards.entrySet()) { + for (ShardRouting shardRouting : entry.getValue()) { + if (shardRouting.state() == ShardRoutingState.STARTED) { + return shardRouting; + } + } + } + assert false : "need at least one started shard"; + return null; + } + + private void assertClusterAllocationDecision( + ClusterState clusterState, + Settings settings, + Decision.Type decisionType, + String explanation + ) { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); + + ConcurrentRebalanceAllocationDecider decider = new ConcurrentRebalanceAllocationDecider(clusterSettings); + + RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(Arrays.asList(decider)), + null, + clusterState, + null, + null, + 0 + ); + allocation.debugDecision(true); + + Decision decision = decider.canRebalance(allocation); + + assertThat(decision.type(), equalTo(decisionType)); + assertThat(decision.getExplanation(), containsString(explanation)); + } + + private Metadata initializeMetadata(int numberOfRelocations) { + return Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current())) + .numberOfShards(numberOfRelocations) + .numberOfReplicas(1) + ) + .build(); + } + + private Supplier nodeFactory() { + return new Supplier() { + int count = 1; + + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++); + } + }; + } + + private Metadata initializeFrozenMetadata(int numberOfRelocations) { + return Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) + .numberOfShards(numberOfRelocations) + .numberOfReplicas(1) + ) + .build(); + } + + private Supplier frozenNodeFactory() { + return new Supplier() { + int count = 1; + Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++, frozenRole); + } + }; + } + + /** + * Set up a cluster state so that a specified number of concurrent relocations are in progress + * + * Internally, this creates a bunch of shards (the number of relocations) and one replica, + * each on their own node. The primaries are all started, then the replicas are set up to all + * relocate to new nodes. + */ + private ClusterState setupConcurrentRelocations(Metadata metadata, Supplier nodeFactory, int numberOfRelocations) { + AllocationService allocationService = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build() + ); + + RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) + .addAsNew(metadata.getProject().index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); + + var nodeBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < numberOfRelocations; i++) { + nodeBuilder = nodeBuilder.add(nodeFactory.get()); + } + + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + + // set up primaries, and have replicas initializing + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + + // add a bunch of nodes to create relocation chaos + var clusterStateBuilder = ClusterState.builder(clusterState); + nodeBuilder = DiscoveryNodes.builder(clusterStateBuilder.nodes()); + for (int i = 0; i < numberOfRelocations; i++) { + nodeBuilder = nodeBuilder.add(nodeFactory.get()); + } + clusterState = clusterStateBuilder.nodes(nodeBuilder).build(); + + // start relocations + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + + return clusterState; + } +} From 84148e793081942dc082207ce3f09a779852828b Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Mon, 13 Oct 2025 15:32:52 -0700 Subject: [PATCH 19/19] Slight improvements to the test structure --- ...urrentRebalanceAllocationDeciderTests.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java index fb843375a71b6..d7c51f9fa40fe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java @@ -50,7 +50,7 @@ public class ConcurrentRebalanceAllocationDeciderTests extends ESAllocationTestCase { public void testConcurrentUnlimited() { - ClusterState clusterState = setupConcurrentRelocations(initializeMetadata(5), nodeFactory(), 5); + ClusterState clusterState = setupConcurrentRelocations(5); Settings settings = Settings.builder() .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) @@ -85,7 +85,7 @@ public void testConcurrentUnlimited() { } public void testFrozenConcurrentUnlimited() { - ClusterState clusterState = setupConcurrentRelocations(initializeFrozenMetadata(5), frozenNodeFactory(), 5); + ClusterState clusterState = setupConcurrentFrozenRelocations(5); Settings settings = Settings.builder() .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) @@ -120,7 +120,7 @@ public void testFrozenConcurrentUnlimited() { } public void testThrottleDecision() { - ClusterState clusterState = setupConcurrentRelocations(initializeMetadata(2), nodeFactory(), 2); + ClusterState clusterState = setupConcurrentRelocations(2); Settings settings = Settings.builder() .put("cluster.routing.allocation.cluster_concurrent_rebalance", 2) @@ -145,7 +145,7 @@ public void testThrottleDecision() { } public void testFrozenThrottleDecision() { - ClusterState clusterState = setupConcurrentRelocations(initializeFrozenMetadata(2), frozenNodeFactory(), 2); + ClusterState clusterState = setupConcurrentFrozenRelocations(2); Settings settings = Settings.builder() .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) @@ -281,6 +281,14 @@ public DiscoveryNode get() { }; } + private ClusterState setupConcurrentRelocations(int relocations) { + return setupConcurrentRelocationsInternal(initializeMetadata(relocations), nodeFactory(), relocations); + } + + private ClusterState setupConcurrentFrozenRelocations(int relocations) { + return setupConcurrentRelocationsInternal(initializeFrozenMetadata(relocations), frozenNodeFactory(), relocations); + } + /** * Set up a cluster state so that a specified number of concurrent relocations are in progress * @@ -288,7 +296,9 @@ public DiscoveryNode get() { * each on their own node. The primaries are all started, then the replicas are set up to all * relocate to new nodes. */ - private ClusterState setupConcurrentRelocations(Metadata metadata, Supplier nodeFactory, int numberOfRelocations) { + private ClusterState setupConcurrentRelocationsInternal(Metadata metadata, Supplier nodeFactory, int relocations) { + assert relocations > 1 : "logic only works for 2 or more relocations as the replica needs to initialize elsewhere then relocate"; + AllocationService allocationService = createAllocationService( Settings.builder() .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) @@ -303,7 +313,7 @@ private ClusterState setupConcurrentRelocations(Metadata metadata, Supplier