From 33d40274ecbe5179b982dc3fcb7eaaf241f7bb41 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Tue, 4 Feb 2025 14:26:42 -0500 Subject: [PATCH 1/2] DesiredBalanceReconciler always returns AllocationStats --- .../cluster/routing/RoutingNode.java | 3 + .../allocator/DesiredBalanceReconciler.java | 11 +-- .../allocation/decider/AllocationDecider.java | 6 +- .../decider/AllocationDeciders.java | 4 + .../ClusterRebalanceAllocationDecider.java | 4 + .../ConcurrentRebalanceAllocationDecider.java | 5 ++ .../decider/EnableAllocationDecider.java | 7 +- .../DesiredBalanceReconcilerTests.java | 86 ++++++++++++++++--- 8 files changed, 104 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index c18591d07a85f..2b5806724c75f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -112,6 +112,9 @@ public String nodeId() { return this.nodeId; } + /** + * Number of shards assigned to this node. Includes relocating shards. Use {@link #numberOfOwningShards()} to exclude relocating shards. + */ public int size() { return shards.size(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index dd7216758c3b7..3fef47b241814 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -515,11 +515,6 @@ private void moveShards() { } private DesiredBalanceMetrics.AllocationStats balance() { - // Check if rebalancing is disabled. - if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { - return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS; - } - int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); int totalAllocations = 0; int undesiredAllocationsExcludingShuttingDownNodes = 0; @@ -549,9 +544,15 @@ private DesiredBalanceMetrics.AllocationStats balance() { } if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { + // shard is not on a shutting down node, nor is it on a desired node per the previous check. undesiredAllocationsExcludingShuttingDownNodes++; } + if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { + // Rebalancing is disabled, we're just here to collect the AllocationStats to return. + continue; + } + if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) { // rebalancing disabled for this shard continue; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 62089856d783c..7fae18a332f0c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -75,9 +75,9 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod } /** - * Returns a {@link Decision} whether the cluster can execute - * re-balanced operations at all. - * {@link Decision#ALWAYS}. + * Returns a {@link Decision} on whether the cluster is allowed to rebalance shards to improve relative node shard weights and + * performance. + * @return {@link Decision#ALWAYS} is returned by default if not overridden. */ public Decision canRebalance(RoutingAllocation allocation) { return Decision.ALWAYS; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 638bec38c3f43..edc57e93d9adf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -82,6 +82,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing ); } + /** + * Returns whether rebalancing (move shards to improve relative node weights and performance) is allowed right now. + * Rebalancing can be disabled via cluster settings, or throttled by cluster settings (e.g. max concurrent shard moves). + */ public Decision canRebalance(RoutingAllocation allocation) { return withDeciders( allocation, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java index 7d866983496f4..3cf012d3faa3f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ClusterRebalanceAllocationDecider.java @@ -150,6 +150,10 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca + "]" ); + /** + * Rebalancing may be enabled, disabled, or only allowed after all primaries have started, depending on the cluster setting + * {@link #CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING}. + */ @SuppressWarnings("fallthrough") @Override public Decision canRebalance(RoutingAllocation allocation) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index 6c457267a5ffa..deb3e4440f4ab 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -61,6 +61,11 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca return canRebalance(allocation); } + /** + * We allow a limited number of concurrent shard relocations, per the cluster setting + * {@link #CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING}. + * Returns a {@link Decision#THROTTLE} decision if the limit is exceeded, otherwise returns {@link Decision#YES}. + */ @Override public Decision canRebalance(RoutingAllocation allocation) { int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java index 2e7bf0c7ee241..1dfe2f8631142 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java @@ -146,6 +146,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat }; } + /** + * Rebalancing is limited by the {@link Rebalance} value set on the cluster setting {@link #CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING}. + * We might allow movement only of primary shards, or replica shards, or none, or all. + * This method only concerns itself with whether {@link Rebalance#NONE} is set: rebalancing is allowed for all other setting values. + */ @Override public Decision canRebalance(RoutingAllocation allocation) { if (allocation.ignoreDisable()) { @@ -243,7 +248,7 @@ public String toString() { } /** - * Rebalance values or rather their string representation to be used used with + * Rebalance values or rather their string representation to be used with * {@link EnableAllocationDecider#CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING} / * {@link EnableAllocationDecider#INDEX_ROUTING_REBALANCE_ENABLE_SETTING} * via cluster / index settings. diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 81aa1a60eb45e..878ef46e79336 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -110,6 +110,8 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase { + private static AtomicReference ALLOCATION_STATS_PLACEHOLDER = new AtomicReference<>(); + public void testNoChangesOnEmptyDesiredBalance() { final var clusterState = DesiredBalanceComputerTests.createInitialClusterState(3); final var routingAllocation = createRoutingAllocationFrom(clusterState); @@ -235,8 +237,9 @@ public void testUnassignedPrimariesBeforeUnassignedReplicas() { (indexName, nodeId) -> indexName.equals("index-0") && nodeId.equals("node-0") ); + AtomicReference allocationStats = new AtomicReference<>(); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, allocationStats), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -260,6 +263,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var index1RoutingTable = stateWithStartedPrimary.routingTable().shardRoutingTable("index-1", 0); assertTrue(index1RoutingTable.primaryShard().unassigned()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); + assertNotNull(allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0), allocationStats.get()); } // now relax the filter so that the replica of index-0 and the primary of index-1 can both be assigned to node-1, but the throttle @@ -273,6 +278,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var index1RoutingTable = stateWithInitializingSecondPrimary.routingTable().shardRoutingTable("index-1", 0); assertTrue(index1RoutingTable.primaryShard().initializing()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); + assertNotNull(allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0), allocationStats.get()); } final var stateWithStartedPrimariesAndInitializingReplica = startInitializingShardsAndReroute( @@ -286,6 +293,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var index1RoutingTable = stateWithStartedPrimariesAndInitializingReplica.routingTable().shardRoutingTable("index-1", 0); assertTrue(index1RoutingTable.primaryShard().started()); assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned)); + assertNotNull(allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0), allocationStats.get()); } } @@ -815,6 +824,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing } public void testMoveShards() { + /** + * Set up 4 nodes and an index of 3 shards with 1 replica each (6 shard copies). + */ final var discoveryNodes = discoveryNodes(4); final var metadata = Metadata.builder(); final var routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY); @@ -839,11 +851,13 @@ public void testMoveShards() { .build(); final var clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + // Set up overriding AllocationDecider#canAllocate decisions for a shard. final var canAllocateRef = new AtomicReference<>(Decision.YES); final var desiredBalance = new AtomicReference<>(desiredBalance(clusterState, (shardId, nodeId) -> true)); + AtomicReference allocationStats = new AtomicReference<>(); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance.get()), + routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), allocationStats), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -873,7 +887,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertTrue(shardRouting.started()); assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1")); } + assertNotNull(allocationStats); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); + // Only allow allocation on two of the nodes, excluding the other two nodes. clusterSettings.applySettings( Settings.builder() .putList( @@ -886,6 +903,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no // movement needed + assertNotNull(allocationStats); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3"))); @@ -894,10 +913,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat final var reroutedState = allocationService.reroute(clusterState, "test", ActionListener.noop()); assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); + assertNotNull(allocationStats); + // Total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice. + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get()); // Ensuring that we check the shortcut two-param canAllocate() method up front canAllocateRef.set(Decision.NO); assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); canAllocateRef.set(Decision.YES); // Restore filter to default @@ -935,6 +958,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat "test", ActionListener.noop() ); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3), allocationStats.get()); + assertThat(shuttingDownState.getRoutingNodes().node("node-2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); } @@ -963,11 +988,13 @@ public void testRebalance() { final var desiredBalance = new AtomicReference<>( desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-0") || nodeId.equals("node-1")) ); + AtomicReference allocationStats = new AtomicReference<>(); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance.get()), + routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), allocationStats), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), + new ConcurrentRebalanceAllocationDecider(clusterSettings), new AllocationDecider() { @Override public Decision canRebalance(RoutingAllocation allocation) { @@ -997,24 +1024,28 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1")); } - assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no - // movement needed + // All still on desired nodes, no movement needed, cluster state remains the same. + assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get()); desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3"))); canRebalanceGlobalRef.set(Decision.NO); - assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // rebalancing forbidden on all - // shards, no movement + // rebalancing forbidden on all shards, no movement allowed, cluster state remains the same. + assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); + // assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); canRebalanceGlobalRef.set(Decision.YES); canRebalanceShardRef.set(Decision.NO); - assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // rebalancing forbidden on - // specific shards, no movement + // rebalancing forbidden on specific shards, still no movement. + assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); + // assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); canRebalanceShardRef.set(Decision.YES); canAllocateShardRef.set(Decision.NO); - assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // allocation not possible, no - // movement + // allocation not possible, no movement + assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); + // assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); canAllocateShardRef.set(Decision.YES); // The next reroute starts moving shards to node-2 and node-3, but interleaves the decisions between node-0 and node-1 for fairness. @@ -1022,6 +1053,16 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat final var reroutedState = allocationService.reroute(clusterState, "test", ActionListener.noop()); assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); + assertNotNull(allocationStats.get()); + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get()); + + // Test that the AllocationStats are still updated, even though throttling is active. The cluster state should remain unchanged + // because due to throttling: the previous reroute request started relocating two shards and, since those reallocations have not + // been completed, no additional shard relocations can begin. + assertSame(reroutedState, allocationService.reroute(reroutedState, "test", ActionListener.noop())); + assertNotNull(allocationStats); + // Note: total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice. + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get()); } public void testDoNotRebalanceToTheNodeThatNoLongerExists() { @@ -1225,12 +1266,14 @@ public void testRebalanceDoesNotCauseHotSpots() { while (true) { var allocation = createRoutingAllocationFrom(clusterState, deciders); - reconciler.reconcile(balance, allocation); + var allocationStats = reconciler.reconcile(balance, allocation); var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING); if (initializing.isEmpty()) { + assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0), allocationStats); break; } + for (ShardRouting shardRouting : initializing) { totalOutgoingMoves.get(shardRouting.relocatingNodeId()).incrementAndGet(); allocation.routingNodes().startShard(shardRouting, allocation.changes(), 0L); @@ -1344,11 +1387,24 @@ public void testShouldLogOnTooManyUndesiredAllocations() { } private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) { + reconcile(routingAllocation, desiredBalance, ALLOCATION_STATS_PLACEHOLDER); + } + + private static void reconcile( + RoutingAllocation routingAllocation, + DesiredBalance desiredBalance, + AtomicReference allocationStatsAtomicReference + ) { final var threadPool = mock(ThreadPool.class); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation); + allocationStatsAtomicReference.set( + new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation) + ); } + /** + * Returns whether the node's shards are all desired assignments. + */ private static boolean isReconciled(RoutingNode node, DesiredBalance balance) { for (ShardRouting shardRouting : node) { if (balance.assignments().get(shardRouting.shardId()).nodeIds().contains(node.nodeId()) == false) { @@ -1486,6 +1542,10 @@ private static IndexMetadata randomPriorityIndex(String name, int numberOfShards .build(); } + /** + * Settings that limit concurrency on each node to: a single primary shard recovery from local disk; a single shard move as a source + * node; a single shard move as the destination node. + */ private static Settings throttleSettings() { return Settings.builder() .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1) From 3154c760904be97638e5d4de6ee39ea870797a11 Mon Sep 17 00:00:00 2001 From: Dianna Hohensee Date: Wed, 12 Feb 2025 21:33:36 -0500 Subject: [PATCH 2/2] Update docs/changelog/122458.yaml --- docs/changelog/122458.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/122458.yaml diff --git a/docs/changelog/122458.yaml b/docs/changelog/122458.yaml new file mode 100644 index 0000000000000..e28e22eb363b6 --- /dev/null +++ b/docs/changelog/122458.yaml @@ -0,0 +1,5 @@ +pr: 122458 +summary: '`DesiredBalanceReconciler` always returns `AllocationStats`' +area: Allocation +type: bug +issues: []