Skip to content
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
053edfa
New new approach
nicktindall Sep 19, 2025
e38191a
Add logging when moveDecision is stale
nicktindall Sep 19, 2025
909fd6d
assert we're not abusing the comparator
nicktindall Sep 19, 2025
2696fd9
Change comparator ranking, test
nicktindall Sep 22, 2025
10c042d
[CI] Update transport version definitions
Sep 22, 2025
f03e4b8
Use constant for missing write load
nicktindall Sep 22, 2025
be6253c
Merge remote-tracking branch 'origin/main' into ES-12739_select_hot_s…
nicktindall Sep 22, 2025
9112b1d
Distinguish between move and move-not-preferred movements
nicktindall Sep 22, 2025
49d11e3
Update javadoc
nicktindall Sep 22, 2025
7e23a15
Tweak test params
nicktindall Sep 22, 2025
fcde6c6
Simplify logic
nicktindall Sep 22, 2025
ceff1e3
Simplify logic
nicktindall Sep 22, 2025
6dcb6f0
Simplify logic
nicktindall Sep 22, 2025
c03bcde
Simplify logic
nicktindall Sep 22, 2025
fdca404
Tidy
nicktindall Sep 22, 2025
36cff36
Simplify logic
nicktindall Sep 22, 2025
98b5b80
Tidy
nicktindall Sep 22, 2025
df79e6f
Pedantry
nicktindall Sep 22, 2025
b2d67d7
Add test for NOT_PREFERRED movement
nicktindall Sep 22, 2025
3c5f592
Pedantry
nicktindall Sep 22, 2025
ae504cd
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 22, 2025
f204577
Expand test to cover prioritisation
nicktindall Sep 22, 2025
3c65dc8
Pedantry
nicktindall Sep 22, 2025
3fd27b5
Pedantry
nicktindall Sep 22, 2025
063995c
Reduce logging
nicktindall Sep 22, 2025
3815293
Naming
nicktindall Sep 22, 2025
34a88df
Constant
nicktindall Sep 22, 2025
cb8ac9a
Fix javadoc
nicktindall Sep 22, 2025
fa2b936
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 22, 2025
1c4dfaf
Used cached MoveDecision if no other moves have been made
nicktindall Sep 23, 2025
738875d
Abstract best move tracking and comparison out of decideMove
nicktindall Sep 29, 2025
f1061e2
Fix comment
nicktindall Sep 29, 2025
7b42376
Move MostDesirableMovementsTracker and ShardMovementPriorityComparato…
nicktindall Sep 29, 2025
baf802d
Put explicit check in for rhsMissing and lhsMissing
nicktindall Sep 29, 2025
104be72
Move shardMoved update below early exit
nicktindall Sep 29, 2025
1302b46
[CI] Update transport version definitions
Sep 29, 2025
0055dc7
Clarify natural ordering in comparator
nicktindall Sep 29, 2025
a031158
Escape > and <
nicktindall Sep 29, 2025
b3d85fc
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Sep 29, 2025
ede9e44
Move MoveNotPreferredDecision local to where it's used
nicktindall Sep 29, 2025
2c87b39
Minimise change after merge
nicktindall Sep 29, 2025
7ef836d
Fix HTML, reference relevant fields
nicktindall Sep 30, 2025
4c6ebdb
Document THRESHOLD_RATIO
nicktindall Sep 30, 2025
a53787c
More specific naming on comparator, fix javadoc paragraph break
nicktindall Sep 30, 2025
7d7d23c
Make comparator put most-preferred first
nicktindall Oct 1, 2025
2199491
Use LinkedHashMap for predictable iteration order
nicktindall Oct 1, 2025
ee95443
Move tracker initialization above loop comment
nicktindall Oct 1, 2025
3ee55e7
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 2, 2025
8673351
Make it clearer what the predicate is when looking for the best non-p…
nicktindall Oct 2, 2025
1b87daa
Update server/src/main/java/org/elasticsearch/cluster/routing/allocat…
nicktindall Oct 2, 2025
57979dc
Improve naming
nicktindall Oct 2, 2025
3fef076
Improve naming (again)
nicktindall Oct 2, 2025
2ea2ea3
Remove decideCanAllocatePreferredOnly
nicktindall Oct 2, 2025
105d7bf
Remove redundant check
nicktindall Oct 2, 2025
50d266f
Merge remote-tracking branch 'origin/main' into ES-12739_select_hot_s…
nicktindall Oct 2, 2025
7837211
Improve javadoc
nicktindall Oct 2, 2025
cc147c9
Improve javadoc
nicktindall Oct 5, 2025
35dd5d4
Improve javadoc (again)
nicktindall Oct 5, 2025
c32d6e9
Improve javadoc
nicktindall Oct 6, 2025
291416a
Use constant for threshold
nicktindall Oct 6, 2025
0bfcf6b
Document test utility
nicktindall Oct 6, 2025
7668101
Explain single shard indices
nicktindall Oct 6, 2025
f3588e2
Improve naming
nicktindall Oct 6, 2025
7643dd0
Improve naming
nicktindall Oct 6, 2025
e514692
Reference comparator in javadoc
nicktindall Oct 6, 2025
ddc7739
Document test decider
nicktindall Oct 6, 2025
7753a44
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 6, 2025
4170af3
Test movement prioritization in IT
nicktindall Oct 6, 2025
43a2dd3
Update server/src/internalClusterTest/java/org/elasticsearch/cluster/…
nicktindall Oct 6, 2025
29a79e1
Fix Javadoc
nicktindall Oct 6, 2025
e64343f
Merge branch 'main' into ES-12739_select_hot_shard_to_move_off_data_n…
nicktindall Oct 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
Expand Down Expand Up @@ -794,7 +796,7 @@ protected int comparePivot(int j) {
}

/**
* Move started shards that can not be allocated to a node anymore
* Move started shards that cannot be allocated to a node anymore, or are in a non-preferred allocation
*
* For each shard to be moved this function executes a move operation
* to the minimal eligible node with respect to the
Expand All @@ -805,53 +807,105 @@ protected int comparePivot(int j) {
*/
public boolean moveShards() {
boolean shardMoved = false;
final BestShardMovementsTracker bestNonPreferredShardMovementsTracker = new BestShardMovementsTracker();
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) {
ShardRouting shardRouting = it.next();
ProjectIndex index = projectIndex(shardRouting);
final MoveDecision moveDecision = decideMove(index, shardRouting);
final ShardRouting shardRouting = it.next();
final ProjectIndex index = projectIndex(shardRouting);
final MoveDecision moveDecision = decideMove(
index,
shardRouting,
bestNonPreferredShardMovementsTracker::shardIsBetterThanCurrent
);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(index, shardRouting);
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
"move",
allocation.changes()
);
final ShardRouting shard = relocatingShards.v2();
targetNode.addShard(projectIndex(shard), shard);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
shardMoved = true;
if (completeEarlyOnShardAssignmentChange) {
return true;
// Defer moving of not-preferred until we've moved the NOs
if (moveDecision.getCanRemainDecision().type() == Type.NOT_PREFERRED) {
bestNonPreferredShardMovementsTracker.putBestMoveDecision(shardRouting, moveDecision);
} else {
executeMove(shardRouting, index, moveDecision, "move");
if (completeEarlyOnShardAssignmentChange) {
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way this is set up, the MovementsTracker is going to be set up and discarded as many times as there are canRemain:NO shards in the cluster. Potentially a big number, and on average half the shards are iterated each time. The final iteration will go through all the shards without a NO response, allowing MovementsTracker to run. We'll then need to repeat the moveShards() method and this final iteration X times, where X is the number of nodes.

I wonder if setting up the movementsTracker explicitly AFTER all of the NO answers are resolved would be more efficient. We'll guarantee two iterations of all of the shards when the not-preferred section is reached, but we'll avoid discarding the MovementsTracker all those canRemain:NO iterations.

Similarly whether we could do something about repeating the whole setup per node during the not-preferred handling section.

This could be a follow up, if agreed upon.

Copy link
Contributor Author

@nicktindall nicktindall Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think it also goes to your comment of having two distinct phases that reuse common traversal logic. I have an idea what this might look like but will hold off doing that in this PR, to unblock end-to-end testing.

If the benchmark shows significant slowdown we might have to bring it forward, but if the benchmark is OK I'll put up a PR as a follow-up to explore this refactor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be ok with that as a follow-up, assuming we still do the No decisions first and keep doing canRemain to find which shards are on non-preferred nodes.

}
shardMoved = true;
}
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
}

// If we get here, attempt to move one of the best not-preferred shards that we identified earlier
for (var storedShardMovement : bestNonPreferredShardMovementsTracker.getBestShardMovements()) {
final var shardRouting = storedShardMovement.shardRouting();
final var index = projectIndex(shardRouting);
// If `shardMoved` is true, there may have been moves that have made our previous move decision
// invalid, so we must call `decideMove` again. If not, we know we haven't made any moves, and we
// can use the cached decision.
final var moveDecision = shardMoved ? decideMove(index, shardRouting) : storedShardMovement.moveDecision();
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
executeMove(shardRouting, index, moveDecision, "move-non-preferred");
// Return after a single move so that the change can be simulated before further moves are made.
return true;
Comment on lines +848 to +849
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to check if (completeEarlyOnShardAssignmentChange) here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted not to because it's happening after all the NO moves and this is new behaviour, we know that we rely on the simulation being updated for these movements to make sense so I don't think we should ever attempt more than one.

} else {
logger.trace("[{}][{}] can no longer move (not-preferred)", shardRouting.index(), shardRouting.id());
}
Copy link
Contributor Author

@nicktindall nicktindall Sep 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slight risk in this approach that we might end up doing some balancing before NOT_PREFERRED moves.

e.g.

  1. We identify 3 not-preferred shards on a node, we only remember the "best" one
  2. We make some moves for another reason that makes the "best" not-preferred shard immovable, but would have allowed the second or third best shard to move
  3. We don't make any moves here because we only attempt to move the "best" shard

But I think it's an edge case and we can probably just wait for the next allocate call

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is only a problem outside simulation? Since otherwise we exit early anyway?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah purely an issue for raw BalancedShardsAllocator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some logic to cache the MoveDecision and use the cached decision when shardsMoved is false in 1c4dfaf

But I wonder if we should only ever move non preferred shards when shardsMoved is false? it would be effectively no change for simulation but it might make things a bit safer for non-simulation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you have now makes more sense to me, it seems desirable to move a shard still when not simulating. I am ok with the slight imprecision there - it is not our preferred mode of operation.

}

return shardMoved;
}

/**
 * Relocates {@code shardRouting} to the target node chosen by {@code moveDecision},
 * keeping the balancer's model nodes in sync with the routing table update.
 *
 * @param shardRouting the started shard to relocate
 * @param index        the project index the shard belongs to
 * @param moveDecision the decision naming the target node
 * @param reason       the reason string recorded against the relocation
 */
private void executeMove(ShardRouting shardRouting, ProjectIndex index, MoveDecision moveDecision, String reason) {
    final ModelNode source = nodes.get(shardRouting.currentNodeId());
    final ModelNode target = nodes.get(moveDecision.getTargetNode().getId());
    // Remove from the source's model view before mutating the routing table
    source.removeShard(index, shardRouting);
    final long expectedShardSize = allocation.clusterInfo()
        .getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
    final Tuple<ShardRouting, ShardRouting> relocation = routingNodes.relocateShard(
        shardRouting,
        target.getNodeId(),
        expectedShardSize,
        reason,
        allocation.changes()
    );
    // v2() is the initializing copy of the shard on the target node
    final ShardRouting initializingShard = relocation.v2();
    target.addShard(projectIndex(initializingShard), initializingShard);
    if (logger.isTraceEnabled()) {
        logger.trace("Moved shard [{}] to node [{}]", shardRouting, target.getRoutingNode());
    }
}

/**
 * Decides whether to move a started shard to another node.
 * <p>
 * This overload never skips shards on non-preferred allocations: movement options
 * are always assessed for them.
 *
 * @see #decideMove(ProjectIndex, ShardRouting, Predicate)
 * @param index        the index that the shard being considered belongs to
 * @param shardRouting the shard routing being considered for movement
 * @return the resulting {@link MoveDecision} for the shard
 */
public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) {
    // A constant-true predicate means non-preferred allocations are always evaluated
    return decideMove(index, shardRouting, shard -> true);
}

/**
* Makes a decision on whether to move a started shard to another node. The following rules apply
* to the {@link MoveDecision} return object:
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
* 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
* {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
* 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be
* populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then
* populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} returns {@code true}, then
* {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null.
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
* {@link MoveDecision#getNodeDecisions} will have a non-null value.
*
* @param index The index that the shard being considered belongs to
* @param shardRouting The shard routing being considered for movement
* @param nonPreferredPredicate A predicate used to determine whether to assess move options for shards in non-preferred allocations
* @return The {@link MoveDecision} for the shard
*/
public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) {
private MoveDecision decideMove(ProjectIndex index, ShardRouting shardRouting, Predicate<ShardRouting> nonPreferredPredicate) {
NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
index.assertMatch(shardRouting);

Expand All @@ -868,14 +922,19 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar
return MoveDecision.remain(canRemain);
}

// Check predicate to decide whether to assess movement options
if (canRemain.type() == Type.NOT_PREFERRED && nonPreferredPredicate.test(shardRouting) == false) {
return MoveDecision.NOT_TAKEN;
}

sorter.reset(index);
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate);
final MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate);
if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE);
if (shardsOnReplacedNode) {
Expand Down Expand Up @@ -935,6 +994,145 @@ private MoveDecision decideMove(
);
}

/**
 * Remembers, per source node, the single most desirable shard movement seen so far.
 * Candidates are ranked against the stored shard for their node using a per-node
 * {@link PrioritiseByShardWriteLoadComparator}.
 */
private class BestShardMovementsTracker {

    public record StoredShardMovement(ShardRouting shardRouting, MoveDecision moveDecision) {}

    // LinkedHashMap preserves insertion order when iterating the stored movements
    private final Map<String, StoredShardMovement> bestShardMovementsByNode = new LinkedHashMap<>();
    // One comparator per node; each comparator is scoped to the write-loads present on that node
    private final Map<String, PrioritiseByShardWriteLoadComparator> comparatorCache = new HashMap<>();

    /**
     * Determines whether the provided {@link ShardRouting} is a more desirable shard to
     * move than the one currently stored for its node.
     *
     * @param shardRouting the shard routing being considered for movement
     * @return true if this shard is more desirable to move than the stored one for its node
     *         (trivially true when nothing is stored yet), false otherwise
     */
    public boolean shardIsBetterThanCurrent(ShardRouting shardRouting) {
        final String nodeId = shardRouting.currentNodeId();
        final StoredShardMovement stored = bestShardMovementsByNode.get(nodeId);
        if (stored == null) {
            // Nothing recorded for this node yet, so any candidate qualifies
            return true;
        }
        final PrioritiseByShardWriteLoadComparator comparator = comparatorCache.computeIfAbsent(
            nodeId,
            id -> new PrioritiseByShardWriteLoadComparator(allocation, allocation.routingNodes().node(id))
        );
        // The comparator sorts most-preferred first, so "better" means comparing less-than
        return comparator.compare(shardRouting, stored.shardRouting()) < 0;
    }

    public void putBestMoveDecision(ShardRouting shardRouting, MoveDecision moveDecision) {
        bestShardMovementsByNode.put(shardRouting.currentNodeId(), new StoredShardMovement(shardRouting, moveDecision));
    }

    public Iterable<StoredShardMovement> getBestShardMovements() {
        return bestShardMovementsByNode.values();
    }
}

/**
 * Orders the shards on a single node by how desirable they are to move away.
 * The most desirable shards sort first:
 * <ol>
 * <li>write-load in [{@link PrioritiseByShardWriteLoadComparator#threshold},
 * {@link PrioritiseByShardWriteLoadComparator#maxWriteLoadOnNode}) &mdash; lower load first</li>
 * <li>write-load below {@link PrioritiseByShardWriteLoadComparator#threshold} &mdash; higher load first</li>
 * <li>write-load equal to {@link PrioritiseByShardWriteLoadComparator#maxWriteLoadOnNode}</li>
 * <li>write-load unknown</li>
 * </ol>
 *
 * Consequently, for any two <code>ShardRouting</code>s <code>r1</code> and <code>r2</code>:
 * <ul>
 * <li><code>compare(r1, r2) &lt; 0</code> when <code>r1</code> is more desirable to move</li>
 * <li><code>compare(r1, r2) == 0</code> when the two shards are equally desirable to move</li>
 * <li><code>compare(r1, r2) &gt; 0</code> when <code>r2</code> is more desirable to move</li>
 * </ul>
 */
// Visible for testing
static class PrioritiseByShardWriteLoadComparator implements Comparator<ShardRouting> {

    /**
     * The fraction of {@link #maxWriteLoadOnNode} above which a shard is considered to
     * have a "high" write-load.
     * <p>
     * Shards whose write-load is close to <b>this ratio</b> x {@link #maxWriteLoadOnNode}
     * are the preferred candidates to move.
     */
    private static final double THRESHOLD_RATIO = 0.5;
    // Sentinel used when a shard has no recorded write-load
    private static final double MISSING_WRITE_LOAD = -1;
    private final Map<ShardId, Double> shardWriteLoads;
    private final double maxWriteLoadOnNode;
    private final double threshold;
    private final String nodeId;

    PrioritiseByShardWriteLoadComparator(RoutingAllocation allocation, RoutingNode routingNode) {
        this.shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
        // Scan the node once to find the largest known shard write-load
        double maximum = MISSING_WRITE_LOAD;
        for (ShardRouting shard : routingNode) {
            final double load = shardWriteLoads.getOrDefault(shard.shardId(), MISSING_WRITE_LOAD);
            if (load > maximum) {
                maximum = load;
            }
        }
        this.maxWriteLoadOnNode = maximum;
        this.threshold = maximum * THRESHOLD_RATIO;
        this.nodeId = routingNode.nodeId();
    }

    @Override
    public int compare(ShardRouting lhs, ShardRouting rhs) {
        // This comparator only makes sense for shards on the node it was built for
        assert nodeId.equals(lhs.currentNodeId()) && nodeId.equals(rhs.currentNodeId())
            : this.getClass().getSimpleName()
                + " is node-specific. comparator="
                + nodeId
                + ", lhs="
                + lhs.currentNodeId()
                + ", rhs="
                + rhs.currentNodeId();

        // Without any write-load data on the node, every shard ties
        if (maxWriteLoadOnNode == MISSING_WRITE_LOAD) {
            return 0;
        }

        final double leftLoad = shardWriteLoads.getOrDefault(lhs.shardId(), MISSING_WRITE_LOAD);
        final double rightLoad = shardWriteLoads.getOrDefault(rhs.shardId(), MISSING_WRITE_LOAD);
        final boolean leftMissing = leftLoad == MISSING_WRITE_LOAD;
        final boolean rightMissing = rightLoad == MISSING_WRITE_LOAD;

        // A shard with a known write-load always ranks ahead of one without
        if (leftMissing || rightMissing) {
            if (leftMissing && rightMissing) {
                return 0;
            }
            return leftMissing ? 1 : -1;
        }

        if (leftLoad < maxWriteLoadOnNode && rightLoad < maxWriteLoadOnNode) {
            final boolean leftInTopBand = leftLoad >= threshold;
            final boolean rightInTopBand = rightLoad >= threshold;
            if (leftInTopBand != rightInTopBand) {
                // Exactly one shard sits in the [threshold, max) band: it is preferred
                return leftInTopBand ? -1 : 1;
            }
            if (leftInTopBand) {
                // Both in [threshold, max): prefer the lower write-load
                return Double.compare(leftLoad, rightLoad);
            }
            // Both below the threshold: prefer the higher write-load
            return Double.compare(rightLoad, leftLoad);
        }

        // At least one shard carries the node's maximum write-load; prefer the other one
        return Double.compare(leftLoad, rightLoad);
    }
}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
// don't use canRebalance as we want hard filtering rules to apply. See #17698
return allocation.deciders().canAllocate(shardRouting, target, allocation);
Expand Down
Loading