Prioritise movement of shards in non-preferred allocations #135058
Changes from 2 commits
@@ -805,37 +805,104 @@ public boolean moveShards() {
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
final Map<String, ShardRouting> bestNonPreferredShardsByNode = new HashMap<>();
final Map<String, ShardMovementPriorityComparator> comparatorCache = new HashMap<>();
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) {
ShardRouting shardRouting = it.next();
ProjectIndex index = projectIndex(shardRouting);
final MoveDecision moveDecision = decideMove(index, shardRouting);
final MoveDecision moveDecision = decideMove(index, shardRouting, bestNonPreferredShardsByNode, comparatorCache);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(index, shardRouting);
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
"move",
allocation.changes()
);
final ShardRouting shard = relocatingShards.v2();
targetNode.addShard(projectIndex(shard), shard);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
shardMoved = true;
if (completeEarlyOnShardAssignmentChange) {
return true;
// Defer moving of not-preferred until we've moved the NOs
if (moveDecision.getCanRemainDecision().type() == Type.NOT_PREFERRED) {
bestNonPreferredShardsByNode.put(shardRouting.currentNodeId(), shardRouting);
} else {
executeMove(shardRouting, index, moveDecision);
shardMoved = true;
if (completeEarlyOnShardAssignmentChange) {
return true;
}
}
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
}

[Review comment] The way this is set up, the MovementsTracker is going to be set up and discarded as many times as there are canRemain:NO shards in the cluster. Potentially a big number, and on average half the shards are iterated each time. The final iteration will go through all the shards without a NO response, allowing the MovementsTracker to run. We'll then need to repeat the moveShards() method and this final iteration X times, where X is the number of nodes. I wonder if setting up the movementsTracker explicitly AFTER all of the NO answers are resolved would be more efficient. We'll guarantee two iterations of all of the shards when the not-preferred section is reached, but we'll avoid discarding the MovementsTracker on all those canRemain:NO iterations. Similarly, whether we could do something about repeating the whole setup per node during the not-preferred handling section. This could be a follow-up, if agreed upon.

[Reply] Yeah, I think it also goes to your comment about having two distinct phases that reuse common traversal logic. I have an idea what this might look like but will hold off doing that in this PR, to unblock end-to-end testing. If the benchmark shows significant slowdown we might have to bring it forward, but if the benchmark is OK I'll put up a PR as a follow-up to explore this refactor.

[Reply] I'd be ok with that as a follow-up, assuming we still do the No decisions first and keep doing

// If we get here, attempt to move one of the best not-preferred shards that we identified earlier
for (var entry : bestNonPreferredShardsByNode.entrySet()) {
final var shardRouting = entry.getValue();
final var index = projectIndex(shardRouting);
// Have to re-check move decision in case we made a move that invalidated our existing assessment
final MoveDecision moveDecision = decideMove(index, shardRouting);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
executeMove(entry.getValue(), index, moveDecision);
// We only ever move a single non-preferred shard at a time
return true;
} else {
logger.trace("[{}][{}] can no longer move (not-preferred)", shardRouting.index(), shardRouting.id());
}
}

return shardMoved;
}

[Review comment] There is a slight risk in this approach that we might end up doing some balancing before e.g.
But I think it's an edge case and we can probably just wait for the next

[Reply] I think that is only a problem outside simulation? Since otherwise we exit early anyway?

[Reply] Yeah purely an issue for raw

[Reply] I added some logic to cache the
But I wonder if we should only ever move non preferred shards when

[Reply] What you have now makes more sense to me, it seems desirable to move a shard still when not simulating. I am ok with the slight imprecision there - it is not our preferred mode of operation.
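
To make the two-phase ordering in the loop above easier to follow, here is a minimal, self-contained sketch: shards whose canRemain answer is a hard NO are moved as they are found, while NOT_PREFERRED shards are only remembered (one best candidate per node) and at most one of them is moved afterwards. All names below (Shard, Decision, the relocate stub) are simplified stand-ins invented for illustration, not the real Elasticsearch classes.

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoPhaseMoveSketch {
    enum Decision { YES, NO, NOT_PREFERRED }

    record Shard(String id, String nodeId) {}

    // Stand-in for the real relocation; just reports what would move.
    static boolean relocate(Shard shard) {
        System.out.println("relocating " + shard.id());
        return true;
    }

    static boolean moveShards(List<Shard> shards, Map<Shard, Decision> canRemain, Comparator<Shard> priority) {
        boolean shardMoved = false;
        // Phase 1: move shards that can no longer remain; only remember the single
        // highest-priority NOT_PREFERRED candidate per node for later.
        Map<String, Shard> bestNotPreferredByNode = new HashMap<>();
        for (Shard shard : shards) {
            switch (canRemain.getOrDefault(shard, Decision.YES)) {
                case NO -> shardMoved |= relocate(shard);
                case NOT_PREFERRED -> bestNotPreferredByNode.merge(
                    shard.nodeId(),
                    shard,
                    (current, candidate) -> priority.compare(candidate, current) > 0 ? candidate : current
                );
                case YES -> { /* nothing to do */ }
            }
        }
        // Phase 2: move at most one of the remembered NOT_PREFERRED shards per pass.
        for (Shard shard : bestNotPreferredByNode.values()) {
            if (relocate(shard)) {
                return true;
            }
        }
        return shardMoved;
    }

    public static void main(String[] args) {
        List<Shard> shards = List.of(new Shard("a", "node-1"), new Shard("b", "node-1"), new Shard("c", "node-2"));
        Map<Shard, Decision> canRemain = Map.of(
            shards.get(0), Decision.NO,
            shards.get(1), Decision.NOT_PREFERRED,
            shards.get(2), Decision.NOT_PREFERRED
        );
        moveShards(shards, canRemain, Comparator.comparing(Shard::id)); // prints "a", then one of "b"/"c"
    }
}

The single-candidate-per-node map plays the role of bestNonPreferredShardsByNode in the diff above, and moving at most one remembered shard per pass mirrors the "We only ever move a single non-preferred shard at a time" comment.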

private void executeMove(ShardRouting shardRouting, ProjectIndex index, MoveDecision moveDecision) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(index, shardRouting);
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
"move",
allocation.changes()
);
final ShardRouting shard = relocatingShards.v2();
targetNode.addShard(projectIndex(shard), shard);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
}

private static class ShardMovementPriorityComparator implements Comparator<ShardRouting> {

private final Map<ShardId, Double> shardWriteLoads;
private final double lowThreshold;
private final double highThreshold;

ShardMovementPriorityComparator(RoutingAllocation allocation, RoutingNode routingNode) {
shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
double maxWriteLoadOnNode = 0;
for (ShardRouting shardRouting : routingNode) {
maxWriteLoadOnNode = Math.max(maxWriteLoadOnNode, shardWriteLoads.getOrDefault(shardRouting.shardId(), 0.0));
}
lowThreshold = maxWriteLoadOnNode * 0.5;
highThreshold = maxWriteLoadOnNode * 0.8;
}

@Override
public int compare(ShardRouting o1, ShardRouting o2) {
// If we have no shard write-load data, shortcut
if (highThreshold == 0) {
return 0;
}
// Otherwise, we prefer middle, high, then low write-load shards
return Integer.compare(
getWriteLoadLevel(shardWriteLoads.getOrDefault(o1.shardId(), 0.0)),
getWriteLoadLevel(shardWriteLoads.getOrDefault(o2.shardId(), 0.0))
);
}

private int getWriteLoadLevel(double writeLoad) {
if (writeLoad < lowThreshold) {
return 2;
}
if (writeLoad < highThreshold) {
return 0;
}
return 1;
}
}
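
The comparator above buckets each shard by its write load relative to the hottest shard on the node (thresholds at 50% and 80% of that maximum), and decideMove only replaces the remembered candidate when the comparison is strictly positive. The following self-contained sketch reproduces getWriteLoadLevel() as written in this revision of the PR, using plain doubles and made-up loads instead of ShardRouting/ClusterInfo, to show which shard would win the best-candidate comparison.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class WriteLoadTierSketch {
    // Mirrors getWriteLoadLevel() from the diff above: a higher level wins the
    // comparison in decideMove (compare > 0 replaces the current best candidate).
    static int writeLoadLevel(double writeLoad, double lowThreshold, double highThreshold) {
        if (writeLoad < lowThreshold) {
            return 2;
        }
        if (writeLoad < highThreshold) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) {
        double maxWriteLoadOnNode = 10.0;                 // hypothetical hottest shard on the node
        double lowThreshold = maxWriteLoadOnNode * 0.5;   // 5.0
        double highThreshold = maxWriteLoadOnNode * 0.8;  // 8.0

        List<Double> shardWriteLoads = new ArrayList<>(List.of(1.0, 6.0, 9.5));
        // Sort so the shard that would win the best-candidate comparison comes first.
        shardWriteLoads.sort(
            Comparator.comparingInt((Double load) -> writeLoadLevel(load, lowThreshold, highThreshold)).reversed()
        );
        System.out.println(shardWriteLoads); // [1.0, 9.5, 6.0] with the bucket values above
    }
}

When a node has no write-load data at all, highThreshold is 0 and compare() short-circuits to 0, so the first not-preferred shard recorded for that node simply remains the candidate.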

/**
* Makes a decision on whether to move a started shard to another node. The following rules apply
* to the {@link MoveDecision} return object:

@@ -848,7 +915,17 @@ public boolean moveShards() {
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
* {@link MoveDecision#getNodeDecisions} will have a non-null value.
*/
public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shardRouting) {
public MoveDecision decideMove(ProjectIndex index, ShardRouting shardRouting) {
// This is when we're not calling in a loop, so we don't need to keep track of "best non-preferred" shards
return decideMove(index, shardRouting, Map.of(), Map.of());
}

private MoveDecision decideMove(
ProjectIndex index,
ShardRouting shardRouting,
Map<String, ShardRouting> bestNonPreferredShardsByNode,
Map<String, ShardMovementPriorityComparator> comparatorCache
) {
NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
index.assertMatch(shardRouting);

@@ -861,19 +938,36 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar
assert sourceNode != null && sourceNode.containsShard(index, shardRouting);
RoutingNode routingNode = sourceNode.getRoutingNode();
Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (canRemain.type() != Decision.Type.NO) {
if (canRemain.type() != Decision.Type.NO && canRemain.type() != Type.NOT_PREFERRED) {
return MoveDecision.remain(canRemain);
}

// Investigate whether it's potentially a better move to make than the one we've discovered already
if (canRemain.type() == Type.NOT_PREFERRED && bestNonPreferredShardsByNode.containsKey(shardRouting.currentNodeId())) {
int compare = comparatorCache.computeIfAbsent(
shardRouting.currentNodeId(),
nodeId -> new ShardMovementPriorityComparator(allocation, allocation.routingNodes().node(nodeId))
).compare(shardRouting, bestNonPreferredShardsByNode.get(shardRouting.currentNodeId()));
if (compare <= 0) {
// Ignore inferior non-preferred moves
return MoveDecision.NOT_TAKEN;
}
}

sorter.reset(index);
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocate);
if (moveDecision.canRemain() == false && moveDecision.forceMove() == false) {
BiFunction<ShardRouting, RoutingNode, Decision> decider = canRemain.type() == Type.NOT_PREFERRED
? this::decideCanAllocatePreferredOnly
: this::decideCanAllocate;
MoveDecision moveDecision = decideMove(sorter, shardRouting, sourceNode, canRemain, decider);
if (moveDecision.getCanRemainDecision().type() == Type.NO
&& moveDecision.canRemain() == false
&& moveDecision.forceMove() == false) {
final boolean shardsOnReplacedNode = allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId(), REPLACE);
if (shardsOnReplacedNode) {
return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanForceAllocateForVacate);

@@ -924,6 +1018,15 @@ private MoveDecision decideMove(
);
}

private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) {
Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation);
// not-preferred means no here
if (decision.type() == Type.NOT_PREFERRED) {
return Decision.NO;
}
return decision;
}
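
decideCanAllocatePreferredOnly demotes a target-side NOT_PREFERRED answer to a hard NO, so a shard that is merely "not preferred" where it is will only be offered targets that positively accept it. A minimal stand-alone sketch of that demotion, using a hypothetical Decision enum rather than the real allocation decider types:

class PreferredOnlySketch {
    // Stand-in for the real Decision type; values invented for illustration.
    enum Decision { YES, NO, THROTTLE, NOT_PREFERRED }

    // A target that is itself only "not preferred" is treated as a NO.
    static Decision preferredOnly(Decision canAllocateOnTarget) {
        return canAllocateOnTarget == Decision.NOT_PREFERRED ? Decision.NO : canAllocateOnTarget;
    }

    public static void main(String[] args) {
        System.out.println(preferredOnly(Decision.YES));           // YES
        System.out.println(preferredOnly(Decision.NOT_PREFERRED)); // NO
    }
}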

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
// don't use canRebalance as we want hard filtering rules to apply. See #17698
return allocation.deciders().canAllocate(shardRouting, target, allocation);