From 53bfa9b46a115344b9a7b08adf9c94c2a1c29c97 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 8 Sep 2025 15:23:39 +1000 Subject: [PATCH 01/35] Implement AllocationDeciders#findNonPreferred --- .../allocator/BalancedShardsAllocator.java | 81 +++++++++++++++++++ .../allocation/decider/AllocationDecider.java | 11 +++ .../decider/AllocationDeciders.java | 13 +++ 3 files changed, 105 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c737b89b80f73..fce9451502b83 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -155,6 +155,7 @@ public void allocate(RoutingAllocation allocation) { final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights); balancer.allocateUnassigned(); balancer.moveShards(); + balancer.moveNonPreferred(); balancer.balance(); // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy. @@ -711,6 +712,77 @@ protected int comparePivot(int j) { return indices; } + /** + * Move started shards that are in non-preferred allocations + */ + public void moveNonPreferred() { + for (Iterator it = allocation.deciders().findNonPreferred(allocation); it.hasNext();) { + ShardRouting shardRouting = it.next(); + ProjectIndex index = projectIndex(shardRouting); + final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); + if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); + sourceNode.removeShard(index, shardRouting); + Tuple relocatingShards = routingNodes.relocateShard( + shardRouting, + targetNode.getNodeId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + "non-preferred", + allocation.changes() + ); + final ShardRouting shard = relocatingShards.v2(); + targetNode.addShard(projectIndex(shard), shard); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); + } + } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { + logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + } + } + } + + /** + * Makes a decision on whether to move a started shard to another node. The following rules apply + * to the {@link MoveDecision} return object: + * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false. + * 2. If the shard's current allocation is preferred ({@link Decision.Type#YES}), no attempt will be made to move the shard and + * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null. + * 3. If the shard is not allowed ({@link Decision.Type#NO}), or not preferred ({@link Decision.Type#NOT_PREFERRED}) to remain + * on its current node, then {@link MoveDecision#getAllocationDecision()} will be populated with the decision of moving to + * another node. If {@link MoveDecision#forceMove()} returns {@code true}, then {@link MoveDecision#getTargetNode} will return + * a non-null value representing a node that returned {@link Decision.Type#YES} from canAllocate, otherwise the assignedNodeId + * will be null. + * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then + * {@link MoveDecision#getNodeDecisions} will have a non-null value. + */ + public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final ShardRouting shardRouting) { + NodeSorter sorter = nodeSorters.sorterForShard(shardRouting); + index.assertMatch(shardRouting); + + if (shardRouting.started() == false) { + // we can only move started shards + return MoveDecision.NOT_TAKEN; + } + + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + assert sourceNode != null && sourceNode.containsShard(index, shardRouting); + RoutingNode routingNode = sourceNode.getRoutingNode(); + Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); + if (canRemain.type() != Type.NOT_PREFERRED || canRemain.type() != Type.NO) { + return MoveDecision.remain(canRemain); + } + + sorter.reset(index); + /* + * the sorter holds the minimum weight node first for the shards index. + * We now walk through the nodes until we find a node to allocate the shard. + * This is not guaranteed to be balanced after this operation we still try best effort to + * allocate on the minimal eligible node. + */ + return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocatePreferredOnly); + } + /** * Move started shards that can not be allocated to a node anymore * @@ -839,6 +911,15 @@ private MoveDecision decideMove( ); } + private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) { + Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation); + // not-preferred means no here + if (decision.type() == Type.NOT_PREFERRED) { + return Decision.NO; + } + return decision; + } + private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) { // don't use canRebalance as we want hard filtering rules to apply. See #17698 return allocation.deciders().canAllocate(shardRouting, target, allocation); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 7fae18a332f0c..3eb585812ce98 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; +import java.util.Iterator; import java.util.Optional; import java.util.Set; @@ -153,4 +154,14 @@ public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRo public Optional> getForcedInitialShardAllocationToNodes(ShardRouting shardRouting, RoutingAllocation allocation) { return Optional.empty(); } + + /** + * Return a list of shard allocations that are non-preferable according to this decider + * + * @param allocation the current routing allocation + * @return A list of shard allocations that this decider would like to move elsewhere, in order of descending priority + */ + public Optional> getNonPreferredAllocations(RoutingAllocation allocation) { + return Optional.empty(); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 6a09c894dbc7d..cbf924425be8c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -17,9 +17,12 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.util.set.Sets; +import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; @@ -244,4 +247,14 @@ public Optional> getForcedInitialShardAllocationToNodes(ShardRouting } return result; } + + @SuppressWarnings("unchecked") + public Iterator findNonPreferred(RoutingAllocation routingAllocation) { + var iterators = new ArrayList>(deciders.length); + for (AllocationDecider decider : deciders) { + decider.getNonPreferredAllocations(routingAllocation).ifPresent(iterators::add); + assert iterators.size() == 1 : "when we've got more than one decider contributing we should revisit how these are combined"; + } + return Iterators.concat(iterators.>toArray(Iterator[]::new)); + } } From f0f9f77e87a7ffe2a99d225a32b35c6f4993d8aa Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 8 Sep 2025 16:13:37 +1000 Subject: [PATCH 02/35] Fix assertion --- .../cluster/routing/allocation/decider/AllocationDeciders.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index cbf924425be8c..f9d49ae53a11f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -253,7 +253,7 @@ public Iterator findNonPreferred(RoutingAllocation routingAllocati var iterators = new ArrayList>(deciders.length); for (AllocationDecider decider : deciders) { decider.getNonPreferredAllocations(routingAllocation).ifPresent(iterators::add); - assert iterators.size() == 1 : "when we've got more than one decider contributing we should revisit how these are combined"; + assert iterators.size() <= 1 : "when we've got more than one decider contributing we should revisit how these are combined"; } return Iterators.concat(iterators.>toArray(Iterator[]::new)); } From 71232d5e38111f25b29687a402d7d729f1b3c00c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 8 Sep 2025 17:41:02 +1000 Subject: [PATCH 03/35] Implement prioritisable problems --- .../allocator/BalancedShardsAllocator.java | 24 +++++++++++++++--- .../allocation/decider/AllocationDecider.java | 8 +++--- .../decider/AllocationDeciders.java | 25 +++++++++++++------ 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index fce9451502b83..d2631bc95abeb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -716,8 +716,24 @@ protected int comparePivot(int j) { * Move started shards that are in non-preferred allocations */ public void moveNonPreferred() { - for (Iterator it = allocation.deciders().findNonPreferred(allocation); it.hasNext();) { - ShardRouting shardRouting = it.next(); + boolean movedAShard = false; + do { + for (Iterator problemIterator = allocation.deciders() + .findAllocationProblems(allocation); problemIterator.hasNext();) { + AllocationDeciders.AllocationProblem problem = problemIterator.next(); + if (tryResolveAllocationProblem(problem)) { + movedAShard = true; + break; + } + logger.debug("Unable to resolve allocation problem [{}], will try next time", problem); + } + // TODO: Update cluster info + } while (movedAShard); + } + + private boolean tryResolveAllocationProblem(AllocationDeciders.AllocationProblem problem) { + for (Iterator shardIterator = problem.preferredShardMovements(); shardIterator.hasNext();) { + ShardRouting shardRouting = shardIterator.next(); ProjectIndex index = projectIndex(shardRouting); final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { @@ -728,7 +744,7 @@ public void moveNonPreferred() { shardRouting, targetNode.getNodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - "non-preferred", + problem.relocateReason(), allocation.changes() ); final ShardRouting shard = relocatingShards.v2(); @@ -736,10 +752,12 @@ public void moveNonPreferred() { if (logger.isTraceEnabled()) { logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); } + return true; } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); } } + return false; } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 3eb585812ce98..45cbe413245c2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -16,7 +16,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; -import java.util.Iterator; +import java.util.Collection; import java.util.Optional; import java.util.Set; @@ -156,12 +156,12 @@ public Optional> getForcedInitialShardAllocationToNodes(ShardRouting } /** - * Return a list of shard allocations that are non-preferable according to this decider + * Get a list of allocation problems that can be fixed by moving some shards * * @param allocation the current routing allocation - * @return A list of shard allocations that this decider would like to move elsewhere, in order of descending priority + * @return A list of node IDs that contain shards this decider would like to move elsewhere, in order of descending priority */ - public Optional> getNonPreferredAllocations(RoutingAllocation allocation) { + public Optional> getAllocationProblems(RoutingAllocation allocation) { return Optional.empty(); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index f9d49ae53a11f..db5d6aa9e1523 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -17,14 +17,14 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.util.set.Sets; -import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.function.BiFunction; import java.util.function.Function; @@ -248,13 +248,22 @@ public Optional> getForcedInitialShardAllocationToNodes(ShardRouting return result; } - @SuppressWarnings("unchecked") - public Iterator findNonPreferred(RoutingAllocation routingAllocation) { - var iterators = new ArrayList>(deciders.length); + public Iterator findAllocationProblems(RoutingAllocation routingAllocation) { + var problems = new TreeSet<>(Comparator.comparing(AllocationProblem::priority).reversed()); for (AllocationDecider decider : deciders) { - decider.getNonPreferredAllocations(routingAllocation).ifPresent(iterators::add); - assert iterators.size() <= 1 : "when we've got more than one decider contributing we should revisit how these are combined"; + decider.getAllocationProblems(routingAllocation).ifPresent(problems::addAll); + } + return problems.iterator(); + } + + public interface AllocationProblem { + + Iterator preferredShardMovements(); + + String relocateReason(); + + default int priority() { + return 1; } - return Iterators.concat(iterators.>toArray(Iterator[]::new)); } } From c31b05af11ed707c8512fae09600790df39908bf Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 8 Sep 2025 17:49:25 +1000 Subject: [PATCH 04/35] Javadoc --- .../allocation/decider/AllocationDeciders.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index db5d6aa9e1523..650379a521e89 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -258,10 +259,21 @@ public Iterator findAllocationProblems(RoutingAllocation rout public interface AllocationProblem { + /** + * Shard movements to attempt to resolve the problem in descending priority order. + */ Iterator preferredShardMovements(); + /** + * The reason for the relocation + * + * @see RoutingChangesObserver#relocationStarted(ShardRouting, ShardRouting, String) + */ String relocateReason(); + /** + * We could prioritize them this way + */ default int priority() { return 1; } From 967e76e150bb9493351632375cf1ef8e11177736 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 8 Sep 2025 17:59:01 +1000 Subject: [PATCH 05/35] Example for write load constraint decider --- .../allocation/decider/AllocationDecider.java | 2 +- .../decider/WriteLoadConstraintDecider.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 45cbe413245c2..35ddd7874c265 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -161,7 +161,7 @@ public Optional> getForcedInitialShardAllocationToNodes(ShardRouting * @param allocation the current routing allocation * @return A list of node IDs that contain shards this decider would like to move elsewhere, in order of descending priority */ - public Optional> getAllocationProblems(RoutingAllocation allocation) { + public Optional> getAllocationProblems(RoutingAllocation allocation) { return Optional.empty(); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index e814f570a67bb..1cc254df16145 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -22,6 +22,11 @@ import org.elasticsearch.core.Strings; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Collection; +import java.util.Iterator; +import java.util.Optional; +import java.util.stream.Collectors; + /** * Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread * pool usage stats and any candidate shard's write load estimate. @@ -109,6 +114,47 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented"); } + @Override + public Optional> getAllocationProblems(RoutingAllocation allocation) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { + return Optional.empty(); + } + + final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + final Collection hotSpots = nodeUsageStatsForThreadPools.entrySet() + .stream() + .filter(entry -> entry.getValue().threadPoolUsageStatsMap().containsKey(ThreadPool.Names.WRITE)) + .filter(entry -> { + long maxQueueLatency = entry.getValue() + .threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE) + .maxThreadPoolQueueLatencyMillis(); + return maxQueueLatency > writeLoadConstraintSettings.getQueueLatencyThreshold().millis(); + }) + .map(entry -> new HotSpot(entry.getKey())) + .collect(Collectors.toList()); + return hotSpots.isEmpty() == false ? Optional.of(hotSpots) : Optional.empty(); + } + + private record HotSpot(String nodeId) implements AllocationDeciders.AllocationProblem { + + @Override + public Iterator preferredShardMovements() { + // TODO: return shards in priority order + return null; + } + + @Override + public String relocateReason() { + return "hot-spotting"; + } + + @Override + public String toString() { + return "Hot-spotting on node " + nodeId; + } + } + /** * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node. * Returns the percent thread pool utilization change. From d147a18129ac2d6641bdd5d450d6273d7c78d317 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 8 Sep 2025 18:00:06 +1000 Subject: [PATCH 06/35] Fix text --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index d2631bc95abeb..fcc1d4a9997e0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -725,7 +725,7 @@ public void moveNonPreferred() { movedAShard = true; break; } - logger.debug("Unable to resolve allocation problem [{}], will try next time", problem); + logger.debug("Unable to resolve [{}]", problem); } // TODO: Update cluster info } while (movedAShard); From 4f7b51974914afb9487ca7c4a6e72e7548aacada Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 8 Sep 2025 18:00:54 +1000 Subject: [PATCH 07/35] Tidy --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index fcc1d4a9997e0..fb26e4de23c91 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -721,7 +721,7 @@ public void moveNonPreferred() { for (Iterator problemIterator = allocation.deciders() .findAllocationProblems(allocation); problemIterator.hasNext();) { AllocationDeciders.AllocationProblem problem = problemIterator.next(); - if (tryResolveAllocationProblem(problem)) { + if (tryResolve(problem)) { movedAShard = true; break; } @@ -731,7 +731,7 @@ public void moveNonPreferred() { } while (movedAShard); } - private boolean tryResolveAllocationProblem(AllocationDeciders.AllocationProblem problem) { + private boolean tryResolve(AllocationDeciders.AllocationProblem problem) { for (Iterator shardIterator = problem.preferredShardMovements(); shardIterator.hasNext();) { ShardRouting shardRouting = shardIterator.next(); ProjectIndex index = projectIndex(shardRouting); From 91ee1970f20d2bfdf6a6d732ac1b9651ae4890ce Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 9 Sep 2025 12:01:40 +1000 Subject: [PATCH 08/35] Fix boolean logic --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index fb26e4de23c91..e2cad43308284 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -787,7 +787,7 @@ public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final Shard assert sourceNode != null && sourceNode.containsShard(index, shardRouting); RoutingNode routingNode = sourceNode.getRoutingNode(); Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if (canRemain.type() != Type.NOT_PREFERRED || canRemain.type() != Type.NO) { + if ((canRemain.type() == Type.NOT_PREFERRED || canRemain.type() == Type.NO) == false) { return MoveDecision.remain(canRemain); } From 1d3b08e6b20a137d16a5047f3b4aa046eb31c457 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 14:59:22 +1000 Subject: [PATCH 09/35] Introduce pluggable non-preferred iteration --- .../routing/allocation/RoutingAllocation.java | 26 ++++++++- .../allocator/BalancedShardsAllocator.java | 55 +++++++++---------- .../NonPreferredShardIteratorFactory.java | 40 ++++++++++++++ 3 files changed, 89 insertions(+), 32 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 3e9f934572585..94059bd135892 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.core.Nullable; @@ -35,6 +36,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -85,6 +87,8 @@ public class RoutingAllocation { // Tracks the sizes of the searchable snapshots that aren't yet registered in ClusterInfo by their cluster node id private final Map unaccountedSearchableSnapshotSizes; + private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; + public RoutingAllocation( AllocationDeciders deciders, ClusterState clusterState, @@ -112,7 +116,16 @@ public RoutingAllocation( SnapshotShardSizeInfo shardSizeInfo, long currentNanoTime ) { - this(deciders, routingNodes, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, false); + this( + deciders, + routingNodes, + clusterState, + clusterInfo, + shardSizeInfo, + currentNanoTime, + false, + NonPreferredShardIteratorFactory.NOOP + ); } /** @@ -130,7 +143,8 @@ private RoutingAllocation( ClusterInfo clusterInfo, SnapshotShardSizeInfo shardSizeInfo, long currentNanoTime, - boolean isSimulating + boolean isSimulating, + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory ) { this.deciders = deciders; this.routingNodes = routingNodes; @@ -156,6 +170,7 @@ private RoutingAllocation( resizeSourceIndexUpdater, new ShardChangesObserver() } ); + this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } private static Map nodeReplacementTargets(ClusterState clusterState) { @@ -213,6 +228,10 @@ public RoutingTable routingTable() { return globalRoutingTable().getRoutingTable(); } + public Iterator nonPreferredShards() { + return nonPreferredShardIteratorFactory.createNonPreferredShardIterator(this); + } + public GlobalRoutingTable globalRoutingTable() { return clusterState.globalRoutingTable(); } @@ -459,7 +478,8 @@ public RoutingAllocation mutableCloneForSimulation() { clusterInfo, shardSizeInfo, currentNanoTime, - true + true, + nonPreferredShardIteratorFactory ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index e2cad43308284..40f465a391f4b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -718,44 +718,41 @@ protected int comparePivot(int j) { public void moveNonPreferred() { boolean movedAShard = false; do { - for (Iterator problemIterator = allocation.deciders() - .findAllocationProblems(allocation); problemIterator.hasNext();) { - AllocationDeciders.AllocationProblem problem = problemIterator.next(); - if (tryResolve(problem)) { + // Any time we move a shard, we need to update the cluster info and ask again for the non-preferred shards + // as they may have changed + for (Iterator nonPreferredShards = allocation.nonPreferredShards(); nonPreferredShards.hasNext();) { + ShardRouting shard = nonPreferredShards.next(); + if (tryMoveNonPreferred(shard)) { movedAShard = true; break; } - logger.debug("Unable to resolve [{}]", problem); } // TODO: Update cluster info } while (movedAShard); } - private boolean tryResolve(AllocationDeciders.AllocationProblem problem) { - for (Iterator shardIterator = problem.preferredShardMovements(); shardIterator.hasNext();) { - ShardRouting shardRouting = shardIterator.next(); - ProjectIndex index = projectIndex(shardRouting); - final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); - if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); - sourceNode.removeShard(index, shardRouting); - Tuple relocatingShards = routingNodes.relocateShard( - shardRouting, - targetNode.getNodeId(), - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - problem.relocateReason(), - allocation.changes() - ); - final ShardRouting shard = relocatingShards.v2(); - targetNode.addShard(projectIndex(shard), shard); - if (logger.isTraceEnabled()) { - logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); - } - return true; - } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { - logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); + private boolean tryMoveNonPreferred(ShardRouting shardRouting) { + ProjectIndex index = projectIndex(shardRouting); + final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); + if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); + sourceNode.removeShard(index, shardRouting); + Tuple relocatingShards = routingNodes.relocateShard( + shardRouting, + targetNode.getNodeId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + "non-preferred", + allocation.changes() + ); + final ShardRouting shard = relocatingShards.v2(); + targetNode.addShard(projectIndex(shard), shard); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); } + return true; + } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { + logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); } return false; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java new file mode 100644 index 0000000000000..c148291cb0b6d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; + +import java.util.Collections; +import java.util.Iterator; + +public interface NonPreferredShardIteratorFactory { + + NonPreferredShardIteratorFactory NOOP = ignored -> Collections.emptyIterator(); + + /** + * Create an iterator returning all shards that are in non-preferred allocations, ordered in + * descending desirability-to-move order + * + * @param allocation the current routing allocation + * @return An iterator containing shards we'd like to move to a preferred allocation + */ + Iterator createNonPreferredShardIterator(RoutingAllocation allocation); + + /** + * The default iterator factory + */ + class Default implements NonPreferredShardIteratorFactory { + @Override + public Iterator createNonPreferredShardIterator(RoutingAllocation allocation) { + return Collections.emptyIterator(); + } + } +} From 687c6e23de2e78d7ecf2e252b013f254a852b578 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 16:57:50 +1000 Subject: [PATCH 10/35] Implement NonPreferredShardIteratorFactory for resolving hot-spots --- .../elasticsearch/cluster/ClusterModule.java | 6 +- .../routing/allocation/AllocationService.java | 18 ++- .../routing/allocation/RoutingAllocation.java | 36 +++-- .../WriteLoadConstraintSettings.java | 4 + ...faultNonPreferredShardIteratorFactory.java | 128 ++++++++++++++++++ .../allocation/AllocationServiceTests.java | 10 +- .../DesiredBalanceReconcilerTests.java | 7 +- ...TransportGetShutdownStatusActionTests.java | 4 +- 8 files changed, 193 insertions(+), 20 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 2522001b94168..58d1c7ae8c22d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -39,10 +39,12 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings; import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory; +import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction; @@ -184,12 +186,14 @@ public ClusterModule( this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver); this.shardRoutingRoleStrategy = getShardRoutingRoleStrategy(clusterPlugins); + final var writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterService.getClusterSettings()); this.allocationService = new AllocationService( allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, - shardRoutingRoleStrategy + shardRoutingRoleStrategy, + new DefaultNonPreferredShardIteratorFactory(writeLoadConstraintSettings) ); this.allocationService.addAllocFailuresResetListenerTo(clusterService); this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 263c2362c9ed6..3104f07ada7dc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -89,6 +90,7 @@ public class AllocationService { private final ClusterInfoService clusterInfoService; private final SnapshotsInfoService snapshotsInfoService; private final ShardRoutingRoleStrategy shardRoutingRoleStrategy; + private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator @SuppressWarnings("this-escape") @@ -100,7 +102,14 @@ public AllocationService( SnapshotsInfoService snapshotsInfoService, ShardRoutingRoleStrategy shardRoutingRoleStrategy ) { - this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, shardRoutingRoleStrategy); + this( + allocationDeciders, + shardsAllocator, + clusterInfoService, + snapshotsInfoService, + shardRoutingRoleStrategy, + NonPreferredShardIteratorFactory.NOOP + ); setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator)); } @@ -109,13 +118,15 @@ public AllocationService( ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, - ShardRoutingRoleStrategy shardRoutingRoleStrategy + ShardRoutingRoleStrategy shardRoutingRoleStrategy, + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory ) { this.allocationDeciders = allocationDeciders; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; this.shardRoutingRoleStrategy = shardRoutingRoleStrategy; + this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } /** @@ -764,7 +775,8 @@ private RoutingAllocation createRoutingAllocation(ClusterState clusterState, lon clusterState, clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), - currentNanoTime + currentNanoTime, + nonPreferredShardIteratorFactory ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 94059bd135892..34f255d42d4ea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -96,7 +96,7 @@ public RoutingAllocation( SnapshotShardSizeInfo shardSizeInfo, long currentNanoTime ) { - this(deciders, null, clusterState, clusterInfo, shardSizeInfo, currentNanoTime); + this(deciders, null, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, NonPreferredShardIteratorFactory.NOOP); } /** @@ -123,18 +123,32 @@ public RoutingAllocation( clusterInfo, shardSizeInfo, currentNanoTime, - false, - NonPreferredShardIteratorFactory.NOOP + NonPreferredShardIteratorFactory.NOOP, + false ); } + public RoutingAllocation( + AllocationDeciders deciders, + @Nullable RoutingNodes routingNodes, + ClusterState clusterState, + ClusterInfo clusterInfo, + SnapshotShardSizeInfo shardSizeInfo, + long currentNanoTime, + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory + ) { + this(deciders, routingNodes, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, nonPreferredShardIteratorFactory, false); + } + /** * Creates a new {@link RoutingAllocation} - * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations - * @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state - * @param clusterState cluster state before rerouting + * + * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations + * @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state + * @param clusterState cluster state before rerouting * @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()}) - * @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation + * @param nonPreferredShardIteratorFactory factory for non-preferred shards iterator + * @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation */ private RoutingAllocation( AllocationDeciders deciders, @@ -143,8 +157,8 @@ private RoutingAllocation( ClusterInfo clusterInfo, SnapshotShardSizeInfo shardSizeInfo, long currentNanoTime, - boolean isSimulating, - NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory, + boolean isSimulating ) { this.deciders = deciders; this.routingNodes = routingNodes; @@ -478,8 +492,8 @@ public RoutingAllocation mutableCloneForSimulation() { clusterInfo, shardSizeInfo, currentNanoTime, - true, - nonPreferredShardIteratorFactory + nonPreferredShardIteratorFactory, + true ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index 21c1a7ba04f0f..b2e38d52f5df1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -24,6 +24,10 @@ public class WriteLoadConstraintSettings { private static final String SETTING_PREFIX = "cluster.routing.allocation.write_load_decider."; + public static final WriteLoadConstraintSettings DEFAULT = new WriteLoadConstraintSettings( + ClusterSettings.createBuiltInClusterSettings() + ); + public enum WriteLoadDeciderStatus { /** * The decider is disabled diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java new file mode 100644 index 0000000000000..9e55f2d1973fe --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Non-preferred shard iterator factory that returns the most desirable shards from most-hot-spotted + * nodes first. + * Does not return nodes for which we have no write-pool utilization, or shards for which we have no + * write-load data. + */ +public class DefaultNonPreferredShardIteratorFactory implements NonPreferredShardIteratorFactory { + + private final WriteLoadConstraintSettings writeLoadConstraintSettings; + + public DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings writeLoadConstraintSettings) { + this.writeLoadConstraintSettings = writeLoadConstraintSettings; + } + + @Override + public Iterator createNonPreferredShardIterator(RoutingAllocation allocation) { + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { + return Collections.emptyIterator(); + } + final Set hotSpottedNodes = new TreeSet<>(Comparator.reverseOrder()); + final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + for (RoutingNode node : allocation.routingNodes()) { + var nodeUsageStats = nodeUsageStatsForThreadPools.get(node.nodeId()); + if (nodeUsageStats != null) { + final var writeThreadPoolStats = nodeUsageStats.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + assert writeThreadPoolStats != null; + hotSpottedNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis())); + } + } + return new NodeShardIterator(hotSpottedNodes.iterator()); + } + + private static class NodeShardIterator implements Iterator { + + private final Iterator iterator; + private Iterator currentShardIterator; + + private NodeShardIterator(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + if (currentShardIterator == null || currentShardIterator.hasNext() == false) { + if (iterator.hasNext()) { + currentShardIterator = iterator.next().iterator(); + } else { + return false; + } + } + return currentShardIterator.hasNext(); + } + + @Override + public ShardRouting next() { + if (currentShardIterator == null) { + currentShardIterator = iterator.next().iterator(); + } + return currentShardIterator.next(); + } + } + + private static class NodeShardIterable implements Iterable, Comparable { + + private final RoutingAllocation allocation; + private final RoutingNode routingNode; + private final long maxQueueLatencyMillis; + + private NodeShardIterable(RoutingAllocation allocation, RoutingNode routingNode, long maxQueueLatencyMillis) { + this.allocation = allocation; + this.routingNode = routingNode; + this.maxQueueLatencyMillis = maxQueueLatencyMillis; + } + + @Override + public Iterator iterator() { + return createShardIterator(); + } + + @Override + public int compareTo(NodeShardIterable o) { + return Long.compare(maxQueueLatencyMillis, o.maxQueueLatencyMillis); + } + + private Iterator createShardIterator() { + final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); + final List sortedRoutings = new ArrayList<>(); + double totalWriteLoad = 0; + for (ShardRouting shard : routingNode) { + Double shardWriteLoad = shardWriteLoads.get(shard.shardId()); + if (shardWriteLoad != null) { + sortedRoutings.add(shard); + totalWriteLoad += shardWriteLoad; + } + } + // TODO: Work out what this order should be + // Sort by distance-from-mean-write-load + double meanWriteLoad = totalWriteLoad / sortedRoutings.size(); + sortedRoutings.sort(Comparator.comparing(sr -> Math.abs(shardWriteLoads.get(sr.shardId()) - meanWriteLoad))); + return sortedRoutings.iterator(); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java index 3e341413eb77f..136b7cccdd68b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -154,7 +155,8 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing }, new EmptyClusterInfoService(), EmptySnapshotsInfoService.INSTANCE, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, + NonPreferredShardIteratorFactory.NOOP ); final String unrealisticAllocatorName = "unrealistic"; @@ -268,7 +270,8 @@ public void testExplainsNonAllocationOfShardWithUnknownAllocator() { null, null, null, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, + NonPreferredShardIteratorFactory.NOOP ); allocationService.setExistingShardsAllocators( Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()) @@ -390,7 +393,8 @@ public void testAutoExpandReplicas() throws Exception { null, new EmptyClusterInfoService(), EmptySnapshotsInfoService.INSTANCE, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, + NonPreferredShardIteratorFactory.NOOP ); final ProjectId project1 = randomUniqueProjectId(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 1f8d59a958bfe..e5a212bc14dc3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -1478,7 +1478,12 @@ public void allocate(RoutingAllocation allocation) { public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) { throw new AssertionError("should not be called"); } - }, clusterInfoService, snapshotsInfoService, TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY); + }, + clusterInfoService, + snapshotsInfoService, + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, + NonPreferredShardIteratorFactory.NOOP + ); allocationService.setExistingShardsAllocators(Map.of(GatewayAllocator.ALLOCATOR_NAME, new NoOpExistingShardsAllocator())); return allocationService; } diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java index 917eb59922962..b9275feee0166 100644 --- a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.allocation.Explanations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -147,7 +148,8 @@ public Decision canRebalance(RoutingAllocation allocation) { new BalancedShardsAllocator(Settings.EMPTY), clusterInfoService, snapshotsInfoService, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, + NonPreferredShardIteratorFactory.NOOP ); allocationService.setExistingShardsAllocators(Map.of(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator())); } From 1a4c85a3397afc3583e8184ce67d5c68b6d7ad20 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 17:23:20 +1000 Subject: [PATCH 11/35] Remove unused default implementation --- .../allocator/NonPreferredShardIteratorFactory.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java index c148291cb0b6d..4814c6abf1f36 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java @@ -27,14 +27,4 @@ public interface NonPreferredShardIteratorFactory { * @return An iterator containing shards we'd like to move to a preferred allocation */ Iterator createNonPreferredShardIterator(RoutingAllocation allocation); - - /** - * The default iterator factory - */ - class Default implements NonPreferredShardIteratorFactory { - @Override - public Iterator createNonPreferredShardIterator(RoutingAllocation allocation) { - return Collections.emptyIterator(); - } - } } From 3196d21da6b2869744277341e59c50138f5f0236 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 17:44:13 +1000 Subject: [PATCH 12/35] Get rid of remnants of prior approach --- .../allocation/decider/AllocationDecider.java | 11 ----- .../decider/AllocationDeciders.java | 34 -------------- .../decider/WriteLoadConstraintDecider.java | 46 ------------------- 3 files changed, 91 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 35ddd7874c265..7fae18a332f0c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type; -import java.util.Collection; import java.util.Optional; import java.util.Set; @@ -154,14 +153,4 @@ public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRo public Optional> getForcedInitialShardAllocationToNodes(ShardRouting shardRouting, RoutingAllocation allocation) { return Optional.empty(); } - - /** - * Get a list of allocation problems that can be fixed by moving some shards - * - * @param allocation the current routing allocation - * @return A list of node IDs that contain shards this decider would like to move elsewhere, in order of descending priority - */ - public Optional> getAllocationProblems(RoutingAllocation allocation) { - return Optional.empty(); - } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index 650379a521e89..6a09c894dbc7d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingChangesObserver; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -21,11 +20,8 @@ import org.elasticsearch.common.util.set.Sets; import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; import java.util.Optional; import java.util.Set; -import java.util.TreeSet; import java.util.function.BiFunction; import java.util.function.Function; @@ -248,34 +244,4 @@ public Optional> getForcedInitialShardAllocationToNodes(ShardRouting } return result; } - - public Iterator findAllocationProblems(RoutingAllocation routingAllocation) { - var problems = new TreeSet<>(Comparator.comparing(AllocationProblem::priority).reversed()); - for (AllocationDecider decider : deciders) { - decider.getAllocationProblems(routingAllocation).ifPresent(problems::addAll); - } - return problems.iterator(); - } - - public interface AllocationProblem { - - /** - * Shard movements to attempt to resolve the problem in descending priority order. - */ - Iterator preferredShardMovements(); - - /** - * The reason for the relocation - * - * @see RoutingChangesObserver#relocationStarted(ShardRouting, ShardRouting, String) - */ - String relocateReason(); - - /** - * We could prioritize them this way - */ - default int priority() { - return 1; - } - } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 1cc254df16145..e814f570a67bb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -22,11 +22,6 @@ import org.elasticsearch.core.Strings; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; -import java.util.Iterator; -import java.util.Optional; -import java.util.stream.Collectors; - /** * Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread * pool usage stats and any candidate shard's write load estimate. @@ -114,47 +109,6 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented"); } - @Override - public Optional> getAllocationProblems(RoutingAllocation allocation) { - if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { - return Optional.empty(); - } - - final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); - final Collection hotSpots = nodeUsageStatsForThreadPools.entrySet() - .stream() - .filter(entry -> entry.getValue().threadPoolUsageStatsMap().containsKey(ThreadPool.Names.WRITE)) - .filter(entry -> { - long maxQueueLatency = entry.getValue() - .threadPoolUsageStatsMap() - .get(ThreadPool.Names.WRITE) - .maxThreadPoolQueueLatencyMillis(); - return maxQueueLatency > writeLoadConstraintSettings.getQueueLatencyThreshold().millis(); - }) - .map(entry -> new HotSpot(entry.getKey())) - .collect(Collectors.toList()); - return hotSpots.isEmpty() == false ? Optional.of(hotSpots) : Optional.empty(); - } - - private record HotSpot(String nodeId) implements AllocationDeciders.AllocationProblem { - - @Override - public Iterator preferredShardMovements() { - // TODO: return shards in priority order - return null; - } - - @Override - public String relocateReason() { - return "hot-spotting"; - } - - @Override - public String toString() { - return "Hot-spotting on node " + nodeId; - } - } - /** * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node. * Returns the percent thread pool utilization change. From d63012c3a48cd742ea9b82ac8df6999924c810b2 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 17:48:53 +1000 Subject: [PATCH 13/35] Remove cruft --- .../routing/allocation/WriteLoadConstraintSettings.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index b2e38d52f5df1..21c1a7ba04f0f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -24,10 +24,6 @@ public class WriteLoadConstraintSettings { private static final String SETTING_PREFIX = "cluster.routing.allocation.write_load_decider."; - public static final WriteLoadConstraintSettings DEFAULT = new WriteLoadConstraintSettings( - ClusterSettings.createBuiltInClusterSettings() - ); - public enum WriteLoadDeciderStatus { /** * The decider is disabled From 7918b3b447d4b749394a2743f95cb8c5f5e2507c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 17:59:24 +1000 Subject: [PATCH 14/35] Improve naming/javadoc --- .../allocation/allocator/BalancedShardsAllocator.java | 5 ++--- .../allocator/NonPreferredShardIteratorFactory.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 40f465a391f4b..bb57d6e9269aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -721,8 +721,7 @@ public void moveNonPreferred() { // Any time we move a shard, we need to update the cluster info and ask again for the non-preferred shards // as they may have changed for (Iterator nonPreferredShards = allocation.nonPreferredShards(); nonPreferredShards.hasNext();) { - ShardRouting shard = nonPreferredShards.next(); - if (tryMoveNonPreferred(shard)) { + if (tryMoveShardIfNonPreferred(nonPreferredShards.next())) { movedAShard = true; break; } @@ -731,7 +730,7 @@ public void moveNonPreferred() { } while (movedAShard); } - private boolean tryMoveNonPreferred(ShardRouting shardRouting) { + private boolean tryMoveShardIfNonPreferred(ShardRouting shardRouting) { ProjectIndex index = projectIndex(shardRouting); final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java index 4814c6abf1f36..4ec22edeff1e6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java @@ -20,7 +20,7 @@ public interface NonPreferredShardIteratorFactory { NonPreferredShardIteratorFactory NOOP = ignored -> Collections.emptyIterator(); /** - * Create an iterator returning all shards that are in non-preferred allocations, ordered in + * Create an iterator returning all shards to be checked for non-preferred allocation, ordered in * descending desirability-to-move order * * @param allocation the current routing allocation From d349668d91458d84cb643c456fc4162156571d3a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 18:27:26 +1000 Subject: [PATCH 15/35] Improve wiring --- .../elasticsearch/cluster/ClusterModule.java | 22 +++++++-- .../routing/allocation/AllocationService.java | 18 ++------ .../routing/allocation/RoutingAllocation.java | 46 +++---------------- .../allocator/BalancedShardsAllocator.java | 32 ++++++++++--- .../allocation/AllocationServiceTests.java | 10 ++-- .../BalancedShardsAllocatorTests.java | 3 +- .../DesiredBalanceReconcilerTests.java | 7 +-- ...TransportGetShutdownStatusActionTests.java | 4 +- 8 files changed, 59 insertions(+), 83 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 58d1c7ae8c22d..744f0a9b6fcf1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -186,14 +186,12 @@ public ClusterModule( this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver); this.shardRoutingRoleStrategy = getShardRoutingRoleStrategy(clusterPlugins); - final var writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterService.getClusterSettings()); this.allocationService = new AllocationService( allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, - shardRoutingRoleStrategy, - new DefaultNonPreferredShardIteratorFactory(writeLoadConstraintSettings) + shardRoutingRoleStrategy ); this.allocationService.addAllocFailuresResetListenerTo(clusterService); this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService); @@ -508,16 +506,30 @@ private static ShardsAllocator createShardsAllocator( ShardAllocationExplainer shardAllocationExplainer, DesiredBalanceMetrics desiredBalanceMetrics ) { + WriteLoadConstraintSettings writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings); + DefaultNonPreferredShardIteratorFactory nonPreferredShardIteratorFactory = new DefaultNonPreferredShardIteratorFactory( + writeLoadConstraintSettings + ); Map> allocators = new HashMap<>(); allocators.put( BALANCED_ALLOCATOR, - () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory) + () -> new BalancedShardsAllocator( + balancerSettings, + writeLoadForecaster, + balancingWeightsFactory, + nonPreferredShardIteratorFactory + ) ); allocators.put( DESIRED_BALANCE_ALLOCATOR, () -> new DesiredBalanceShardsAllocator( clusterSettings, - new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory), + new BalancedShardsAllocator( + balancerSettings, + writeLoadForecaster, + balancingWeightsFactory, + nonPreferredShardIteratorFactory + ), threadPool, clusterService, reconciler, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 3104f07ada7dc..263c2362c9ed6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -36,7 +36,6 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -90,7 +89,6 @@ public class AllocationService { private final ClusterInfoService clusterInfoService; private final SnapshotsInfoService snapshotsInfoService; private final ShardRoutingRoleStrategy shardRoutingRoleStrategy; - private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator @SuppressWarnings("this-escape") @@ -102,14 +100,7 @@ public AllocationService( SnapshotsInfoService snapshotsInfoService, ShardRoutingRoleStrategy shardRoutingRoleStrategy ) { - this( - allocationDeciders, - shardsAllocator, - clusterInfoService, - snapshotsInfoService, - shardRoutingRoleStrategy, - NonPreferredShardIteratorFactory.NOOP - ); + this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, shardRoutingRoleStrategy); setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator)); } @@ -118,15 +109,13 @@ public AllocationService( ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService, - ShardRoutingRoleStrategy shardRoutingRoleStrategy, - NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory + ShardRoutingRoleStrategy shardRoutingRoleStrategy ) { this.allocationDeciders = allocationDeciders; this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; this.shardRoutingRoleStrategy = shardRoutingRoleStrategy; - this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } /** @@ -775,8 +764,7 @@ private RoutingAllocation createRoutingAllocation(ClusterState clusterState, lon clusterState, clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), - currentNanoTime, - nonPreferredShardIteratorFactory + currentNanoTime ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 34f255d42d4ea..3e9f934572585 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.core.Nullable; @@ -36,7 +35,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -87,8 +85,6 @@ public class RoutingAllocation { // Tracks the sizes of the searchable snapshots that aren't yet registered in ClusterInfo by their cluster node id private final Map unaccountedSearchableSnapshotSizes; - private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; - public RoutingAllocation( AllocationDeciders deciders, ClusterState clusterState, @@ -96,7 +92,7 @@ public RoutingAllocation( SnapshotShardSizeInfo shardSizeInfo, long currentNanoTime ) { - this(deciders, null, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, NonPreferredShardIteratorFactory.NOOP); + this(deciders, null, clusterState, clusterInfo, shardSizeInfo, currentNanoTime); } /** @@ -116,39 +112,16 @@ public RoutingAllocation( SnapshotShardSizeInfo shardSizeInfo, long currentNanoTime ) { - this( - deciders, - routingNodes, - clusterState, - clusterInfo, - shardSizeInfo, - currentNanoTime, - NonPreferredShardIteratorFactory.NOOP, - false - ); - } - - public RoutingAllocation( - AllocationDeciders deciders, - @Nullable RoutingNodes routingNodes, - ClusterState clusterState, - ClusterInfo clusterInfo, - SnapshotShardSizeInfo shardSizeInfo, - long currentNanoTime, - NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory - ) { - this(deciders, routingNodes, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, nonPreferredShardIteratorFactory, false); + this(deciders, routingNodes, clusterState, clusterInfo, shardSizeInfo, currentNanoTime, false); } /** * Creates a new {@link RoutingAllocation} - * - * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations - * @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state - * @param clusterState cluster state before rerouting + * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations + * @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state + * @param clusterState cluster state before rerouting * @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()}) - * @param nonPreferredShardIteratorFactory factory for non-preferred shards iterator - * @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation + * @param isSimulating {@code true} if "transient" deciders should be ignored because we are simulating the final allocation */ private RoutingAllocation( AllocationDeciders deciders, @@ -157,7 +130,6 @@ private RoutingAllocation( ClusterInfo clusterInfo, SnapshotShardSizeInfo shardSizeInfo, long currentNanoTime, - NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory, boolean isSimulating ) { this.deciders = deciders; @@ -184,7 +156,6 @@ private RoutingAllocation( resizeSourceIndexUpdater, new ShardChangesObserver() } ); - this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } private static Map nodeReplacementTargets(ClusterState clusterState) { @@ -242,10 +213,6 @@ public RoutingTable routingTable() { return globalRoutingTable().getRoutingTable(); } - public Iterator nonPreferredShards() { - return nonPreferredShardIteratorFactory.createNonPreferredShardIterator(this); - } - public GlobalRoutingTable globalRoutingTable() { return clusterState.globalRoutingTable(); } @@ -492,7 +459,6 @@ public RoutingAllocation mutableCloneForSimulation() { clusterInfo, shardSizeInfo, currentNanoTime, - nonPreferredShardIteratorFactory, true ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index bb57d6e9269aa..8cd3a238ea333 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -114,6 +114,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private final BalancerSettings balancerSettings; private final WriteLoadForecaster writeLoadForecaster; private final BalancingWeightsFactory balancingWeightsFactory; + private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; public BalancedShardsAllocator() { this(Settings.EMPTY); @@ -124,18 +125,25 @@ public BalancedShardsAllocator(Settings settings) { } public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster) { - this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings)); + this( + balancerSettings, + writeLoadForecaster, + new GlobalBalancingWeightsFactory(balancerSettings), + NonPreferredShardIteratorFactory.NOOP + ); } @Inject public BalancedShardsAllocator( BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster, - BalancingWeightsFactory balancingWeightsFactory + BalancingWeightsFactory balancingWeightsFactory, + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory ) { this.balancerSettings = balancerSettings; this.writeLoadForecaster = writeLoadForecaster; this.balancingWeightsFactory = balancingWeightsFactory; + this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } @Override @@ -152,7 +160,13 @@ public void allocate(RoutingAllocation allocation) { return; } final BalancingWeights balancingWeights = balancingWeightsFactory.create(); - final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights); + final Balancer balancer = new Balancer( + writeLoadForecaster, + allocation, + balancerSettings.getThreshold(), + balancingWeights, + nonPreferredShardIteratorFactory + ); balancer.allocateUnassigned(); balancer.moveShards(); balancer.moveNonPreferred(); @@ -189,7 +203,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f writeLoadForecaster, allocation, balancerSettings.getThreshold(), - balancingWeightsFactory.create() + balancingWeightsFactory.create(), + nonPreferredShardIteratorFactory ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -249,12 +264,14 @@ public static class Balancer { private final Map nodes; private final BalancingWeights balancingWeights; private final NodeSorters nodeSorters; + private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory; private Balancer( WriteLoadForecaster writeLoadForecaster, RoutingAllocation allocation, float threshold, - BalancingWeights balancingWeights + BalancingWeights balancingWeights, + NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory ) { this.writeLoadForecaster = writeLoadForecaster; this.allocation = allocation; @@ -267,6 +284,7 @@ private Balancer( nodes = Collections.unmodifiableMap(buildModelFromAssigned()); this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this); this.balancingWeights = balancingWeights; + this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory; } private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) { @@ -720,7 +738,9 @@ public void moveNonPreferred() { do { // Any time we move a shard, we need to update the cluster info and ask again for the non-preferred shards // as they may have changed - for (Iterator nonPreferredShards = allocation.nonPreferredShards(); nonPreferredShards.hasNext();) { + for (Iterator nonPreferredShards = nonPreferredShardIteratorFactory.createNonPreferredShardIterator( + allocation + ); nonPreferredShards.hasNext();) { if (tryMoveShardIfNonPreferred(nonPreferredShards.next())) { movedAShard = true; break; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java index 136b7cccdd68b..3e341413eb77f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationServiceTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -155,8 +154,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing }, new EmptyClusterInfoService(), EmptySnapshotsInfoService.INSTANCE, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, - NonPreferredShardIteratorFactory.NOOP + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY ); final String unrealisticAllocatorName = "unrealistic"; @@ -270,8 +268,7 @@ public void testExplainsNonAllocationOfShardWithUnknownAllocator() { null, null, null, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, - NonPreferredShardIteratorFactory.NOOP + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY ); allocationService.setExistingShardsAllocators( Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()) @@ -393,8 +390,7 @@ public void testAutoExpandReplicas() throws Exception { null, new EmptyClusterInfoService(), EmptySnapshotsInfoService.INSTANCE, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, - NonPreferredShardIteratorFactory.NOOP + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY ); final ProjectId project1 = randomUniqueProjectId(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 3667de9c65e4e..3b31303ae2b6d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -621,7 +621,8 @@ public void testPartitionedClusterWithSeparateWeights() { TEST_WRITE_LOAD_FORECASTER, new PrefixBalancingWeightsFactory( Map.of("shardsOnly", new WeightFunction(1, 0, 0, 0), "weightsOnly", new WeightFunction(0, 0, 1, 0)) - ) + ), + NonPreferredShardIteratorFactory.NOOP ), EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index e5a212bc14dc3..1f8d59a958bfe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -1478,12 +1478,7 @@ public void allocate(RoutingAllocation allocation) { public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) { throw new AssertionError("should not be called"); } - }, - clusterInfoService, - snapshotsInfoService, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, - NonPreferredShardIteratorFactory.NOOP - ); + }, clusterInfoService, snapshotsInfoService, TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY); allocationService.setExistingShardsAllocators(Map.of(GatewayAllocator.ALLOCATOR_NAME, new NoOpExistingShardsAllocator())); return allocationService; } diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java index b9275feee0166..917eb59922962 100644 --- a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java +++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java @@ -31,7 +31,6 @@ import org.elasticsearch.cluster.routing.allocation.Explanations; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; -import org.elasticsearch.cluster.routing.allocation.allocator.NonPreferredShardIteratorFactory; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -148,8 +147,7 @@ public Decision canRebalance(RoutingAllocation allocation) { new BalancedShardsAllocator(Settings.EMPTY), clusterInfoService, snapshotsInfoService, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, - NonPreferredShardIteratorFactory.NOOP + TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY ); allocationService.setExistingShardsAllocators(Map.of(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator())); } From 0c34875d616bae20fa2c18cf5c08498ea24ed196 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 10 Sep 2025 18:31:48 +1000 Subject: [PATCH 16/35] Fix infinite loop --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 8cd3a238ea333..354639495b70c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -734,10 +734,11 @@ protected int comparePivot(int j) { * Move started shards that are in non-preferred allocations */ public void moveNonPreferred() { - boolean movedAShard = false; + boolean movedAShard; do { // Any time we move a shard, we need to update the cluster info and ask again for the non-preferred shards // as they may have changed + movedAShard = false; for (Iterator nonPreferredShards = nonPreferredShardIteratorFactory.createNonPreferredShardIterator( allocation ); nonPreferredShards.hasNext();) { From b7fcc4ab49162ca667d23b49c5fa547e2f8b2f39 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 11 Sep 2025 14:36:11 +1000 Subject: [PATCH 17/35] Test/fix iterator logic --- ...faultNonPreferredShardIteratorFactory.java | 62 +++++++++---------- ...NonPreferredShardIteratorFactoryTests.java | 51 +++++++++++++++ 2 files changed, 82 insertions(+), 31 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index 9e55f2d1973fe..af9314d7beac3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -52,37 +52,7 @@ public Iterator createNonPreferredShardIterator(RoutingAllocation hotSpottedNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis())); } } - return new NodeShardIterator(hotSpottedNodes.iterator()); - } - - private static class NodeShardIterator implements Iterator { - - private final Iterator iterator; - private Iterator currentShardIterator; - - private NodeShardIterator(Iterator iterator) { - this.iterator = iterator; - } - - @Override - public boolean hasNext() { - if (currentShardIterator == null || currentShardIterator.hasNext() == false) { - if (iterator.hasNext()) { - currentShardIterator = iterator.next().iterator(); - } else { - return false; - } - } - return currentShardIterator.hasNext(); - } - - @Override - public ShardRouting next() { - if (currentShardIterator == null) { - currentShardIterator = iterator.next().iterator(); - } - return currentShardIterator.next(); - } + return new LazilyExpandingShardIterator<>(hotSpottedNodes); } private static class NodeShardIterable implements Iterable, Comparable { @@ -125,4 +95,34 @@ private Iterator createShardIterator() { return sortedRoutings.iterator(); } } + + static class LazilyExpandingShardIterator implements Iterator { + + private final Iterator> allIterables; + private Iterator currentIterator; + + LazilyExpandingShardIterator(Iterable> allIterables) { + this.allIterables = allIterables.iterator(); + } + + @Override + public boolean hasNext() { + while (currentIterator == null || currentIterator.hasNext() == false) { + if (allIterables.hasNext() == false) { + return false; + } else { + currentIterator = allIterables.next().iterator(); + } + } + return true; + } + + @Override + public T next() { + while (currentIterator == null || currentIterator.hasNext() == false) { + currentIterator = allIterables.next().iterator(); + } + return currentIterator.next(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java new file mode 100644 index 0000000000000..4e789aab7c124 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory.LazilyExpandingShardIterator; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.IntStream; + +public class DefaultNonPreferredShardIteratorFactoryTests extends ESTestCase { + + public void testLazilyExpandingIterator() { + final List> allValues = new ArrayList<>(); + final List flatValues = new ArrayList<>(); + IntStream.range(0, randomIntBetween(0, 30)).forEach(i -> { + int listSize = randomIntBetween(0, 10); + final var innerList = IntStream.range(0, listSize).mapToObj(j -> (i + "/" + j)).toList(); + allValues.add(innerList); + flatValues.addAll(innerList); + }); + + Iterator iterator = new LazilyExpandingShardIterator<>(allValues); + + int nextIndex = 0; + while (true) { + if (randomBoolean()) { + assertEquals(iterator.hasNext(), nextIndex < flatValues.size()); + } else { + if (nextIndex < flatValues.size()) { + assertEquals(iterator.next(), flatValues.get(nextIndex++)); + } else { + assertThrows(NoSuchElementException.class, iterator::next); + } + } + if (randomBoolean() && nextIndex == flatValues.size()) { + break; + } + } + } +} From ab73569012171f90138904ef590b44032d2684b4 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 11 Sep 2025 15:53:00 +1000 Subject: [PATCH 18/35] Test shard iteration order --- ...NonPreferredShardIteratorFactoryTests.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java index 4e789aab7c124..d6fae8f137e8b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -9,15 +9,32 @@ package org.elasticsearch.cluster.routing.allocation.allocator; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory.LazilyExpandingShardIterator; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.stream.IntStream; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + public class DefaultNonPreferredShardIteratorFactoryTests extends ESTestCase { public void testLazilyExpandingIterator() { @@ -48,4 +65,97 @@ public void testLazilyExpandingIterator() { } } } + + public void testShardIterationOrder() { + final var iteratorFactory = new DefaultNonPreferredShardIteratorFactory( + new WriteLoadConstraintSettings( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ) + ) + ); + final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); + final Iterator shards = iteratorFactory.createNonPreferredShardIterator(routingAllocation); + + long lastNodeQueueLatency = -1; + int totalCount = 0; + while (shards.hasNext()) { + totalCount++; + final ShardRouting shardRouting = shards.next(); + final String nodeId = shardRouting.currentNodeId(); + NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = routingAllocation.clusterInfo() + .getNodeUsageStatsForThreadPools() + .get(nodeId); + assertNotNull(nodeUsageStatsForThreadPools); + long thisNodeQueueLatency = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE) + .maxThreadPoolQueueLatencyMillis(); + // should receive shards from nodes in descending queue latency order + if (lastNodeQueueLatency != -1) { + assertThat(thisNodeQueueLatency, lessThanOrEqualTo(lastNodeQueueLatency)); + } + lastNodeQueueLatency = thisNodeQueueLatency; + final Double shardWriteLoad = routingAllocation.clusterInfo().getShardWriteLoads().get(shardRouting.shardId()); + // Should not receive shards with no write-load + assertNotNull(shardWriteLoad); + // TODO: assert on shard order, when we know what we want that to be + } + + if (totalCount > 0) { + assertThat(lastNodeQueueLatency, greaterThanOrEqualTo(0L)); + } + } + + private RoutingAllocation createRoutingAllocation(int numberOfNodes) { + int writeThreadPoolSize = randomIntBetween(4, 32); + final Map nodeUsageStats = new HashMap<>(); + final List allNodeIds = new ArrayList<>(); + IntStream.range(0, numberOfNodes).mapToObj(i -> "node_" + i).forEach(nodeId -> { + // Some have no utilization + if (usually()) { + nodeUsageStats.put( + nodeId, + new NodeUsageStatsForThreadPools( + nodeId, + Map.of( + ThreadPool.Names.WRITE, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + writeThreadPoolSize, + randomFloatBetween(0.0f, 1.0f, true), + randomLongBetween(0, 5_000) + ) + ) + ) + ); + } + allNodeIds.add(nodeId); + }); + + ClusterInfo.Builder clusterInfo = ClusterInfo.builder().nodeUsageStatsForThreadPools(nodeUsageStats); + + final int numberOfPrimaries = randomIntBetween(1, numberOfNodes * 3); + final ClusterState state = ClusterStateCreationUtils.state( + numberOfNodes, + new String[] { randomIdentifier(), randomIdentifier(), randomIdentifier() }, + numberOfPrimaries + ); + + final Map shardWriteLoads = new HashMap<>(); + for (RoutingNode node : state.getRoutingNodes()) { + for (ShardRouting shardRouting : node) { + // Some have no write-load + if (usually()) { + shardWriteLoads.put(shardRouting.shardId(), randomDoubleBetween(0.0, writeThreadPoolSize, true)); + } + } + } + clusterInfo.shardWriteLoads(shardWriteLoads); + + return new RoutingAllocation(null, state, clusterInfo.build(), null, System.nanoTime()); + } } From cebf3a9118735b41cf3243146ff7392f3bfc6736 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 11 Sep 2025 16:04:50 +1000 Subject: [PATCH 19/35] Use Iterable instead of Iterator --- .../allocator/BalancedShardsAllocator.java | 26 +++++++++---------- ...faultNonPreferredShardIteratorFactory.java | 6 ++--- .../NonPreferredShardIteratorFactory.java | 5 ++-- ...NonPreferredShardIteratorFactoryTests.java | 5 ++-- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 354639495b70c..c9adc3a1e3094 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -734,21 +734,19 @@ protected int comparePivot(int j) { * Move started shards that are in non-preferred allocations */ public void moveNonPreferred() { - boolean movedAShard; - do { - // Any time we move a shard, we need to update the cluster info and ask again for the non-preferred shards - // as they may have changed - movedAShard = false; - for (Iterator nonPreferredShards = nonPreferredShardIteratorFactory.createNonPreferredShardIterator( - allocation - ); nonPreferredShards.hasNext();) { - if (tryMoveShardIfNonPreferred(nonPreferredShards.next())) { - movedAShard = true; - break; - } - } + while (moveASingleNonPreferredShard()) { + // keep trying until we're unable to move any more // TODO: Update cluster info - } while (movedAShard); + } + } + + private boolean moveASingleNonPreferredShard() { + for (ShardRouting shardRouting : nonPreferredShardIteratorFactory.createNonPreferredShardIterator(allocation)) { + if (tryMoveShardIfNonPreferred(shardRouting)) { + return true; + } + } + return false; } private boolean tryMoveShardIfNonPreferred(ShardRouting shardRouting) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index af9314d7beac3..416fe63a44885 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -38,9 +38,9 @@ public DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings write } @Override - public Iterator createNonPreferredShardIterator(RoutingAllocation allocation) { + public Iterable createNonPreferredShardIterator(RoutingAllocation allocation) { if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { - return Collections.emptyIterator(); + return Collections.emptyList(); } final Set hotSpottedNodes = new TreeSet<>(Comparator.reverseOrder()); final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); @@ -52,7 +52,7 @@ public Iterator createNonPreferredShardIterator(RoutingAllocation hotSpottedNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis())); } } - return new LazilyExpandingShardIterator<>(hotSpottedNodes); + return () -> new LazilyExpandingShardIterator<>(hotSpottedNodes); } private static class NodeShardIterable implements Iterable, Comparable { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java index 4ec22edeff1e6..d4216532c5f63 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java @@ -13,11 +13,10 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import java.util.Collections; -import java.util.Iterator; public interface NonPreferredShardIteratorFactory { - NonPreferredShardIteratorFactory NOOP = ignored -> Collections.emptyIterator(); + NonPreferredShardIteratorFactory NOOP = ignored -> Collections.emptyList(); /** * Create an iterator returning all shards to be checked for non-preferred allocation, ordered in @@ -26,5 +25,5 @@ public interface NonPreferredShardIteratorFactory { * @param allocation the current routing allocation * @return An iterator containing shards we'd like to move to a preferred allocation */ - Iterator createNonPreferredShardIterator(RoutingAllocation allocation); + Iterable createNonPreferredShardIterator(RoutingAllocation allocation); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java index d6fae8f137e8b..c538dd3600295 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -80,13 +80,12 @@ public void testShardIterationOrder() { ) ); final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); - final Iterator shards = iteratorFactory.createNonPreferredShardIterator(routingAllocation); + final Iterable shards = iteratorFactory.createNonPreferredShardIterator(routingAllocation); long lastNodeQueueLatency = -1; int totalCount = 0; - while (shards.hasNext()) { + for (ShardRouting shardRouting : shards) { totalCount++; - final ShardRouting shardRouting = shards.next(); final String nodeId = shardRouting.currentNodeId(); NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = routingAllocation.clusterInfo() .getNodeUsageStatsForThreadPools() From e215a55315f22fff96084687bdfc1fe61c31dc6f Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 11 Sep 2025 16:09:21 +1000 Subject: [PATCH 20/35] Comment --- .../allocator/DefaultNonPreferredShardIteratorFactoryTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java index c538dd3600295..e92428d2a9c95 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -90,6 +90,7 @@ public void testShardIterationOrder() { NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = routingAllocation.clusterInfo() .getNodeUsageStatsForThreadPools() .get(nodeId); + // Should not receive shards from nodes with no usage stats assertNotNull(nodeUsageStatsForThreadPools); long thisNodeQueueLatency = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap() .get(ThreadPool.Names.WRITE) From 53c7b75d7e1cee853ceb08a93886f0bcea04418e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 11 Sep 2025 16:11:39 +1000 Subject: [PATCH 21/35] Test when decider not fully enabled --- ...NonPreferredShardIteratorFactoryTests.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java index e92428d2a9c95..bfc0cf702555a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -111,6 +111,27 @@ public void testShardIterationOrder() { } } + public void testNoShardsAreReturnedWhenWriteLoadDeciderNotFullyEnabled() { + final var iteratorFactory = new DefaultNonPreferredShardIteratorFactory( + new WriteLoadConstraintSettings( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + randomFrom( + WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED, + WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY + ) + ) + .build() + ) + ) + ); + final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); + final Iterable shards = iteratorFactory.createNonPreferredShardIterator(routingAllocation); + assertFalse(shards.iterator().hasNext()); + } + private RoutingAllocation createRoutingAllocation(int numberOfNodes) { int writeThreadPoolSize = randomIntBetween(4, 32); final Map nodeUsageStats = new HashMap<>(); From 0d4a5c724ad14dc899a7aef7c1f5976378035b86 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 11 Sep 2025 17:00:45 +1000 Subject: [PATCH 22/35] Naming --- .../allocator/DefaultNonPreferredShardIteratorFactory.java | 6 +++--- .../DefaultNonPreferredShardIteratorFactoryTests.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index 416fe63a44885..c7189d58017f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -52,7 +52,7 @@ public Iterable createNonPreferredShardIterator(RoutingAllocation hotSpottedNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis())); } } - return () -> new LazilyExpandingShardIterator<>(hotSpottedNodes); + return () -> new LazilyExpandingIterator<>(hotSpottedNodes); } private static class NodeShardIterable implements Iterable, Comparable { @@ -96,12 +96,12 @@ private Iterator createShardIterator() { } } - static class LazilyExpandingShardIterator implements Iterator { + static class LazilyExpandingIterator implements Iterator { private final Iterator> allIterables; private Iterator currentIterator; - LazilyExpandingShardIterator(Iterable> allIterables) { + LazilyExpandingIterator(Iterable> allIterables) { this.allIterables = allIterables.iterator(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java index bfc0cf702555a..5dddddbdd9cf4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -17,7 +17,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; -import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory.LazilyExpandingShardIterator; +import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory.LazilyExpandingIterator; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; @@ -47,7 +47,7 @@ public void testLazilyExpandingIterator() { flatValues.addAll(innerList); }); - Iterator iterator = new LazilyExpandingShardIterator<>(allValues); + Iterator iterator = new LazilyExpandingIterator<>(allValues); int nextIndex = 0; while (true) { From af93b87a91039db7462c1a4eb5b58cfc34d25700 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 16 Sep 2025 17:32:16 +1000 Subject: [PATCH 23/35] Only move a single non-preferred shard, do move non-preferred before moveShards --- .../allocator/BalancedShardsAllocator.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c9adc3a1e3094..1dbfe61650c60 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -167,13 +167,18 @@ public void allocate(RoutingAllocation allocation) { balancingWeights, nonPreferredShardIteratorFactory ); - balancer.allocateUnassigned(); - balancer.moveShards(); - balancer.moveNonPreferred(); - balancer.balance(); - // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy. - collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation); + try { + balancer.allocateUnassigned(); + if (balancer.moveNonPreferred()) { + return; + } + balancer.moveShards(); + balancer.balance(); + } finally { + // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy. + collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation); + } } private void collectAndRecordNodeWeightStats(Balancer balancer, BalancingWeights balancingWeights, RoutingAllocation allocation) { @@ -731,16 +736,11 @@ protected int comparePivot(int j) { } /** - * Move started shards that are in non-preferred allocations + * Move a started shard in a non-preferred allocation + * + * @return true if a shard was moved, false otherwise */ - public void moveNonPreferred() { - while (moveASingleNonPreferredShard()) { - // keep trying until we're unable to move any more - // TODO: Update cluster info - } - } - - private boolean moveASingleNonPreferredShard() { + private boolean moveNonPreferred() { for (ShardRouting shardRouting : nonPreferredShardIteratorFactory.createNonPreferredShardIterator(allocation)) { if (tryMoveShardIfNonPreferred(shardRouting)) { return true; From de052a34231df82f59eeca356d2a8390f45a6dce Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Wed, 17 Sep 2025 18:57:17 +1000 Subject: [PATCH 24/35] Sort shards correctly --- ...faultNonPreferredShardIteratorFactory.java | 68 +++++++++++++++---- ...NonPreferredShardIteratorFactoryTests.java | 32 +++++++-- 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index c7189d58017f2..25b49543a1221 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -13,15 +13,16 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; -import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.stream.StreamSupport; /** * Non-preferred shard iterator factory that returns the most desirable shards from most-hot-spotted @@ -79,20 +80,57 @@ public int compareTo(NodeShardIterable o) { private Iterator createShardIterator() { final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); - final List sortedRoutings = new ArrayList<>(); - double totalWriteLoad = 0; - for (ShardRouting shard : routingNode) { - Double shardWriteLoad = shardWriteLoads.get(shard.shardId()); - if (shardWriteLoad != null) { - sortedRoutings.add(shard); - totalWriteLoad += shardWriteLoad; - } + return StreamSupport.stream(routingNode.spliterator(), false) + .sorted(new TieringWriteLoadComparator(shardWriteLoads)) + .toList() + .iterator(); + } + } + + /** + * Sorts shards by "tiered" write load, then descending write load inside tiers + * + * e.g., MEDIUM/0.7, MEDIUM/0.65, HIGH/0.9, HIGH/0.81, LOW/0.4, LOW/0.2, LOW/0.1 + */ + private static class TieringWriteLoadComparator implements Comparator { + + private final Map shardWriteLoads; + private final double lowThreshold; + private final double highThreshold; + private final Comparator comparator; + + /** + * Enum order is prioritization order + */ + private enum Tier { + MEDIUM, + HIGH, + LOW + } + + private TieringWriteLoadComparator(Map shardWriteLoads) { + this.shardWriteLoads = shardWriteLoads; + double maxWriteLoad = shardWriteLoads.values().stream().reduce(0.0, Double::max); + this.lowThreshold = maxWriteLoad * 0.5; + this.highThreshold = maxWriteLoad * 0.8; + this.comparator = Comparator.comparing(this::getTier) + .thenComparing(sr -> shardWriteLoads.getOrDefault(sr.shardId(), 0.0), Comparator.reverseOrder()); + } + + @Override + public int compare(ShardRouting o1, ShardRouting o2) { + return comparator.compare(o1, o2); + } + + private Tier getTier(ShardRouting shard) { + double writeLoad = shardWriteLoads.getOrDefault(shard.shardId(), 0.0); + if (writeLoad < lowThreshold) { + return Tier.LOW; + } else if (writeLoad < highThreshold) { + return Tier.MEDIUM; + } else { + return Tier.HIGH; } - // TODO: Work out what this order should be - // Sort by distance-from-mean-write-load - double meanWriteLoad = totalWriteLoad / sortedRoutings.size(); - sortedRoutings.sort(Comparator.comparing(sr -> Math.abs(shardWriteLoads.get(sr.shardId()) - meanWriteLoad))); - return sortedRoutings.iterator(); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java index 5dddddbdd9cf4..dc969762e6679 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.stream.IntStream; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -80,13 +81,21 @@ public void testShardIterationOrder() { ) ); final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); + final double maxShardWriteLoad = routingAllocation.clusterInfo().getShardWriteLoads().values().stream().reduce(0.0, Double::max); + final double lowThreshold = maxShardWriteLoad * 0.5; + final double highThreshold = maxShardWriteLoad * 0.8; final Iterable shards = iteratorFactory.createNonPreferredShardIterator(routingAllocation); + String nodeId = null; + Tier lastTierForNode = null; long lastNodeQueueLatency = -1; int totalCount = 0; for (ShardRouting shardRouting : shards) { totalCount++; - final String nodeId = shardRouting.currentNodeId(); + if (Objects.equals(nodeId, shardRouting.currentNodeId()) == false) { + lastTierForNode = null; + nodeId = shardRouting.currentNodeId(); + } NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = routingAllocation.clusterInfo() .getNodeUsageStatsForThreadPools() .get(nodeId); @@ -100,10 +109,13 @@ public void testShardIterationOrder() { assertThat(thisNodeQueueLatency, lessThanOrEqualTo(lastNodeQueueLatency)); } lastNodeQueueLatency = thisNodeQueueLatency; - final Double shardWriteLoad = routingAllocation.clusterInfo().getShardWriteLoads().get(shardRouting.shardId()); - // Should not receive shards with no write-load - assertNotNull(shardWriteLoad); - // TODO: assert on shard order, when we know what we want that to be + final double shardWriteLoad = routingAllocation.clusterInfo().getShardWriteLoads().getOrDefault(shardRouting.shardId(), 0.0); + // Inside nodes, shards should be delivered in tier order + Tier tier = tierFor(shardWriteLoad, lowThreshold, highThreshold); + if (lastTierForNode != null) { + assertThat(tier, greaterThanOrEqualTo(lastTierForNode)); + } + lastTierForNode = tier; } if (totalCount > 0) { @@ -111,6 +123,16 @@ public void testShardIterationOrder() { } } + private Tier tierFor(double writeLoad, double lowThreshold, double highThreshold) { + return writeLoad < lowThreshold ? Tier.LOW : writeLoad < highThreshold ? Tier.MEDIUM : Tier.HIGH; + } + + private enum Tier { + MEDIUM, + HIGH, + LOW + } + public void testNoShardsAreReturnedWhenWriteLoadDeciderNotFullyEnabled() { final var iteratorFactory = new DefaultNonPreferredShardIteratorFactory( new WriteLoadConstraintSettings( From d59ea2bd413f6c633bc22fe3f3792af1e6097737 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 11:32:24 +1000 Subject: [PATCH 25/35] Use streams instead of sorting shards up-front --- ...faultNonPreferredShardIteratorFactory.java | 110 +++++++++--------- 1 file changed, 52 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index 25b49543a1221..699b725a6e3e1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.function.Function; +import java.util.stream.Stream; import java.util.stream.StreamSupport; /** @@ -43,30 +45,40 @@ public Iterable createNonPreferredShardIterator(RoutingAllocation if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { return Collections.emptyList(); } - final Set hotSpottedNodes = new TreeSet<>(Comparator.reverseOrder()); + final Set allClusterNodes = new TreeSet<>(Comparator.reverseOrder()); final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); for (RoutingNode node : allocation.routingNodes()) { var nodeUsageStats = nodeUsageStatsForThreadPools.get(node.nodeId()); if (nodeUsageStats != null) { final var writeThreadPoolStats = nodeUsageStats.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); assert writeThreadPoolStats != null; - hotSpottedNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis())); + allClusterNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis())); + } else { + allClusterNodes.add(new NodeShardIterable(allocation, node, 0L)); } } - return () -> new LazilyExpandingIterator<>(hotSpottedNodes); + return () -> new LazilyExpandingIterator<>(allClusterNodes); } - private static class NodeShardIterable implements Iterable, Comparable { + /** + * Returns all shards from a node in the order + * + *
    + *
  1. shards with medium write-load
  2. + *
  3. shards with high write-load
  4. + *
  5. shards with low write-load
  6. + *
+ * + * Where low and high thresholds are {@link #LOW_THRESHOLD} * max-write-load + * and {@link #HIGH_THRESHOLD} * max-write-load respectively. + */ + private record NodeShardIterable(RoutingAllocation allocation, RoutingNode routingNode, long maxQueueLatencyMillis) + implements + Iterable, + Comparable { - private final RoutingAllocation allocation; - private final RoutingNode routingNode; - private final long maxQueueLatencyMillis; - - private NodeShardIterable(RoutingAllocation allocation, RoutingNode routingNode, long maxQueueLatencyMillis) { - this.allocation = allocation; - this.routingNode = routingNode; - this.maxQueueLatencyMillis = maxQueueLatencyMillis; - } + private static final double LOW_THRESHOLD = 0.5; + private static final double HIGH_THRESHOLD = 0.8; @Override public Iterator iterator() { @@ -80,56 +92,38 @@ public int compareTo(NodeShardIterable o) { private Iterator createShardIterator() { final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); - return StreamSupport.stream(routingNode.spliterator(), false) - .sorted(new TieringWriteLoadComparator(shardWriteLoads)) - .toList() - .iterator(); + final WriteLoadFilter filter = WriteLoadFilter.create(shardWriteLoads); + return Stream.of( + StreamSupport.stream(routingNode.spliterator(), false).filter(filter::hasMediumLoad), + StreamSupport.stream(routingNode.spliterator(), false).filter(filter::hasHighLoad), + StreamSupport.stream(routingNode.spliterator(), false).filter(filter::hasLowLoad) + ).flatMap(Function.identity()).iterator(); } - } - /** - * Sorts shards by "tiered" write load, then descending write load inside tiers - * - * e.g., MEDIUM/0.7, MEDIUM/0.65, HIGH/0.9, HIGH/0.81, LOW/0.4, LOW/0.2, LOW/0.1 - */ - private static class TieringWriteLoadComparator implements Comparator { - - private final Map shardWriteLoads; - private final double lowThreshold; - private final double highThreshold; - private final Comparator comparator; - - /** - * Enum order is prioritization order - */ - private enum Tier { - MEDIUM, - HIGH, - LOW - } + private record WriteLoadFilter(Map shardWriteLoads, double lowThreshold, double highThreshold) { - private TieringWriteLoadComparator(Map shardWriteLoads) { - this.shardWriteLoads = shardWriteLoads; - double maxWriteLoad = shardWriteLoads.values().stream().reduce(0.0, Double::max); - this.lowThreshold = maxWriteLoad * 0.5; - this.highThreshold = maxWriteLoad * 0.8; - this.comparator = Comparator.comparing(this::getTier) - .thenComparing(sr -> shardWriteLoads.getOrDefault(sr.shardId(), 0.0), Comparator.reverseOrder()); - } + public static WriteLoadFilter create(Map shardWriteLoads) { + final double maxWriteLoad = shardWriteLoads.values().stream().reduce(0.0, Double::max); + final double lowThreshold = maxWriteLoad * NodeShardIterable.LOW_THRESHOLD; + final double highThreshold = maxWriteLoad * NodeShardIterable.HIGH_THRESHOLD; + return new WriteLoadFilter(shardWriteLoads, lowThreshold, highThreshold); + } - @Override - public int compare(ShardRouting o1, ShardRouting o2) { - return comparator.compare(o1, o2); - } + public boolean hasMediumLoad(ShardRouting shardRouting) { + double shardWriteLoad = shardWriteLoad(shardRouting); + return shardWriteLoad >= lowThreshold && shardWriteLoad < highThreshold; + } - private Tier getTier(ShardRouting shard) { - double writeLoad = shardWriteLoads.getOrDefault(shard.shardId(), 0.0); - if (writeLoad < lowThreshold) { - return Tier.LOW; - } else if (writeLoad < highThreshold) { - return Tier.MEDIUM; - } else { - return Tier.HIGH; + public boolean hasHighLoad(ShardRouting shardRouting) { + return shardWriteLoad(shardRouting) >= highThreshold; + } + + public boolean hasLowLoad(ShardRouting shardRouting) { + return shardWriteLoad(shardRouting) < lowThreshold; + } + + private double shardWriteLoad(ShardRouting shardRouting) { + return shardWriteLoads.getOrDefault(shardRouting.shardId(), 0.0); } } } From 6093a7a55813e5764d96db4d80136872f405cb2a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 12:00:02 +1000 Subject: [PATCH 26/35] Fix javadoc --- .../allocator/DefaultNonPreferredShardIteratorFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index 699b725a6e3e1..97d2cc8f1ef79 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -29,8 +29,8 @@ /** * Non-preferred shard iterator factory that returns the most desirable shards from most-hot-spotted * nodes first. - * Does not return nodes for which we have no write-pool utilization, or shards for which we have no - * write-load data. + * Any nodes missing queue latency information are considered to have a queue latency of 0. + * Any shards missing write load information are considered to have a write load of 0. */ public class DefaultNonPreferredShardIteratorFactory implements NonPreferredShardIteratorFactory { From cf03477631aa0d4f687f0a201ed17c7a328e6c1d Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 12:00:54 +1000 Subject: [PATCH 27/35] Use record class --- .../DefaultNonPreferredShardIteratorFactory.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index 97d2cc8f1ef79..1e2db0bf4183f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -32,13 +32,9 @@ * Any nodes missing queue latency information are considered to have a queue latency of 0. * Any shards missing write load information are considered to have a write load of 0. */ -public class DefaultNonPreferredShardIteratorFactory implements NonPreferredShardIteratorFactory { - - private final WriteLoadConstraintSettings writeLoadConstraintSettings; - - public DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings writeLoadConstraintSettings) { - this.writeLoadConstraintSettings = writeLoadConstraintSettings; - } +public record DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings writeLoadConstraintSettings) + implements + NonPreferredShardIteratorFactory { @Override public Iterable createNonPreferredShardIterator(RoutingAllocation allocation) { From e79a84a9f33df6cbae8d5cbe142e2d95fd973030 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 15:39:51 +1000 Subject: [PATCH 28/35] Test that all shards are returned --- ...NonPreferredShardIteratorFactoryTests.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java index dc969762e6679..cece9903846c6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactoryTests.java @@ -31,8 +31,12 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.StreamSupport; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -123,6 +127,30 @@ public void testShardIterationOrder() { } } + public void testThatAllShardsAreReturnedOnce() { + final var iteratorFactory = new DefaultNonPreferredShardIteratorFactory( + new WriteLoadConstraintSettings( + ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ) + ) + ); + final RoutingAllocation routingAllocation = createRoutingAllocation(randomIntBetween(2, 20)); + final Set allShardRoutings = routingAllocation.routingNodes() + .stream() + .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) + .collect(Collectors.toSet()); + for (ShardRouting shard : iteratorFactory.createNonPreferredShardIterator(routingAllocation)) { + assertTrue(allShardRoutings.remove(shard)); + } + assertThat(allShardRoutings, empty()); + } + private Tier tierFor(double writeLoad, double lowThreshold, double highThreshold) { return writeLoad < lowThreshold ? Tier.LOW : writeLoad < highThreshold ? Tier.MEDIUM : Tier.HIGH; } From ac0ec291bc51db43d5c93d6bdd211ca0411c3e08 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 16:09:30 +1000 Subject: [PATCH 29/35] Add NODE_INTERLEAVED as an iteration order --- .../allocation/allocator/BalancedShardsAllocator.java | 2 +- .../allocator/NonPreferredShardIteratorFactory.java | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 1dbfe61650c60..0ed7ee795d65d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -129,7 +129,7 @@ public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForec balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings), - NonPreferredShardIteratorFactory.NOOP + NonPreferredShardIteratorFactory.NODE_INTERLEAVED ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java index d4216532c5f63..842b4fa142afb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java @@ -16,8 +16,16 @@ public interface NonPreferredShardIteratorFactory { + /** + * Doesn't iterate over the shards at all, can be used to disable movement of NON_PREFERRED shards. + */ NonPreferredShardIteratorFactory NOOP = ignored -> Collections.emptyList(); + /** + * Just iterates over all shards using {@link org.elasticsearch.cluster.routing.RoutingNodes#nodeInterleavedShardIterator()} + */ + NonPreferredShardIteratorFactory NODE_INTERLEAVED = allocation -> () -> allocation.routingNodes().nodeInterleavedShardIterator(); + /** * Create an iterator returning all shards to be checked for non-preferred allocation, ordered in * descending desirability-to-move order From c2ee39f6efb0511017e135ba7ba743d74cde290e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 16:14:16 +1000 Subject: [PATCH 30/35] Javadoc for NonPreferredShardIteratorFactory --- .../allocator/NonPreferredShardIteratorFactory.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java index 842b4fa142afb..b92d3d9ff49f7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/NonPreferredShardIteratorFactory.java @@ -14,6 +14,11 @@ import java.util.Collections; +/** + * A factory that produces {@link Iterable}s of {@link ShardRouting}s used to look for non-preferred allocation and + * try to relocate them. The first shard encountered that the allocation deciders indicate is in a NOT_PREFERRED + * allocation, and can be moved to a preferred allocation, will be moved and the iteration will stop. + */ public interface NonPreferredShardIteratorFactory { /** From 69a545af55449428eb66c8245f6cb35db0727339 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 16:19:12 +1000 Subject: [PATCH 31/35] Javadoc --- .../allocator/DefaultNonPreferredShardIteratorFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java index 1e2db0bf4183f..b4df7220a1975 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java @@ -29,8 +29,10 @@ /** * Non-preferred shard iterator factory that returns the most desirable shards from most-hot-spotted * nodes first. - * Any nodes missing queue latency information are considered to have a queue latency of 0. - * Any shards missing write load information are considered to have a write load of 0. + *
    + *
  • Any nodes missing queue-latency information are considered to have a queue-latency of 0.
  • + *
  • Any shards missing write-load information are considered to have a write-load of 0.
  • + *
*/ public record DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings writeLoadConstraintSettings) implements From c76af05d302f5510fdba5654a4ebd4c7dd986287 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 16:24:57 +1000 Subject: [PATCH 32/35] Try to simplify condition --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 0ed7ee795d65d..5ac900254d905 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -802,7 +802,7 @@ public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final Shard assert sourceNode != null && sourceNode.containsShard(index, shardRouting); RoutingNode routingNode = sourceNode.getRoutingNode(); Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation); - if ((canRemain.type() == Type.NOT_PREFERRED || canRemain.type() == Type.NO) == false) { + if (canRemain.type() != Type.NOT_PREFERRED && canRemain.type() != Type.NO) { return MoveDecision.remain(canRemain); } From 630d06d514ed9ba1845d0fac11e4f26f02fbe7a9 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 16:38:29 +1000 Subject: [PATCH 33/35] in-line tryMoveShardIfNonPreferred --- .../allocator/BalancedShardsAllocator.java | 47 ++++++++----------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 5ac900254d905..ca20210f0efa2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -742,39 +742,32 @@ protected int comparePivot(int j) { */ private boolean moveNonPreferred() { for (ShardRouting shardRouting : nonPreferredShardIteratorFactory.createNonPreferredShardIterator(allocation)) { - if (tryMoveShardIfNonPreferred(shardRouting)) { + ProjectIndex index = projectIndex(shardRouting); + final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); + if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { + final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); + final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); + sourceNode.removeShard(index, shardRouting); + Tuple relocatingShards = routingNodes.relocateShard( + shardRouting, + targetNode.getNodeId(), + allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + "non-preferred", + allocation.changes() + ); + final ShardRouting shard = relocatingShards.v2(); + targetNode.addShard(projectIndex(shard), shard); + if (logger.isTraceEnabled()) { + logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); + } return true; + } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { + logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); } } return false; } - private boolean tryMoveShardIfNonPreferred(ShardRouting shardRouting) { - ProjectIndex index = projectIndex(shardRouting); - final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting); - if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { - final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); - final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId()); - sourceNode.removeShard(index, shardRouting); - Tuple relocatingShards = routingNodes.relocateShard( - shardRouting, - targetNode.getNodeId(), - allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), - "non-preferred", - allocation.changes() - ); - final ShardRouting shard = relocatingShards.v2(); - targetNode.addShard(projectIndex(shard), shard); - if (logger.isTraceEnabled()) { - logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); - } - return true; - } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { - logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); - } - return false; - } - /** * Makes a decision on whether to move a started shard to another node. The following rules apply * to the {@link MoveDecision} return object: From 5103231404e812b1fdce4604e8c2fbcd3e82dd12 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 16:40:11 +1000 Subject: [PATCH 34/35] Move new behaviour together --- .../allocator/BalancedShardsAllocator.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ca20210f0efa2..f587b8b9c5ba2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -809,6 +809,15 @@ public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final Shard return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocatePreferredOnly); } + private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) { + Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation); + // not-preferred means no here + if (decision.type() == Type.NOT_PREFERRED) { + return Decision.NO; + } + return decision; + } + /** * Move started shards that can not be allocated to a node anymore * @@ -937,15 +946,6 @@ private MoveDecision decideMove( ); } - private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) { - Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation); - // not-preferred means no here - if (decision.type() == Type.NOT_PREFERRED) { - return Decision.NO; - } - return decision; - } - private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) { // don't use canRebalance as we want hard filtering rules to apply. See #17698 return allocation.deciders().canAllocate(shardRouting, target, allocation); From e888265983bd33052e4ee95dfa61d472628bf9fd Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 18 Sep 2025 17:15:46 +1000 Subject: [PATCH 35/35] Comment on NOOP default --- .../allocation/allocator/BalancedShardsAllocator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index f587b8b9c5ba2..336b3ace5c899 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -129,7 +129,10 @@ public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForec balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings), - NonPreferredShardIteratorFactory.NODE_INTERLEAVED + // We need to default to no-op here because there are lots of tests + // that depend on not returning after a single move + // TODO: default to NODE_INTERLEAVED or similar + NonPreferredShardIteratorFactory.NOOP ); }