Skip to content

Commit 6f96ea3

Browse files
Balancer changes to use Decision#NOT_PREFERRED (#134160)
Closes ES-12716
1 parent ff6b623 commit 6f96ea3

File tree

6 files changed

+432
-142
lines changed

6 files changed

+432
-142
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java

Lines changed: 380 additions & 127 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/MoveDecision.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ public Decision getCanRemainDecision() {
179179
* the result of this method is meaningless, as no rebalance decision was taken. If {@link #isDecisionTaken()}
180180
* returns {@code false}, then invoking this method will throw an {@code IllegalStateException}.
181181
*/
182+
// @VisibleForTesting
182183
public boolean canRebalanceCluster() {
183184
checkDecisionState();
184185
return clusterRebalanceDecision != null && clusterRebalanceDecision.type() == Type.YES;
@@ -192,6 +193,7 @@ public boolean canRebalanceCluster() {
192193
* If {@link #isDecisionTaken()} returns {@code false}, then invoking this method will throw an
193194
* {@code IllegalStateException}.
194195
*/
196+
// @VisibleForTesting
195197
@Nullable
196198
public Decision getClusterRebalanceDecision() {
197199
checkDecisionState();

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ public MoveDecision decideMove(final ProjectIndex index, final ShardRouting shar
861861
assert sourceNode != null && sourceNode.containsShard(index, shardRouting);
862862
RoutingNode routingNode = sourceNode.getRoutingNode();
863863
Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
864-
if (canRemain.type() != Decision.Type.NO) {
864+
if (canRemain.type() != Decision.Type.NO && canRemain.type() != Decision.Type.NOT_PREFERRED) {
865865
return MoveDecision.remain(canRemain);
866866
}
867867

@@ -901,7 +901,11 @@ private MoveDecision decideMove(
901901
if (explain) {
902902
nodeResults.add(new NodeAllocationResult(currentNode.getRoutingNode().node(), allocationDecision, ++weightRanking));
903903
}
904-
// TODO maybe we can respect throttling here too?
904+
// TODO (ES-12633): test that nothing moves when the source is not-preferred and the target is not-preferred.
905+
if (allocationDecision.type() == Type.NOT_PREFERRED && remainDecision.type() == Type.NOT_PREFERRED) {
906+
// Relocating a shard from one NOT_PREFERRED node to another would not improve the situation.
907+
continue;
908+
}
905909
if (allocationDecision.type().higherThan(bestDecision)) {
906910
bestDecision = allocationDecision.type();
907911
if (bestDecision == Type.YES) {
@@ -911,6 +915,10 @@ private MoveDecision decideMove(
911915
// no need to continue iterating
912916
break;
913917
}
918+
} else if (bestDecision == Type.NOT_PREFERRED) {
919+
assert remainDecision.type() != Type.NOT_PREFERRED;
920+
// If we don't ever find a YES decision, we'll settle for NOT_PREFERRED as preferable to NO.
921+
targetNode = target;
914922
}
915923
}
916924
}
@@ -1221,7 +1229,7 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn
12211229
continue;
12221230
}
12231231
final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
1224-
if (allocationDecision.type() == Type.NO) {
1232+
if (allocationDecision.type() == Type.NO || allocationDecision.type() == Type.NOT_PREFERRED) {
12251233
continue;
12261234
}
12271235

@@ -1407,7 +1415,7 @@ public boolean containsShard(ShardRouting shard) {
14071415
public static final class NodeSorter extends IntroSorter {
14081416

14091417
final ModelNode[] modelNodes;
1410-
/* the nodes weights with respect to the current weight function / index */
1418+
/** The nodes' weights with respect to the current weight function / index */
14111419
final float[] weights;
14121420
private final WeightFunction function;
14131421
private ProjectIndex index;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,10 @@ private void moveShards() {
503503

504504
final var routingNode = routingNodes.node(shardRouting.currentNodeId());
505505
final var canRemainDecision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
506-
if (canRemainDecision.type() != Decision.Type.NO) {
507-
// it's desired elsewhere but technically it can remain on its current node. Defer its movement until later on to give
508-
// priority to shards that _must_ move.
506+
if (canRemainDecision.type() != Decision.Type.NO && canRemainDecision.type() != Decision.Type.NOT_PREFERRED) {
507+
// Any canRemain answer other than NO or NOT_PREFERRED leaves the shard alone for now: if movement is throttled, a future
507+
// reconciliation round will see a resolution. A canRemain NOT_PREFERRED answer is handled like NO here (the shard is not
508+
// skipped) because the DesiredBalance computation already decided how to handle the situation. NOTE(review): confirm intent.
509510
continue;
510511
}
511512

@@ -650,6 +651,7 @@ private DiscoveryNode findRelocationTarget(
650651
Set<String> desiredNodeIds,
651652
BiFunction<ShardRouting, RoutingNode, Decision> canAllocateDecider
652653
) {
654+
DiscoveryNode chosenNode = null;
653655
for (final var nodeId : desiredNodeIds) {
654656
// TODO consider ignored nodes here too?
655657
if (nodeId.equals(shardRouting.currentNodeId())) {
@@ -661,12 +663,24 @@ private DiscoveryNode findRelocationTarget(
661663
}
662664
final var decision = canAllocateDecider.apply(shardRouting, node);
663665
logger.trace("relocate {} to {}: {}", shardRouting, nodeId, decision);
666+
667+
// Assign shards to the YES nodes first. This way we might delay moving shards to NOT_PREFERRED nodes until after shards are
668+
// first moved away. The DesiredBalance could be moving shards away from a hot node as well as moving shards to it, and it's
669+
// better to offload shards first.
664670
if (decision.type() == Decision.Type.YES) {
665-
return node.node();
671+
chosenNode = node.node();
672+
// As soon as we get any YES, we return it.
673+
break;
674+
} else if (decision.type() == Decision.Type.NOT_PREFERRED && chosenNode == null) {
675+
// If the best answer is not-preferred, then the shard will still be assigned. It is okay to assign to a not-preferred
676+
// node because the desired balance computation had a reason to override it: when there aren't any better nodes to
677+
// choose and the shard cannot remain where it is, we accept not-preferred. NOT_PREFERRED is essentially a YES for
678+
// reconciliation.
679+
chosenNode = node.node();
666680
}
667681
}
668682

669-
return null;
683+
return chosenNode;
670684
}
671685

672686
private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
7373
shardRouting.shardId()
7474
);
7575
logger.debug(explain);
76-
return Decision.single(Decision.Type.NO, NAME, explain);
76+
return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain);
7777
}
7878

79-
if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) {
79+
var newWriteThreadPoolUtilization = calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad);
80+
if (newWriteThreadPoolUtilization >= nodeWriteThreadPoolLoadThreshold) {
8081
// The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard.
8182
// This could lead to a hot spot on this node and is undesirable.
8283
String explain = Strings.format(
@@ -92,10 +93,22 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
9293
nodeWriteThreadPoolStats.totalThreadPoolThreads()
9394
);
9495
logger.debug(explain);
95-
return Decision.single(Decision.Type.NO, NAME, explain);
96+
return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain);
9697
}
9798

98-
return Decision.YES;
99+
String explanation = Strings.format(
100+
"Shard [%s] in index [%s] can be assigned to node [%s]. The node's utilization would become [%s]",
101+
shardRouting.shardId(),
102+
shardRouting.index(),
103+
node.nodeId(),
104+
newWriteThreadPoolUtilization
105+
);
106+
107+
if (logger.isTraceEnabled()) {
108+
logger.trace(explanation);
109+
}
110+
111+
return allocation.decision(Decision.YES, NAME, explanation);
99112
}
100113

101114
@Override

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void testWriteLoadDeciderCanAllocate() {
104104
);
105105
assertEquals(
106106
"Assigning a new shard to a node that is above the threshold should fail",
107-
Decision.Type.NO,
107+
Decision.Type.NOT_PREFERRED,
108108
writeLoadDecider.canAllocate(
109109
testHarness.shardRouting2,
110110
testHarness.exceedingThresholdRoutingNode,
@@ -128,7 +128,7 @@ public void testWriteLoadDeciderCanAllocate() {
128128
);
129129
assertEquals(
130130
"Assigning a new shard that would cause the node to exceed capacity should fail",
131-
Decision.Type.NO,
131+
Decision.Type.NOT_PREFERRED,
132132
writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation)
133133
.type()
134134
);

0 commit comments

Comments
 (0)