Prioritise movement of shards in non-preferred allocations #135058
@@ -54,6 +54,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.Predicate;

 import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
 import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
@@ -794,7 +795,7 @@ protected int comparePivot(int j) {
     }

     /**
-     * Move started shards that can not be allocated to a node anymore
+     * Move started shards that cannot be allocated to a node anymore, or are in a non-preferred allocation
      *
      * For each shard to be moved this function executes a move operation
      * to the minimal eligible node with respect to the
@@ -808,50 +809,96 @@ public boolean moveShards() {
         // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
         // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
         // offloading the shards.
+        final MostDesirableMovementsTracker movementsTracker = new MostDesirableMovementsTracker();
         for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) {
-            ShardRouting shardRouting = it.next();
-            ProjectIndex index = projectIndex(shardRouting);
-            final MoveDecision moveDecision = decideMove(index, shardRouting);
+            final ShardRouting shardRouting = it.next();
+            final ProjectIndex index = projectIndex(shardRouting);
+            final MoveDecision moveDecision = decideMove(index, shardRouting, movementsTracker);
             if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
-                final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
-                final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
-                sourceNode.removeShard(index, shardRouting);
-                Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
-                    shardRouting,
-                    targetNode.getNodeId(),
-                    allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
-                    "move",
-                    allocation.changes()
-                );
-                final ShardRouting shard = relocatingShards.v2();
-                targetNode.addShard(projectIndex(shard), shard);
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
-                }
-                shardMoved = true;
-                if (completeEarlyOnShardAssignmentChange) {
-                    return true;
+                // Defer moving of not-preferred until we've moved the NOs
+                if (moveDecision.getCanRemainDecision().type() == Type.NOT_PREFERRED) {
+                    movementsTracker.putCurrentMoveDecision(shardRouting, moveDecision);
+                } else {
+                    executeMove(shardRouting, index, moveDecision, "move");
+                    if (completeEarlyOnShardAssignmentChange) {
+                        return true;
[Review thread]

Comment: The way this is set up, the MovementsTracker is going to be set up and discarded as many times as there are canRemain:NO shards in the cluster. Potentially a big number, and on average half the shards are iterated each time. The final iteration will go through all the shards without a NO response, allowing the MovementsTracker to run. We'll then need to repeat the moveShards() method and this final iteration X times, where X is the number of nodes. I wonder if setting up the movementsTracker explicitly AFTER all of the NO answers are resolved would be more efficient. We'd guarantee two iterations of all of the shards when the not-preferred section is reached, but we'd avoid discarding the MovementsTracker on all those canRemain:NO iterations. Similarly, whether we could do something about repeating the whole setup per node during the not-preferred handling section. This could be a follow-up, if agreed upon.

Reply: Yeah, I think it also goes to your comment about having two distinct phases that reuse common traversal logic. I have an idea what this might look like but will hold off doing that in this PR, to unblock end-to-end testing. If the benchmark shows significant slowdown we might have to bring it forward, but if the benchmark is OK I'll put up a PR as a follow-up to explore this refactor.

Reply: I'd be ok with that as a follow-up, assuming we still do the NO decisions first and keep doing […]
+                    }
+                    shardMoved = true;
+                }
             } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
                 logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
             }
         }
+
+        // If we get here, attempt to move one of the best not-preferred shards that we identified earlier
+        for (var preferredMove : movementsTracker.getPreferredShardMovements()) {
+            final var shardRouting = preferredMove.shardRouting();
+            final var index = projectIndex(shardRouting);
+            // If `shardMoved` is true, there may have been moves that have made our previous move decision
+            // invalid, so we must call `decideMove` again. If not, we know we haven't made any moves, and we
+            // can use the cached decision.
+            final var moveDecision = shardMoved ? decideMove(index, shardRouting) : preferredMove.moveDecision();
+            if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
+                executeMove(shardRouting, index, moveDecision, "move-non-preferred");
+                // We only ever move a single non-preferred shard at a time
+                return true;
+            } else {
+                logger.trace("[{}][{}] can no longer move (not-preferred)", shardRouting.index(), shardRouting.id());
+            }
[Review thread]

Comment: There is a slight risk in this approach that we might end up doing some balancing before e.g. […] But I think it's an edge case and we can probably just wait for the next […]

Reply: I think that is only a problem outside simulation? Since otherwise we exit early anyway?

Reply: Yeah, purely an issue for raw […]

Reply: I added some logic to cache the […] But I wonder if we should only ever move non-preferred shards when […]

Reply: What you have now makes more sense to me, it seems desirable to still move a shard when not simulating. I am ok with the slight imprecision there - it is not our preferred mode of operation.
+        }

         return shardMoved;
     }
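In summary, the reworked moveShards() now runs in two phases: shards whose can-remain decision is a hard NO are relocated immediately (returning early when completeEarlyOnShardAssignmentChange is set), while NOT_PREFERRED shards are only recorded in the tracker, and at most one of them is moved once the mandatory moves are done. Below is a minimal, self-contained sketch of that control flow, using hypothetical Shard and CanRemain types rather than the real Elasticsearch classes:

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseMoveSketch {
    enum CanRemain { YES, NO, NOT_PREFERRED }

    record Shard(String id, CanRemain canRemain) {}

    static boolean moveShards(List<Shard> shards) {
        boolean shardMoved = false;
        List<Shard> deferredNotPreferred = new ArrayList<>();
        // Phase 1: execute every mandatory (canRemain == NO) move immediately,
        // remembering NOT_PREFERRED shards for later instead of moving them now.
        for (Shard shard : shards) {
            switch (shard.canRemain()) {
                case NO -> {
                    System.out.println("move " + shard.id());
                    shardMoved = true;
                }
                case NOT_PREFERRED -> deferredNotPreferred.add(shard);
                case YES -> { /* shard can stay where it is */ }
            }
        }
        // Phase 2: move at most one of the deferred not-preferred shards per invocation.
        for (Shard shard : deferredNotPreferred) {
            System.out.println("move-non-preferred " + shard.id());
            return true;
        }
        return shardMoved;
    }

    public static void main(String[] args) {
        moveShards(List.of(
            new Shard("a", CanRemain.NO),
            new Shard("b", CanRemain.NOT_PREFERRED),
            new Shard("c", CanRemain.YES)
        ));
        // prints: "move a" then "move-non-preferred b"
    }
}
```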
+    private void executeMove(ShardRouting shardRouting, ProjectIndex index, MoveDecision moveDecision, String reason) {
+        final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
+        final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
+        sourceNode.removeShard(index, shardRouting);
+        final Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
+            shardRouting,
+            targetNode.getNodeId(),
+            allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
+            reason,
+            allocation.changes()
+        );
+        final ShardRouting shard = relocatingShards.v2();
+        targetNode.addShard(projectIndex(shard), shard);
+        if (logger.isTraceEnabled()) {
+            logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
+        }
+    }
+
+    /**
+     * Decide whether to move a started shard to another node.
+     * Unconditional version of {@link #decideMove(ProjectIndex, ShardRouting, Predicate)}
+     *
+     * @param index The index that the shard being considered belongs to
+     * @param shardRouting The shard routing being considered for movement
+     * @return The {@link MoveDecision} for the shard
+     */
+    public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) {
+        // Always assess options for non-preferred allocations
+        return decideMove(index, shardRouting, ignored -> true);
+    }
+
     /**
      * Makes a decision on whether to move a started shard to another node. The following rules apply
      * to the {@link MoveDecision} return object:
      * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
      * 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
      * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
      * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be
-     * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then
+     * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} returns {@code true}, then
      * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null.
      * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
      * {@link MoveDecision#getNodeDecisions} will have a non-null value.
      *
      * @param index The index that the shard being considered belongs to
      * @param shardRouting The shard routing being considered for movement
+     * @param nonPreferredPredicate A predicate used to determine whether to assess move options for shards in non-preferred allocations
      * @return The {@link MoveDecision} for the shard
      */
-    public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) {
+    private MoveDecision decideMove(ProjectIndex index, ShardRouting shardRouting, Predicate<ShardRouting> nonPreferredPredicate) {
         NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
         index.assertMatch(shardRouting);
@@ -868,15 +915,25 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar
             return MoveDecision.remain(canRemain);
         }

+        // Check predicate to decide whether to assess movement options
+        if (canRemain.type() == Type.NOT_PREFERRED && nonPreferredPredicate.test(shardRouting) == false) {
+            return MoveDecision.NOT_TAKEN;
+        }
+
         sorter.reset(index);
         /*
          * the sorter holds the minimum weight node first for the shards index.
          * We now walk through the nodes until we find a node to allocate the shard.
          * This is not guaranteed to be balanced after this operation we still try best effort to
          * allocate on the minimal eligible node.
          */
-        MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate);
-        if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
+        final BiFunction<ShardRouting, RoutingNode, Decision> decider = canRemain.type() == Type.NOT_PREFERRED
+            ? this::decideCanAllocatePreferredOnly
+            : this::decideCanAllocate;
+        final MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, decider);
+        if (moveDecision.getCanRemainDecision().type() == Type.NO
+            && moveDecision.canRemain() == false
+            && moveDecision.forceMove() == false) {
             final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE);
             if (shardsOnReplacedNode) {
                 return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate);
@@ -935,6 +992,147 @@
         );
     }

+    /**
+     * Stores the most desirable shard seen so far and compares proposed shards against it using
+     * the {@link PrioritiseByShardWriteLoadComparator}.
+     */
+    private class MostDesirableMovementsTracker implements Predicate<ShardRouting> {
+
+        public record StoredMoveDecision(ShardRouting shardRouting, MoveDecision moveDecision) {}
+
+        private final Map<String, StoredMoveDecision> bestNonPreferredShardsByNode = new HashMap<>();
+        private final Map<String, PrioritiseByShardWriteLoadComparator> comparatorCache = new HashMap<>();
+
+        @Override
+        public boolean test(ShardRouting shardRouting) {
+            final var currentShardForNode = bestNonPreferredShardsByNode.get(shardRouting.currentNodeId());
+            if (currentShardForNode == null) {
+                return true;
+            }
+            int comparison = comparatorCache.computeIfAbsent(
+                shardRouting.currentNodeId(),
+                nodeId -> new PrioritiseByShardWriteLoadComparator(allocation, allocation.routingNodes().node(nodeId))
+            ).compare(shardRouting, currentShardForNode.shardRouting());
+            // Ignore inferior non-preferred moves
+            return comparison > 0;
+        }
+
+        public void putCurrentMoveDecision(ShardRouting shardRouting, MoveDecision moveDecision) {
+            bestNonPreferredShardsByNode.put(shardRouting.currentNodeId(), new StoredMoveDecision(shardRouting, moveDecision));
+        }
+
+        public Iterable<StoredMoveDecision> getPreferredShardMovements() {
+            return bestNonPreferredShardsByNode.values();
+        }
+    }
+
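MostDesirableMovementsTracker is essentially a per-node "best candidate so far" filter: test() admits a shard only when it strictly beats the stored candidate for its current node, and putCurrentMoveDecision() then replaces that candidate. The following is a generic sketch of the same pattern, with hypothetical BestPerKeyTracker/Candidate types and a deliberately simplified ordering rule (not the PR's code):

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Keeps at most one "best" candidate per key; test() only admits a candidate that
// strictly beats the stored one, mirroring MostDesirableMovementsTracker.test().
class BestPerKeyTracker<K, V> implements Predicate<V> {
    private final Map<K, V> bestByKey = new HashMap<>();
    private final Function<V, K> keyOf;
    private final Comparator<V> desirability; // greater means more desirable

    BestPerKeyTracker(Function<V, K> keyOf, Comparator<V> desirability) {
        this.keyOf = keyOf;
        this.desirability = desirability;
    }

    @Override
    public boolean test(V candidate) {
        V current = bestByKey.get(keyOf.apply(candidate));
        // Accept the first candidate seen for a key, or a strictly better one
        return current == null || desirability.compare(candidate, current) > 0;
    }

    void put(V candidate) {
        bestByKey.put(keyOf.apply(candidate), candidate);
    }

    Iterable<V> best() {
        return bestByKey.values();
    }

    public static void main(String[] args) {
        record Candidate(String nodeId, double writeLoad) {}
        // Toy ordering: a higher write-load is more desirable to move.
        var tracker = new BestPerKeyTracker<String, Candidate>(
            Candidate::nodeId,
            Comparator.comparingDouble(Candidate::writeLoad)
        );
        Candidate[] candidates = {
            new Candidate("node-1", 2.0),
            new Candidate("node-1", 5.0),
            new Candidate("node-1", 3.0), // rejected: does not beat the stored 5.0
            new Candidate("node-2", 1.0)
        };
        for (Candidate c : candidates) {
            if (tracker.test(c)) {
                tracker.put(c);
            }
        }
        System.out.println(tracker.best()); // one best candidate per node
    }
}
```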
+    /**
+     * Sorts shards by desirability to move; the ranking goes (in descending priority order):
+     * <ol>
+     * <li>Shards with write-load in <i>{@link PrioritiseByShardWriteLoadComparator#threshold}</i> →
+     * {@link PrioritiseByShardWriteLoadComparator#maxWriteLoadOnNode} (exclusive)</li>
+     * <li>Shards with write-load in <i>{@link PrioritiseByShardWriteLoadComparator#threshold}</i> → 0</li>
+     * <li>Shards with write-load == {@link PrioritiseByShardWriteLoadComparator#maxWriteLoadOnNode}</li>
+     * <li>Shards with missing write-load</li>
+     * </ol>
+     *
+     * e.g., for any two <code>ShardRouting</code>s, <code>r1</code> and <code>r2</code>,
+     * <ul>
+     * <li><code>compare(r1, r2) > 0</code> when <code>r1</code> is the more desirable to move</li>
+     * <li><code>compare(r1, r2) == 0</code> when the two shards are equally desirable to move</li>
+     * <li><code>compare(r1, r2) < 0</code> when <code>r2</code> is the more desirable to move</li>
+     * </ul>
+     */
+    // Visible for testing
+    static class PrioritiseByShardWriteLoadComparator implements Comparator<ShardRouting> {
+
+        /**
+         * This is the threshold over which we consider shards to have a "high" write load, represented
+         * as a ratio of the maximum write-load present on the node.
+         * <p>
+         * We prefer to move shards that have a write-load close to <b>this value</b> x {@link #maxWriteLoadOnNode}.
+         */
+        private static final double THRESHOLD_RATIO = 0.5;
+
+        private static final double MISSING_WRITE_LOAD = -1;
+        private final Map<ShardId, Double> shardWriteLoads;
+        private final double maxWriteLoadOnNode;
+        private final double threshold;
+        private final String nodeId;
+
+        PrioritiseByShardWriteLoadComparator(RoutingAllocation allocation, RoutingNode routingNode) {
+            shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
+            double maxWriteLoadOnNode = MISSING_WRITE_LOAD;
+            for (ShardRouting shardRouting : routingNode) {
+                maxWriteLoadOnNode = Math.max(
+                    maxWriteLoadOnNode,
+                    shardWriteLoads.getOrDefault(shardRouting.shardId(), MISSING_WRITE_LOAD)
+                );
+            }
+            this.maxWriteLoadOnNode = maxWriteLoadOnNode;
+            threshold = maxWriteLoadOnNode * THRESHOLD_RATIO;
+            nodeId = routingNode.nodeId();
+        }
+
+        @Override
+        public int compare(ShardRouting lhs, ShardRouting rhs) {
+            assert nodeId.equals(lhs.currentNodeId()) && nodeId.equals(rhs.currentNodeId())
+                : this.getClass().getSimpleName()
+                    + " is node-specific. comparator="
+                    + nodeId
+                    + ", lhs="
+                    + lhs.currentNodeId()
+                    + ", rhs="
+                    + rhs.currentNodeId();
+
+            // If we have no shard write-load data, shortcut
+            if (maxWriteLoadOnNode == MISSING_WRITE_LOAD) {
+                return 0;
+            }
+
+            final double lhsWriteLoad = shardWriteLoads.getOrDefault(lhs.shardId(), MISSING_WRITE_LOAD);
+            final double rhsWriteLoad = shardWriteLoads.getOrDefault(rhs.shardId(), MISSING_WRITE_LOAD);
+
+            // prefer any known write-load over any unknown write-load
+            final var rhsIsMissing = rhsWriteLoad == MISSING_WRITE_LOAD;
+            final var lhsIsMissing = lhsWriteLoad == MISSING_WRITE_LOAD;
+            if (rhsIsMissing && lhsIsMissing) {
+                return 0;
+            }
+            if (rhsIsMissing ^ lhsIsMissing) {
+                return lhsIsMissing ? -1 : 1;
+            }
+
+            if (lhsWriteLoad < maxWriteLoadOnNode && rhsWriteLoad < maxWriteLoadOnNode) {
+                final var lhsOverThreshold = lhsWriteLoad >= threshold;
+                final var rhsOverThreshold = rhsWriteLoad >= threshold;
+                if (lhsOverThreshold && rhsOverThreshold) {
+                    // Both values between threshold and maximum, prefer lowest
+                    return Double.compare(rhsWriteLoad, lhsWriteLoad);
+                } else if (lhsOverThreshold) {
+                    // lhs between threshold and maximum, rhs below threshold, prefer lhs
+                    return 1;
+                } else if (rhsOverThreshold) {
+                    // lhs below threshold, rhs between threshold and maximum, prefer rhs
+                    return -1;
+                }
+                // Both values below the threshold, prefer highest
+                return Double.compare(lhsWriteLoad, rhsWriteLoad);
+            }
+
+            // prefer the non-max write load if there is one
+            return Double.compare(rhsWriteLoad, lhsWriteLoad);
+        }
+    }
+
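As a worked example of the ranking above, the following self-contained sketch reproduces the comparator's band logic over plain doubles (NaN stands in for a missing write-load; this is an illustration, not the Elasticsearch class):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WriteLoadRankingSketch {
    static final double THRESHOLD_RATIO = 0.5;

    // Returns a comparator over write-loads where "greater" means "more desirable
    // to move", mirroring the band logic described in the javadoc above.
    static Comparator<Double> desirability(double maxLoadOnNode) {
        double threshold = maxLoadOnNode * THRESHOLD_RATIO;
        return (lhs, rhs) -> {
            boolean lhsMissing = lhs.isNaN();
            boolean rhsMissing = rhs.isNaN();
            if (lhsMissing && rhsMissing) return 0;
            if (lhsMissing != rhsMissing) return lhsMissing ? -1 : 1; // known load beats unknown
            if (lhs < maxLoadOnNode && rhs < maxLoadOnNode) {
                boolean lhsOver = lhs >= threshold;
                boolean rhsOver = rhs >= threshold;
                if (lhsOver && rhsOver) return Double.compare(rhs, lhs); // in the band: prefer the lowest
                if (lhsOver) return 1;
                if (rhsOver) return -1;
                return Double.compare(lhs, rhs); // below the band: prefer the highest
            }
            return Double.compare(rhs, lhs); // the node's hottest shard ranks below other known loads
        };
    }

    public static void main(String[] args) {
        // Node maximum write-load is 10.0, so the preferred band is [5.0, 10.0).
        List<Double> loads = new ArrayList<>(List.of(6.0, 9.0, 3.0, 10.0, Double.NaN));
        loads.sort(desirability(10.0).reversed()); // most desirable to move first
        System.out.println(loads); // [6.0, 9.0, 3.0, 10.0, NaN]
    }
}
```

With a node maximum of 10.0 the preferred band is [5.0, 10.0): 6.0 outranks 9.0 (the lowest load in the band moves first), both outrank the below-threshold 3.0, the shard carrying the node's maximum load of 10.0 comes next, and the shard with unknown write-load ranks last.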
+    private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) {
+        final Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation);
+        // not-preferred means no here
+        if (decision.type() == Type.NOT_PREFERRED) {
+            return Decision.NO;
+        }
+        return decision;
+    }
+
     private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
         // don't use canRebalance as we want hard filtering rules to apply. See #17698
         return allocation.deciders().canAllocate(shardRouting, target, allocation);
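decideCanAllocatePreferredOnly demotes a NOT_PREFERRED answer from the deciders to a hard NO, so a shard that is being moved off a non-preferred node is never relocated onto another non-preferred target. A minimal sketch of that demotion, using a hypothetical DecisionType enum rather than the real Decision API:

```java
// Hypothetical stand-in for the deciders' answer type.
enum DecisionType { YES, NO, THROTTLE, NOT_PREFERRED }

final class PreferredOnly {
    // Treat "not preferred" targets as forbidden when relocating a not-preferred shard.
    static DecisionType demoteNotPreferred(DecisionType underlying) {
        return underlying == DecisionType.NOT_PREFERRED ? DecisionType.NO : underlying;
    }

    public static void main(String[] args) {
        System.out.println(demoteNotPreferred(DecisionType.NOT_PREFERRED)); // NO
        System.out.println(demoteNotPreferred(DecisionType.YES)); // YES
    }
}
```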