Commits
53bfa9b
Implement AllocationDeciders#findNonPreferred
nicktindall Sep 8, 2025
971a395
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_node
nicktindall Sep 8, 2025
f0f9f77
Fix assertion
nicktindall Sep 8, 2025
71232d5
Implement prioritisable problems
nicktindall Sep 8, 2025
c31b05a
Javadoc
nicktindall Sep 8, 2025
967e76e
Example for write load constraint decider
nicktindall Sep 8, 2025
d147a18
Fix text
nicktindall Sep 8, 2025
4f7b519
Tidy
nicktindall Sep 8, 2025
91ee197
Fix boolean logic
nicktindall Sep 9, 2025
1d3b08e
Introduce pluggable non-preferred iteration
nicktindall Sep 10, 2025
687c6e2
Implement NonPreferredShardIteratorFactory for resolving hot-spots
nicktindall Sep 10, 2025
1a4c85a
Remove unused default implementation
nicktindall Sep 10, 2025
a14af62
Merge remote-tracking branch 'origin/main' into ES-12739_pluggable_no…
nicktindall Sep 10, 2025
3196d21
Get rid of remnants of prior approach
nicktindall Sep 10, 2025
d63012c
Remove cruft
nicktindall Sep 10, 2025
7918b3b
Improve naming/javadoc
nicktindall Sep 10, 2025
d349668
Improve wiring
nicktindall Sep 10, 2025
0c34875
Fix infinite loop
nicktindall Sep 10, 2025
22ad4d9
Merge branch 'main' into ES-12739_pluggable_non_preferred_iteration
nicktindall Sep 10, 2025
b7fcc4a
Test/fix iterator logic
nicktindall Sep 11, 2025
ab73569
Test shard iteration order
nicktindall Sep 11, 2025
cebf3a9
Use Iterable instead of Iterator
nicktindall Sep 11, 2025
e215a55
Comment
nicktindall Sep 11, 2025
53c7b75
Test when decider not fully enabled
nicktindall Sep 11, 2025
0d4a5c7
Naming
nicktindall Sep 11, 2025
af93b87
Only move a single non-preferred shard, do move non-preferred before …
nicktindall Sep 16, 2025
de052a3
Sort shards correctly
nicktindall Sep 17, 2025
d59ea2b
Use streams instead of sorting shards up-front
nicktindall Sep 18, 2025
ab127a9
Merge remote-tracking branch 'origin/main' into ES-12739_pluggable_no…
nicktindall Sep 18, 2025
6093a7a
Fix javadoc
nicktindall Sep 18, 2025
cf03477
Use record class
nicktindall Sep 18, 2025
e79a84a
Test that all shards are returned
nicktindall Sep 18, 2025
a433918
Merge remote-tracking branch 'origin/main' into ES-12739_pluggable_no…
nicktindall Sep 18, 2025
ac0ec29
Add NODE_INTERLEAVED as an iteration order
nicktindall Sep 18, 2025
c2ee39f
Javadoc for NonPreferredShardIteratorFactory
nicktindall Sep 18, 2025
69a545a
Javadoc
nicktindall Sep 18, 2025
c76af05
Try to simplify condition
nicktindall Sep 18, 2025
630d06d
in-line tryMoveShardIfNonPreferred
nicktindall Sep 18, 2025
5103231
Move new behaviour together
nicktindall Sep 18, 2025
e888265
Comment on NOOP default
nicktindall Sep 18, 2025
20 changes: 18 additions & 2 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -39,10 +39,12 @@
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.DefaultNonPreferredShardIteratorFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
@@ -504,16 +506,30 @@ private static ShardsAllocator createShardsAllocator(
ShardAllocationExplainer shardAllocationExplainer,
DesiredBalanceMetrics desiredBalanceMetrics
) {
WriteLoadConstraintSettings writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings);
DefaultNonPreferredShardIteratorFactory nonPreferredShardIteratorFactory = new DefaultNonPreferredShardIteratorFactory(
writeLoadConstraintSettings
);
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
BALANCED_ALLOCATOR,
() -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory)
() -> new BalancedShardsAllocator(
balancerSettings,
writeLoadForecaster,
balancingWeightsFactory,
nonPreferredShardIteratorFactory
Contributor:

Rather than passing the factory implementation through the BalancedShardsAllocator and Balancer constructors, could we directly add the logic to the Balancer in the first place? Avoid the factory. The other objects passed through the constructors are usually shared with other components, whereas the new logic only runs in the Balancer.

moveNonPreferred could be gated by the WRITE_LOAD_DECIDER_ENABLED_SETTING, as an alternative to the NOOP implementation. I don't think tests would even be able to exercise moveNonPreferred without some hot-spot mocking to get to a 5 second queue latency, even if the new logic were enabled by default.

Though perhaps there is some other reason for the NOOP / adding it here that I'm missing. Factories often come into play for stateful vs stateless implementations, but we don't have an alternative real implementation.

Contributor Author:

The idea here is just to put a boundary on the responsibilities of the two classes: the BalancedShardsAllocator doesn't care about the iteration order of the shards, and as long as the iterator contains all the shards this logic will work.

Similarly, the BalancedShardsAllocator doesn't care what the individual deciders do; it just knows about YES/NO/THROTTLE/NOT_PREFERRED.

In my opinion the interface delineates responsibilities and allows the reader to ignore the implementation details of the iteration order when reading the BalancedShardsAllocator. It also frees us up to bake all kinds of knowledge about the configured deciders into our implementation without that knowledge leaking into the BalancedShardsAllocator.

Contributor Author:

The default implementation could equally be

allocation -> allocation.routingNodes().nodeInterleavedShardIterator()
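
For reference, a minimal sketch of what the factory interface being wired here looks like, inferred only from its usages in this diff (the single createNonPreferredShardIterator method and the NOOP constant); the actual source file may declare more:

    import java.util.Collections;
    import java.util.Iterator;

    import org.elasticsearch.cluster.routing.ShardRouting;
    import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;

    // Sketch only: inferred from the calls in this PR, not copied from the real interface.
    public interface NonPreferredShardIteratorFactory {

        /** Shards to consider moving off their current node, most desirable to move first. */
        Iterator<ShardRouting> createNonPreferredShardIterator(RoutingAllocation allocation);

        /** Default that yields nothing, so moveNonPreferred() becomes a no-op. */
        NonPreferredShardIteratorFactory NOOP = allocation -> Collections.emptyIterator();
    }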

)
);
allocators.put(
DESIRED_BALANCE_ALLOCATOR,
() -> new DesiredBalanceShardsAllocator(
clusterSettings,
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory),
new BalancedShardsAllocator(
balancerSettings,
writeLoadForecaster,
balancingWeightsFactory,
nonPreferredShardIteratorFactory
),
threadPool,
clusterService,
reconciler,
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -114,6 +114,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private final BalancerSettings balancerSettings;
private final WriteLoadForecaster writeLoadForecaster;
private final BalancingWeightsFactory balancingWeightsFactory;
private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory;

public BalancedShardsAllocator() {
this(Settings.EMPTY);
@@ -124,18 +125,25 @@ public BalancedShardsAllocator(Settings settings) {
}

public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster) {
this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings));
this(
balancerSettings,
writeLoadForecaster,
new GlobalBalancingWeightsFactory(balancerSettings),
NonPreferredShardIteratorFactory.NOOP
);
}

@Inject
public BalancedShardsAllocator(
BalancerSettings balancerSettings,
WriteLoadForecaster writeLoadForecaster,
BalancingWeightsFactory balancingWeightsFactory
BalancingWeightsFactory balancingWeightsFactory,
NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory
) {
this.balancerSettings = balancerSettings;
this.writeLoadForecaster = writeLoadForecaster;
this.balancingWeightsFactory = balancingWeightsFactory;
this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory;
}

@Override
@@ -152,9 +160,16 @@ public void allocate(RoutingAllocation allocation) {
return;
}
final BalancingWeights balancingWeights = balancingWeightsFactory.create();
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
final Balancer balancer = new Balancer(
writeLoadForecaster,
allocation,
balancerSettings.getThreshold(),
balancingWeights,
nonPreferredShardIteratorFactory
);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.moveNonPreferred();
Contributor:

What are your thoughts on this ordering? I figured we'd need to run the new logic before moveShards, since moveShards could trigger the simulator to consider the hot-spot addressed before we check in moveNonPreferred.

nicktindall (Contributor Author), Sep 15, 2025:

I think it depends on whether you think fixing hot-spots or shutting down nodes is more important. The naming would suggest it's more important to move canRemain=NO than canRemain=NOT_PREFERRED shards, so moveShards should get first priority at movement. But, as you say, because moveNonPreferred prioritises movements, we may make sub-optimal moves when NO and NOT_PREFERRED intersect. If there is an intersection, that would most likely mean there is a shutting-down node that is also hot-spotting, in which case we have to evacuate all the shards either way.

I don't have a strong preference here because I hope it's rare enough not to matter. I'm inclined to follow the naming and prioritise moving NOs; perhaps we need to apply our prioritisation there too?

DiannaHohensee (Contributor), Sep 15, 2025:

Since this currently breaks the shard moves simulator, can we run moveNonPreferred before moveShards? Otherwise we could have eliminated the hot-spot in the simulator, via moveShards relocations, by the time the code gets here.

I don't think the ordering will have a significant impact on shard movement, especially since, to fix a hot-spot, we're moving a single shard per node per 30-second stats refresh cycle: I'd expect it to be irrelevant noise compared to the number of shards moved away from a shutting-down node. This is also the allocator; the decisions we make here don't affect the order of shard movement. You'd have to do something with the Reconciler if you wanted to affect shard movement order.

balancer.balance();

// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
@@ -188,7 +203,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
writeLoadForecaster,
allocation,
balancerSettings.getThreshold(),
balancingWeightsFactory.create()
balancingWeightsFactory.create(),
nonPreferredShardIteratorFactory
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
@@ -248,12 +264,14 @@ public static class Balancer {
private final Map<String, ModelNode> nodes;
private final BalancingWeights balancingWeights;
private final NodeSorters nodeSorters;
private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory;

private Balancer(
WriteLoadForecaster writeLoadForecaster,
RoutingAllocation allocation,
float threshold,
BalancingWeights balancingWeights
BalancingWeights balancingWeights,
NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory
) {
this.writeLoadForecaster = writeLoadForecaster;
this.allocation = allocation;
@@ -266,6 +284,7 @@ private Balancer(
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
this.balancingWeights = balancingWeights;
this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory;
}

private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
@@ -711,6 +730,94 @@ protected int comparePivot(int j) {
return indices;
}

/**
* Move started shards that are in non-preferred allocations
*/
public void moveNonPreferred() {
boolean movedAShard;
do {
// Any time we move a shard, we need to update the cluster info and ask again for the non-preferred shards
// as they may have changed
movedAShard = false;
for (Iterator<ShardRouting> nonPreferredShards = nonPreferredShardIteratorFactory.createNonPreferredShardIterator(
allocation
); nonPreferredShards.hasNext();) {
if (tryMoveShardIfNonPreferred(nonPreferredShards.next())) {
movedAShard = true;
break;
}
}
// TODO: Update cluster info
} while (movedAShard);
Contributor:

Maybe return Iterable, then the whole thing would be:

for (var shard : nonPreferredIterable(allocation)) {
  if (tryMoveShardIfNonPreferred(shard)) {
    return;
  }
}

nicktindall (Contributor Author), Sep 11, 2025:

The code here is what the ultimate solution should look like, where we are able to do multiple moves in a single allocate call. We stop iterating when no moves are made, but each time we make a move we refresh the shard iterator, because we may have resolved a hot-spot, which could change the order or contents of the list.

For example, if there are two hot-spotted nodes (N and M), the first time we call for the iterator it will be:

N1, N2, N3, M1, M2, M3, ...

then if we successfully move N2 and it resolves the hot-spot, we'll ask again and get

M1, M2, M3, ...

Hence the nested loop, but true, the inner loop could be tidier with an Iterable.

Contributor Author:

Switched to Iterable in cebf3a9
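
A rough sketch of how the outer loop reads once the factory hands back an Iterable; the Iterable-returning method name createNonPreferredShardIterable is an assumption for illustration, not necessarily what cebf3a9 introduced:

    // Sketch under assumptions: the refresh-after-every-move behaviour of the original loop is kept.
    public void moveNonPreferred() {
        boolean movedAShard;
        do {
            movedAShard = false;
            // Re-ask for the iterable after every successful move: resolving one hot-spot can change
            // which shards we want to move next, and in what order.
            for (ShardRouting shard : nonPreferredShardIteratorFactory.createNonPreferredShardIterable(allocation)) {
                if (tryMoveShardIfNonPreferred(shard)) {
                    movedAShard = true;
                    break;
                }
            }
            // TODO: Update cluster info
        } while (movedAShard);
    }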

}

private boolean tryMoveShardIfNonPreferred(ShardRouting shardRouting) {
ProjectIndex index = projectIndex(shardRouting);
final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(index, shardRouting);
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
"non-preferred",
allocation.changes()
);
final ShardRouting shard = relocatingShards.v2();
targetNode.addShard(projectIndex(shard), shard);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
return true;
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
return false;
}

/**
* Makes a decision on whether to move a started shard to another node. The following rules apply
* to the {@link MoveDecision} return object:
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
* 2. If the shard's current allocation is preferred ({@link Decision.Type#YES}), no attempt will be made to move the shard and
* {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
* 3. If the shard is not allowed ({@link Decision.Type#NO}), or not preferred ({@link Decision.Type#NOT_PREFERRED}) to remain
* on its current node, then {@link MoveDecision#getAllocationDecision()} will be populated with the decision of moving to
* another node. If {@link MoveDecision#forceMove()} returns {@code true}, then {@link MoveDecision#getTargetNode} will return
* a non-null value representing a node that returned {@link Decision.Type#YES} from canAllocate, otherwise the assignedNodeId
* will be null.
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
* {@link MoveDecision#getNodeDecisions} will have a non-null value.
*/
public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final ShardRouting shardRouting) {
NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
index.assertMatch(shardRouting);

if (shardRouting.started() == false) {
// we can only move started shards
return MoveDecision.NOT_TAKEN;
}

final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(index, shardRouting);
RoutingNode routingNode = sourceNode.getRoutingNode();
Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if ((canRemain.type() == Type.NOT_PREFERRED || canRemain.type() == Type.NO) == false) {
return MoveDecision.remain(canRemain);
}
Contributor Author:

We will consider NO and NOT_PREFERRED here, because it may be that a NO is really a NOT_PREFERRED that's also a NO.

Member:

I don't quite follow here; I'd appreciate it if you could help me understand it better. Do you mean the decision could be an overall NO because some other decider may say NO while the writeLoad decider says NOT_PREFERRED? Since we run moveShards first, do we still need to consider NO here?

Contributor Author:

In Allocation#withDeciders we return the "most negative" decision, so when one decider says NOT_PREFERRED and another says NO, the combined result is NO. Because we're iterating in most-desirable-to-move-first order, if we see either of these values returned it makes sense to assume there was a NOT_PREFERRED in there and make the move anyway. The alternative would be to assume there was no NOT_PREFERRED when there is a NO, and potentially move a less-preferred shard.

This will come into play now: as @DiannaHohensee and I discussed this morning, it's probably better to run moveNonPreferred first, because otherwise we risk moving a sub-optimal shard when NO and NOT_PREFERRED intersect.

Contributor:

Can you elaborate on that argument about moving non-preferred first? I would naively think we want to ensure we move all hard-rules first - to vacate nodes - and then move the non-preferred after.

I think that also avoids this slightly confusing check.

Contributor:

I'll note that moveShards should try to move shards to places where canAllocate says YES over places where it says NOT_PREFERRED, which seems to solve the sub-optimal shard movement issue?

Contributor:

> Can you elaborate on that argument about moving non-preferred first?

The ShardMovementWriteLoadSimulator will simulate the end of a hot-spot as soon as a single shard leaves the node that is hot-spotting. So if moveShards runs first, it could eliminate the hot-spot before we reach moveNonPreferred and have the opportunity to select a sensible shard.

Contributor:

> I would naively think we want to ensure we move all hard-rules first - to vacate nodes - and then move the non-preferred after.

The Balanced/DesiredBalanceShardsAllocators do not pick the order of shard movement; the Reconciler does that. The allocator and reconciler happen to have the same order, but I think the priority for the allocator is to make the best choices, not to consider shard movement priority. The reconciler behaviour is actually in my balancer changes patch.

The exception is allocateUnassigned for primaries, for which there's an early exit from the allocators to publish the DesiredBalance ASAP.
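
A hedged illustration, with toy types rather than the real Decision/AllocationDeciders classes, of the "most negative decision" behaviour described earlier in this thread; it is why the canRemain check above treats NO and NOT_PREFERRED the same way:

    import java.util.List;

    // Toy stand-in for Decision.Type, for illustration only.
    enum DecisionType { NO, NOT_PREFERRED, YES }

    final class MostNegativeDecisionExample {
        // Assumption: the combined decision is the most negative of the individual decider results.
        static DecisionType combine(List<DecisionType> deciderResults) {
            if (deciderResults.contains(DecisionType.NO)) {
                return DecisionType.NO;
            }
            if (deciderResults.contains(DecisionType.NOT_PREFERRED)) {
                return DecisionType.NOT_PREFERRED;
            }
            return DecisionType.YES;
        }

        public static void main(String[] args) {
            // Write-load decider says NOT_PREFERRED, another decider says NO: the overall answer is NO,
            // so a shard that is really NOT_PREFERRED can surface as NO and must still be considered.
            System.out.println(combine(List.of(DecisionType.NOT_PREFERRED, DecisionType.NO))); // NO
        }
    }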


sorter.reset(index);
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocatePreferredOnly);
}

/**
* Move started shards that can not be allocated to a node anymore
*
@@ -839,6 +946,15 @@ private MoveDecision decideMove(
);
}

private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) {
Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation);
// not-preferred means no here
if (decision.type() == Type.NOT_PREFERRED) {
return Decision.NO;
}
return decision;
}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
// don't use canRebalance as we want hard filtering rules to apply. See #17698
return allocation.deciders().canAllocate(shardRouting, target, allocation);
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DefaultNonPreferredShardIteratorFactory.java
@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
* Non-preferred shard iterator factory that returns the most desirable shards from most-hot-spotted
* nodes first.
* Does not return nodes for which we have no write-pool utilization, or shards for which we have no
* write-load data.
*/
public class DefaultNonPreferredShardIteratorFactory implements NonPreferredShardIteratorFactory {

private final WriteLoadConstraintSettings writeLoadConstraintSettings;

public DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings writeLoadConstraintSettings) {
this.writeLoadConstraintSettings = writeLoadConstraintSettings;
}

@Override
public Iterator<ShardRouting> createNonPreferredShardIterator(RoutingAllocation allocation) {
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
return Collections.emptyIterator();
}
final Set<NodeShardIterable> hotSpottedNodes = new TreeSet<>(Comparator.reverseOrder());
final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
for (RoutingNode node : allocation.routingNodes()) {
var nodeUsageStats = nodeUsageStatsForThreadPools.get(node.nodeId());
if (nodeUsageStats != null) {
final var writeThreadPoolStats = nodeUsageStats.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
assert writeThreadPoolStats != null;
hotSpottedNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis()));
}
}
return new NodeShardIterator(hotSpottedNodes.iterator());
Contributor:

hotSpottedNodes seems to be all nodes with stats available, not necessarily hot spots - missing a maxQueueLatency threshold check?

nicktindall (Contributor Author), Sep 11, 2025:

No, this is intentional, and perhaps more of a naming issue. Ideally this iterator factory just produces the iterator and doesn't do any filtering at all (that's the job of the deciders); it just returns shards in an order where the most desirable to move are presented first.

Contributor Author:

I realise I am doing some filtering by excluding nodes with no utilisation and shards with no write load, but that goes to my earlier comment about it being over-fitted to the write-load use case. We can change that if things change, but currently there's no sense in investigating those shards.
}

private static class NodeShardIterator implements Iterator<ShardRouting> {

private final Iterator<NodeShardIterable> iterator;
private Iterator<ShardRouting> currentShardIterator;

private NodeShardIterator(Iterator<NodeShardIterable> iterator) {
this.iterator = iterator;
}

@Override
public boolean hasNext() {
if (currentShardIterator == null || currentShardIterator.hasNext() == false) {
if (iterator.hasNext()) {
currentShardIterator = iterator.next().iterator();
} else {
return false;
}
}
return currentShardIterator.hasNext();
}

@Override
public ShardRouting next() {
if (currentShardIterator == null) {
currentShardIterator = iterator.next().iterator();
}
return currentShardIterator.next();
}
}

private static class NodeShardIterable implements Iterable<ShardRouting>, Comparable<NodeShardIterable> {

private final RoutingAllocation allocation;
private final RoutingNode routingNode;
private final long maxQueueLatencyMillis;

private NodeShardIterable(RoutingAllocation allocation, RoutingNode routingNode, long maxQueueLatencyMillis) {
this.allocation = allocation;
this.routingNode = routingNode;
this.maxQueueLatencyMillis = maxQueueLatencyMillis;
}

@Override
public Iterator<ShardRouting> iterator() {
return createShardIterator();
}

@Override
public int compareTo(NodeShardIterable o) {
return Long.compare(maxQueueLatencyMillis, o.maxQueueLatencyMillis);
}

private Iterator<ShardRouting> createShardIterator() {
final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
Contributor:

It looks like you're creating a list of shards across all nodes. I wonder if instead we could first collect a list of nodes that are hot-spotting, then create separate lists of shards (with their write loads, skipping any shards with 0 load) for each hot-spotting node from allocation.clusterInfo().getShardWriteLoads(), and finally sort and iterate each shard list in the order we prefer, checking whether we can move each shard until we find one that's movable for each node. We'd still need an iterator to sort and manage a list of shards, but it might be simpler just iterating at that level? Then the nodes don't need iterators.

We only want to move one shard per node. It's not obvious to me how to easily achieve that when iterating all shards at once.

nicktindall (Contributor Author), Sep 15, 2025:

The approach here is that the iterator returns the shards we'd like to move next, in order of preference. Once we move a shard we ask again for this list. We have to do this because every time we move a shard it can change the list of shards we want to move (e.g. if a shard movement resolves a hot-spot, the shards from that node might appear further down the list in the subsequent iterator, and a lesser-hot-spotted node might appear at the front of it instead).

I tried not to do any filtering here, because this is supposed to be the prioritisation logic; the deciders themselves decide whether we canRemain (it would seem to be duplicating logic to do it here as well).

If we go through one of these iterators and don't find any shard we want to move, we break out of the loop and continue to balancing.

Contributor:

> We have to do this because every time we move a shard it can change the list of shards we want to move

But moving a single shard resolves the hot spot. Even if we move one shard off of a NodeA, the priority order for further shards to move away from NodeA shouldn't be dynamic 🤔

> if a shard movement resolves a hot-spot, the shards from that node might appear further down the list in the subsequent iterator, and a lesser-hot-spotted node might appear at the front of it instead

IIUC, you're trying to fairly spread node hot-spot resolution? Like pick a shard for NodeA, then pick a shard for NodeB, before coming back to NodeA. I don't think that matters for the allocator, which comes up with the final allocation, not the plan for which shards to move first. NodeA is hot-spotting, and we can focus on NodeA's shards to resolve the hot spot before moving on to NodeB's shards. We wouldn't be assigning any of NodeA or NodeB's shards to NodeA or NodeB because they are hot / not-preferred, so there's no interaction there, and no need for evenness / fairness in selection order.

Contributor Author:

> IIUC, you're trying to fairly spread node hot-spot resolution?

No. As discussed on Zoom, the iterator represents our preference for the next move. For example, if there are three nodes (M, N, O) with queue latencies (100, 50, 0), the shards will be iterated in the order

M1, M2, M3, M4, N1, N2, N3, O1, O2

where Mx denotes the shard on node M that is the xth most desirable to move.

So we'll iterate through that list finding the first of those shards that can move somewhere, then execute the move, then we'll ask for that list again in the next iteration.

Say we moved a shard from M to O and now our latencies for (M, N, O) are (0, 50, 0); the next iterator will look like

N1, N2, N3, M1, M2, O1, O2, O3

because N is the most likely to be hot-spotted, so it goes to the front of the list.

Then we move a shard off of N and the latencies change to (M, N, O) = (0, 0, 0).

The next iterator would then look something like (although M, N, O could be in any order because they're all equal):

N1, N2, M1, M2, O1, O2, O3, O4

We'd iterate through that, find no shard with canRemain = NOT_PREFERRED, make no movements, and move on to the next phase.

Contributor:

This ties in with my prior comment about actually calling canRemain first.
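
A hypothetical test-style sketch of the ordering contract described above; the allocationWithWriteQueueLatencies helper, the writeLoadConstraintSettings fixture, and the latency values are assumptions for illustration, not code from this PR:

    // Assumed helper: builds a RoutingAllocation whose ClusterInfo reports the given per-node
    // write-pool max queue latencies and a non-zero write load for every shard on nodes M, N and O.
    public void testHottestNodeShardsAreReturnedFirst() {
        RoutingAllocation allocation = allocationWithWriteQueueLatencies(Map.of("M", 100L, "N", 50L, "O", 0L));
        DefaultNonPreferredShardIteratorFactory factory = new DefaultNonPreferredShardIteratorFactory(writeLoadConstraintSettings);

        List<String> nodeIds = new ArrayList<>();
        factory.createNonPreferredShardIterator(allocation).forEachRemaining(shard -> nodeIds.add(shard.currentNodeId()));

        // All of M's shards (100ms queue latency) should precede N's (50ms), and N's should precede O's (0ms)
        assertThat(nodeIds.lastIndexOf("M"), lessThan(nodeIds.indexOf("N")));
        assertThat(nodeIds.lastIndexOf("N"), lessThan(nodeIds.indexOf("O")));
    }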

final List<ShardRouting> sortedRoutings = new ArrayList<>();
double totalWriteLoad = 0;
for (ShardRouting shard : routingNode) {
Double shardWriteLoad = shardWriteLoads.get(shard.shardId());
if (shardWriteLoad != null) {
sortedRoutings.add(shard);
totalWriteLoad += shardWriteLoad;
}
}
// TODO: Work out what this order should be
// Sort by distance-from-mean-write-load
double meanWriteLoad = totalWriteLoad / sortedRoutings.size();
sortedRoutings.sort(Comparator.comparing(sr -> Math.abs(shardWriteLoads.get(sr.shardId()) - meanWriteLoad)));
return sortedRoutings.iterator();
}
Contributor:

If I recall correctly, Henning mentioned picking a shard somewhere in the middle. I think we don't need a sort (strong order) but a set of average shards. For example, create two partitions - preferable and not. Everything that is 0.5-0.8 of maxShardLoad goes to preferable.

Contributor:

Yeah, we'd like to do that, and I find that slightly harder with returning a list (though possibly doable).

mhl-b (Contributor), Sep 16, 2025:

I was thinking about this: an iterator that returns average shards first, and if nothing worked then heavy shards (>0.8), then light (<0.5). In the worst case we traverse the shards 4 times: find max load, then any average, then any heavy, then light.

    private Stream<ShardRouting> shardsStream() {
        return StreamSupport.stream(routingNode.spliterator(), false);
    }
...
        var maxLoad = shardsStream().mapToDouble(ShardRouting::load).max().orElse(1.0);
        var avg = shardsStream().filter(s -> s.load() / maxLoad >= 0.5 && s.load() / maxLoad <= 0.8);
        var heavy = shardsStream().filter(s -> s.load() / maxLoad > 0.8);
        var light = shardsStream().filter(s -> s.load() / maxLoad < 0.5);
        return concat(concat(avg, heavy), light).iterator(); // Stream.concat()

PS: there is no ShardRouting::load, used here for brevity.

Contributor Author:

Attempted in de052a3

Contributor:

:'-) ohboi. I would still opt for a lazy sequence rather than a sorted list. I believe the expected case is to have some average shards to move, so allocating and sorting a list seems redundant; a single pass with a filter should suffice. Especially in the context of a 10k-shard node, there is a high probability of having a good average shard.

Contributor Author:

Yes, good point, made lazier in d59ea2b

Also, I didn't bother sorting inside the low/medium/high ranges, but I can easily add that if we think it's worth it for the determinism.

}
}
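
A self-contained sketch of the lazy three-band ordering discussed in the review thread above; the Map of per-shard write loads stands in for ClusterInfo#getShardWriteLoads, and the 0.5/0.8 cut-offs are the ones suggested in review. This is an assumption-based illustration, not the code of d59ea2b:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    import org.elasticsearch.cluster.routing.RoutingNode;
    import org.elasticsearch.cluster.routing.ShardRouting;
    import org.elasticsearch.index.shard.ShardId;

    class BandedShardOrderingSketch {

        /** Shards on the node that have a known write load, as a lazily-evaluated stream. */
        private static Stream<ShardRouting> shardsWithLoad(RoutingNode node, Map<ShardId, Double> writeLoads) {
            return StreamSupport.stream(node.spliterator(), false).filter(s -> writeLoads.containsKey(s.shardId()));
        }

        /** Medium-load shards first, then heavy, then light; each band is only computed as it is consumed. */
        static Iterator<ShardRouting> bandedIterator(RoutingNode node, Map<ShardId, Double> writeLoads) {
            double maxLoad = shardsWithLoad(node, writeLoads).mapToDouble(s -> writeLoads.get(s.shardId())).max().orElse(1.0);
            Stream<ShardRouting> medium = shardsWithLoad(node, writeLoads)
                .filter(s -> { double r = writeLoads.get(s.shardId()) / maxLoad; return r >= 0.5 && r <= 0.8; });
            Stream<ShardRouting> heavy = shardsWithLoad(node, writeLoads).filter(s -> writeLoads.get(s.shardId()) / maxLoad > 0.8);
            Stream<ShardRouting> light = shardsWithLoad(node, writeLoads).filter(s -> writeLoads.get(s.shardId()) / maxLoad < 0.5);
            return Stream.concat(Stream.concat(medium, heavy), light).iterator();
        }
    }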