Merged

Commits (71)
053edfa
New new approach
nicktindall Sep 19, 2025
e38191a
Add logging when moveDecision is stale
nicktindall Sep 19, 2025
909fd6d
assert we're not abusing the comparator
nicktindall Sep 19, 2025
2696fd9
Change comparator ranking, test
nicktindall Sep 22, 2025
10c042d
[CI] Update transport version definitions
Sep 22, 2025
f03e4b8
Use constant for missing write load
nicktindall Sep 22, 2025
be6253c
Merge remote-tracking branch 'origin/main' into ES-12739_select_hot_s…
nicktindall Sep 22, 2025
9112b1d
Distinguish between move and move-not-preferred movements
nicktindall Sep 22, 2025
49d11e3
Update javadoc
nicktindall Sep 22, 2025
7e23a15
Tweak test params
nicktindall Sep 22, 2025
fcde6c6
Simplify logic
nicktindall Sep 22, 2025
ceff1e3
Simplify logic
nicktindall Sep 22, 2025
6dcb6f0
Simplify logic
nicktindall Sep 22, 2025
c03bcde
Simplify logic
nicktindall Sep 22, 2025
fdca404
Tidy
nicktindall Sep 22, 2025
36cff36
Simplify logic
nicktindall Sep 22, 2025
98b5b80
Tidy
nicktindall Sep 22, 2025
df79e6f
Pedantry
nicktindall Sep 22, 2025
b2d67d7
Add test for NOT_PREFERRED movement
nicktindall Sep 22, 2025
3c5f592
Pedantry
nicktindall Sep 22, 2025
ae504cd
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 22, 2025
f204577
Expand test to cover prioritisation
nicktindall Sep 22, 2025
3c65dc8
Pedantry
nicktindall Sep 22, 2025
3fd27b5
Pedantry
nicktindall Sep 22, 2025
063995c
Reduce logging
nicktindall Sep 22, 2025
3815293
Naming
nicktindall Sep 22, 2025
34a88df
Constant
nicktindall Sep 22, 2025
cb8ac9a
Fix javadoc
nicktindall Sep 22, 2025
fa2b936
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 22, 2025
1c4dfaf
Used cached MoveDecision if no other moves have been made
nicktindall Sep 23, 2025
738875d
Abstract best move tracking and comparison out of decideMove
nicktindall Sep 29, 2025
f1061e2
Fix comment
nicktindall Sep 29, 2025
7b42376
Move MostDesirableMovementsTracker and ShardMovementPriorityComparato…
nicktindall Sep 29, 2025
baf802d
Put explicit check in for rhsMissing and lhsMissing
nicktindall Sep 29, 2025
104be72
Move shardMoved update below early exit
nicktindall Sep 29, 2025
1302b46
[CI] Update transport version definitions
Sep 29, 2025
0055dc7
Clarify natural ordering in comparator
nicktindall Sep 29, 2025
a031158
Escape > and <
nicktindall Sep 29, 2025
b3d85fc
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 29, 2025
ede9e44
Move MoveNotPreferredDecision local to where it's used
nicktindall Sep 29, 2025
2c87b39
Minimise change after merge
nicktindall Sep 29, 2025
7ef836d
Fix HTML, reference relevant fields
nicktindall Sep 30, 2025
4c6ebdb
Document THRESHOLD_RATIO
nicktindall Sep 30, 2025
a53787c
More specific naming on comparator, fix javadoc paragraph break
nicktindall Sep 30, 2025
7d7d23c
Make comparator put most-preferred first
nicktindall Oct 1, 2025
2199491
Use LinkedHashMap for predictable iteration order
nicktindall Oct 1, 2025
ee95443
Move tracker initialization above loop comment
nicktindall Oct 1, 2025
3ee55e7
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 2, 2025
8673351
Make it clearer what the predicate is when looking for the best non-p…
nicktindall Oct 2, 2025
1b87daa
Update server/src/main/java/org/elasticsearch/cluster/routing/allocat…
nicktindall Oct 2, 2025
57979dc
Improve naming
nicktindall Oct 2, 2025
3fef076
Improve naming (again)
nicktindall Oct 2, 2025
2ea2ea3
Remove decideCanAllocatePreferredOnly
nicktindall Oct 2, 2025
105d7bf
Remove redundant check
nicktindall Oct 2, 2025
50d266f
Merge remote-tracking branch 'origin/main' into ES-12739_select_hot_s…
nicktindall Oct 2, 2025
7837211
Improve javadoc
nicktindall Oct 2, 2025
cc147c9
Improve javadoc
nicktindall Oct 5, 2025
35dd5d4
Improve javadoc (again)
nicktindall Oct 5, 2025
c32d6e9
Improve javadoc
nicktindall Oct 6, 2025
291416a
Use constant for threshold
nicktindall Oct 6, 2025
0bfcf6b
Document test utility
nicktindall Oct 6, 2025
7668101
Explain single shard indices
nicktindall Oct 6, 2025
f3588e2
Improve naming
nicktindall Oct 6, 2025
7643dd0
Improve naming
nicktindall Oct 6, 2025
e514692
Reference comparator in javadoc
nicktindall Oct 6, 2025
ddc7739
Document test decider
nicktindall Oct 6, 2025
7753a44
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 6, 2025
4170af3
Test movement prioritization in IT
nicktindall Oct 6, 2025
43a2dd3
Update server/src/internalClusterTest/java/org/elasticsearch/cluster/…
nicktindall Oct 6, 2025
29a79e1
Fix Javadoc
nicktindall Oct 6, 2025
e64343f
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 6, 2025
@@ -52,6 +52,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;

@@ -805,37 +806,139 @@ public boolean moveShards() {
// Iterate over the started shards, interleaving between nodes, and check if they can remain. When shard movements are
// being throttled, the goal of this iteration order is to achieve a fairer movement of shards away from the nodes that
// are offloading them.
final Map<String, ShardRouting> bestNonPreferredShardsByNode = new HashMap<>();
final Map<String, ShardMovementPriorityComparator> comparatorCache = new HashMap<>();
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) {
ShardRouting shardRouting = it.next();
ProjectIndex index = projectIndex(shardRouting);
- final MoveDecision moveDecision = decideMove(index, shardRouting);
+ final MoveDecision moveDecision = decideMove(index, shardRouting, bestNonPreferredShardsByNode, comparatorCache);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
- final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
- final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
- sourceNode.removeShard(index, shardRouting);
- Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
- shardRouting,
- targetNode.getNodeId(),
- allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
- "move",
- allocation.changes()
- );
- final ShardRouting shard = relocatingShards.v2();
- targetNode.addShard(projectIndex(shard), shard);
- if (logger.isTraceEnabled()) {
- logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
- }
- shardMoved = true;
- if (completeEarlyOnShardAssignmentChange) {
- return true;
+ // Defer moving not-preferred shards until all canRemain=NO shards have been handled
+ if (moveDecision.getCanRemainDecision().type() == Type.NOT_PREFERRED) {
+ bestNonPreferredShardsByNode.put(shardRouting.currentNodeId(), shardRouting);
+ } else {
+ executeMove(shardRouting, index, moveDecision, "move");
+ shardMoved = true;
+ if (completeEarlyOnShardAssignmentChange) {
+ return true;
Contributor:
The way this is set up, the MovementsTracker is going to be set up and discarded as many times as there are canRemain:NO shards in the cluster. Potentially a big number, and on average half the shards are iterated each time. The final iteration will go through all the shards without a NO response, allowing the MovementsTracker to run. We'll then need to repeat the moveShards() method and this final iteration X times, where X is the number of nodes.

I wonder if setting up the MovementsTracker explicitly AFTER all of the NO answers are resolved would be more efficient. We'd guarantee two iterations over all of the shards when the not-preferred section is reached, but we'd avoid discarding the MovementsTracker on all those canRemain:NO iterations.

Similarly, we could perhaps do something about repeating the whole setup per node during the not-preferred handling section.

This could be a follow-up, if agreed upon.

nicktindall (Author), Oct 2, 2025:
Yeah, I think it also goes to your comment about having two distinct phases that reuse common traversal logic. I have an idea what this might look like but will hold off doing that in this PR, to unblock end-to-end testing.

If the benchmark shows a significant slowdown we might have to bring it forward, but if the benchmark is OK I'll put up a follow-up PR to explore this refactor.

Contributor:
I'd be OK with that as a follow-up, assuming we still do the NO decisions first and keep using canRemain to find which shards are on non-preferred nodes.
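A minimal sketch of what that two-phase split might look like (all helper names here — startedShards, canRemain, tryForceMove, moveBestNotPreferredShard — are hypothetical, not code from this PR):

```java
// Hypothetical two-phase skeleton. Phase 1 drains every canRemain=NO shard
// without building any tracker or comparator state; phase 2 runs only after
// the mandatory moves are done, so per-node state is built exactly once.
boolean moveShards() {
    boolean shardMoved = false;

    // Phase 1: mandatory moves only (canRemain == NO).
    for (ShardRouting shard : startedShards()) {
        if (canRemain(shard) == Decision.Type.NO && tryForceMove(shard)) {
            shardMoved = true;
            if (completeEarlyOnShardAssignmentChange) {
                return true;
            }
        }
    }

    // Phase 2: rank the not-preferred shards per node, constructing each
    // ShardMovementPriorityComparator once, and move at most one shard.
    return moveBestNotPreferredShard() || shardMoved;
}
```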

}
}
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
}

// If we get here, attempt to move one of the best not-preferred shards that we identified earlier
for (var entry : bestNonPreferredShardsByNode.entrySet()) {
final var shardRouting = entry.getValue();
final var index = projectIndex(shardRouting);
// Have to re-check move decision in case we made a move that invalidated our existing assessment
final MoveDecision moveDecision = decideMove(index, shardRouting);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
executeMove(shardRouting, index, moveDecision, "move-non-preferred");
// We only ever move a single non-preferred shard at a time
return true;
} else {
logger.trace("[{}][{}] can no longer move (not-preferred)", shardRouting.index(), shardRouting.id());
}
nicktindall (Author), Sep 22, 2025:
There is a slight risk in this approach that we might end up doing some balancing before NOT_PREFERRED moves, e.g.:

1. We identify 3 not-preferred shards on a node, but we only remember the "best" one
2. We make some moves for another reason that make the "best" not-preferred shard immovable, but would have allowed the second- or third-best shard to move
3. We don't make any moves here because we only attempt to move the "best" shard

But I think it's an edge case and we can probably just wait for the next allocate call.

Contributor:
I think that is only a problem outside simulation? Since otherwise we exit early anyway?

nicktindall (Author):
Yeah, purely an issue for raw BalancedShardsAllocator.

nicktindall (Author):
I added some logic to cache the MoveDecision and use the cached decision when shardsMoved is false, in 1c4dfaf.

But I wonder if we should only ever move non-preferred shards when shardsMoved is false? It would be effectively no change for simulation, but it might make things a bit safer for non-simulation.

Contributor:
What you have now makes more sense to me; it seems desirable to still move a shard when not simulating. I am OK with the slight imprecision there - it is not our preferred mode of operation.
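A minimal sketch of the caching approach referred to above (the map and method names are invented for illustration; 1c4dfaf has the actual change):

```java
// Sketch only: reuse the MoveDecision computed during the main loop for a
// not-preferred shard while no other shard has moved since it was taken.
// cachedNonPreferredDecisions and nonPreferredMoveDecision are invented names.
private final Map<ShardRouting, MoveDecision> cachedNonPreferredDecisions = new HashMap<>();

private MoveDecision nonPreferredMoveDecision(ProjectIndex index, ShardRouting shard, boolean shardMoved) {
    if (shardMoved == false) {
        final MoveDecision cached = cachedNonPreferredDecisions.get(shard);
        if (cached != null) {
            // Nothing has moved since this decision was taken, so it still holds.
            return cached;
        }
    }
    // A prior move may have invalidated the earlier assessment; re-run the deciders.
    return decideMove(index, shard);
}
```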

}

return shardMoved;
}

private void executeMove(ShardRouting shardRouting, ProjectIndex index, MoveDecision moveDecision, String reason) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(index, shardRouting);
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
reason,
allocation.changes()
);
final ShardRouting shard = relocatingShards.v2();
targetNode.addShard(projectIndex(shard), shard);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
}

/**
* Sorts shards by how desirable they are to move. The ranking, in descending priority order, is:
* <ol>
* <li>Shards with write-load in [threshold, maximum write-load), lowest first</li>
* <li>Shards with write-load in [0, threshold), highest first</li>
* <li>Shards with the maximum write-load</li>
* <li>Shards with missing write-load</li>
* </ol>
*/
// Visible for testing
static class ShardMovementPriorityComparator implements Comparator<ShardRouting> {

private static final double MISSING_WRITE_LOAD = -1;
private final Map<ShardId, Double> shardWriteLoads;
private final double maxWriteLoadOnNode;
private final double threshold;
private final String nodeId;

ShardMovementPriorityComparator(RoutingAllocation allocation, RoutingNode routingNode) {
shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
double maxWriteLoadOnNode = MISSING_WRITE_LOAD;
for (ShardRouting shardRouting : routingNode) {
maxWriteLoadOnNode = Math.max(
maxWriteLoadOnNode,
shardWriteLoads.getOrDefault(shardRouting.shardId(), MISSING_WRITE_LOAD)
);
}
this.maxWriteLoadOnNode = maxWriteLoadOnNode;
threshold = maxWriteLoadOnNode * 0.5;
nodeId = routingNode.nodeId();
}

@Override
public int compare(ShardRouting lhs, ShardRouting rhs) {
assert Objects.equals(nodeId, lhs.currentNodeId()) && Objects.equals(nodeId, rhs.currentNodeId())
: this.getClass().getSimpleName()
+ " is node-specific. Comparator node ID="
+ nodeId
+ ", lhs="
+ lhs.currentNodeId()
+ ", rhs="
+ rhs.currentNodeId();
// If we have no shard write-load data, shortcut
if (maxWriteLoadOnNode == MISSING_WRITE_LOAD) {
return 0;
}
// Otherwise, we prefer mid-range, then low, then max, then missing write-load shards
double lhsWriteLoad = shardWriteLoads.getOrDefault(lhs.shardId(), MISSING_WRITE_LOAD);
double rhsWriteLoad = shardWriteLoads.getOrDefault(rhs.shardId(), MISSING_WRITE_LOAD);

if (lhsWriteLoad < maxWriteLoadOnNode && rhsWriteLoad < maxWriteLoadOnNode) {
if (lhsWriteLoad >= threshold && rhsWriteLoad >= threshold) {
// Both values between threshold and maximum, prefer lowest
return (int) Math.signum(rhsWriteLoad - lhsWriteLoad);
} else if (lhsWriteLoad >= threshold && rhsWriteLoad < threshold) {
// lhs between threshold and maximum, rhs below threshold, prefer lhs
return 1;
} else if (lhsWriteLoad < threshold && rhsWriteLoad >= threshold) {
// lhs below threshold, rhs between threshold and maximum, prefer rhs
return -1;
} else {
// Both values below the threshold, prefer highest
return (int) Math.signum(lhsWriteLoad - rhsWriteLoad);
}
} else {
// one of the shards is the max-write-load shard, prefer any present write load over it
if (shardWriteLoads.containsKey(rhs.shardId()) ^ shardWriteLoads.containsKey(lhs.shardId())) {
return shardWriteLoads.containsKey(rhs.shardId()) ? -1 : 1;
} else {
return lhsWriteLoad == rhsWriteLoad ? 0 : (int) Math.signum(rhsWriteLoad - lhsWriteLoad);
}
}
}
}
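To make the ranking concrete, here is a small self-contained model of the same ordering rules using plain doubles in place of shard routings. All numbers are invented; a negative value stands in for a missing write-load, and the threshold mirrors the 0.5 factor used above:

```java
import java.util.Arrays;
import java.util.Comparator;

// Standalone illustration of ShardMovementPriorityComparator's ranking.
// This is a simplified model, not the production class.
public class RankingDemo {
    static final double MISSING = -1;

    public static void main(String[] args) {
        final double max = 10.0;            // highest write-load on the node
        final double threshold = max * 0.5; // 5.0, mirroring the 0.5 factor

        // A positive result means lhs is the more desirable shard to move.
        Comparator<Double> preference = (lhs, rhs) -> {
            if (lhs < max && rhs < max) {
                if (lhs >= threshold && rhs >= threshold) {
                    return (int) Math.signum(rhs - lhs);   // mid-range band: lower load wins
                } else if (lhs >= threshold) {
                    return 1;                              // mid-range beats below-threshold
                } else if (rhs >= threshold) {
                    return -1;
                } else {
                    return (int) Math.signum(lhs - rhs);   // below threshold: higher load wins
                }
            } else if ((lhs == MISSING) ^ (rhs == MISSING)) {
                return lhs == MISSING ? -1 : 1;            // any known load beats a missing one
            } else {
                return (int) Math.signum(rhs - lhs);       // non-max beats the max-load shard
            }
        };

        final Double[] loads = { 9.0, 3.0, 10.0, 6.0, MISSING, 1.0 };
        Arrays.sort(loads, preference.reversed()); // most desirable to move first
        // Prints [6.0, 9.0, 3.0, 1.0, 10.0, -1.0]: the mid-range band (lowest
        // first), then below-threshold (highest first), then the max, then missing.
        System.out.println(Arrays.toString(loads));
    }
}
```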

/**
* Makes a decision on whether to move a started shard to another node. The following rules apply
* to the {@link MoveDecision} return object:
@@ -848,7 +951,17 @@ public boolean moveShards() {
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
* {@link MoveDecision#getNodeDecisions} will have a non-null value.
*/
- public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) {
+ public MoveDecision decideMove(ProjectIndex index, ShardRouting shardRouting) {
// Used when we're not calling decideMove in the moveShards() loop, so there's no need to track "best non-preferred" shards
return decideMove(index, shardRouting, Map.of(), Map.of());
}

private MoveDecision decideMove(
ProjectIndex index,
ShardRouting shardRouting,
Map<String, ShardRouting> bestNonPreferredShardsByNode,
Map<String, ShardMovementPriorityComparator> comparatorCache
) {
NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
index.assertMatch(shardRouting);

@@ -861,19 +974,36 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar
assert sourceNode != null && sourceNode.containsShard(index, shardRouting);
RoutingNode routingNode = sourceNode.getRoutingNode();
Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
- if (canRemain.type() != Decision.Type.NO) {
+ if (canRemain.type() != Decision.Type.NO && canRemain.type() != Type.NOT_PREFERRED) {
return MoveDecision.remain(canRemain);
}

// Check whether this shard would be a better not-preferred move than the best one we've found so far for this node
if (canRemain.type() == Type.NOT_PREFERRED && bestNonPreferredShardsByNode.containsKey(shardRouting.currentNodeId())) {
int compare = comparatorCache.computeIfAbsent(
shardRouting.currentNodeId(),
nodeId -> new ShardMovementPriorityComparator(allocation, allocation.routingNodes().node(nodeId))

nicktindall (Author), Sep 22, 2025:
By caching these, we are sorting all shards by the values calculated when the comparator was first created. It's likely that the threshold and maximum will move as shards are moved, but given that we don't move too much in each iteration, I think it's fine to accept the potential for slight differences.

).compare(shardRouting, bestNonPreferredShardsByNode.get(shardRouting.currentNodeId()));
if (compare <= 0) {
// Ignore inferior non-preferred moves
return MoveDecision.NOT_TAKEN;
}
}

sorter.reset(index);
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
- MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate);
- if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
+ BiFunction<ShardRouting, RoutingNode, Decision> decider = canRemain.type() == Type.NOT_PREFERRED
+ ? this::decideCanAllocatePreferredOnly
+ : this::decideCanAllocate;
+ MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, decider);
+ if (moveDecision.getCanRemainDecision().type() == Type.NO
+ && moveDecision.canRemain() == false
+ && moveDecision.forceMove() == false) {
final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE);
if (shardsOnReplacedNode) {
return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate);
@@ -924,6 +1054,15 @@ private MoveDecision decideMove(
);
}

private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) {
Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation);
// Treat NOT_PREFERRED as NO here: don't move a not-preferred shard to a node that is itself not preferred
if (decision.type() == Type.NOT_PREFERRED) {
return Decision.NO;
}
return decision;
}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
// don't use canRebalance as we want hard filtering rules to apply. See #17698
return allocation.deciders().canAllocate(shardRouting, target, allocation);