Skip to content

Commit 43741c2

Browse files
authored
Add an option to return early from an allocate call (#134786)
Instead of running the simulation all the way to balance or until no further movement is possible, this PR adds an option to finish early based on the number of relocating shards. Note this early-return mechanism is enabled only when the desired-balance allocator is in use, i.e. it affects simulation only. Resolves: ES-12862
1 parent aae1ffc commit 43741c2

File tree

5 files changed

+256
-18
lines changed

5 files changed

+256
-18
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 107 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -140,25 +140,83 @@ public BalancedShardsAllocator(
140140

141141
@Override
142142
public void allocate(RoutingAllocation allocation) {
143+
assert allocation.isSimulating() == false || balancerSettings.completeEarlyOnShardAssignmentChange()
144+
: "inconsistent states: isSimulating ["
145+
+ allocation.isSimulating()
146+
+ "] vs completeEarlyOnShardAssignmentChange ["
147+
+ balancerSettings.completeEarlyOnShardAssignmentChange()
148+
+ "]";
143149
if (allocation.metadata().hasAnyIndices()) {
144150
// must not use licensed features when just starting up
145151
writeLoadForecaster.refreshLicense();
146152
}
147153

148154
assert allocation.ignoreDisable() == false;
155+
assert allocation.isSimulating() == false || allocation.routingNodes().hasInactiveShards() == false
156+
: "expect no initializing shard, but got " + allocation.routingNodes();
157+
// TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
158+
// assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
159+
// : "expect no relocating shard, but got " + allocation.routingNodes();
149160

150161
if (allocation.routingNodes().size() == 0) {
151162
failAllocationOfNewPrimaries(allocation);
152163
return;
153164
}
154165
final BalancingWeights balancingWeights = balancingWeightsFactory.create();
155-
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
156-
balancer.allocateUnassigned();
157-
balancer.moveShards();
158-
balancer.balance();
166+
final Balancer balancer = new Balancer(
167+
writeLoadForecaster,
168+
allocation,
169+
balancerSettings.getThreshold(),
170+
balancingWeights,
171+
balancerSettings.completeEarlyOnShardAssignmentChange()
172+
);
173+
174+
boolean shardAssigned = false, shardMoved = false, shardBalanced = false;
175+
try {
176+
shardAssigned = balancer.allocateUnassigned();
177+
if (shardAssigned && balancerSettings.completeEarlyOnShardAssignmentChange()) {
178+
return;
179+
}
180+
181+
shardMoved = balancer.moveShards();
182+
if (shardMoved && balancerSettings.completeEarlyOnShardAssignmentChange()) {
183+
return;
184+
}
185+
186+
shardBalanced = balancer.balance();
187+
} finally {
188+
if (logger.isDebugEnabled()) {
189+
logger.debug(
190+
"shards assigned: {}, shards moved: {}, shards balanced: {}, "
191+
+ "routingNodes hasInactiveShards [{}], relocation count [{}]",
192+
shardAssigned,
193+
shardMoved,
194+
shardBalanced,
195+
allocation.routingNodes().hasInactiveShards(),
196+
allocation.routingNodes().getRelocatingShardCount()
197+
);
198+
}
199+
assert assertShardAssignmentChanges(allocation, shardAssigned, shardMoved, shardBalanced);
200+
// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
201+
collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
202+
}
203+
}
159204

160-
// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
161-
collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
205+
private boolean assertShardAssignmentChanges(
206+
RoutingAllocation allocation,
207+
boolean shardAssigned,
208+
boolean shardMoved,
209+
boolean shardBalanced
210+
) {
211+
if (allocation.isSimulating() == false) {
212+
return true;
213+
}
214+
assert shardAssigned == false || allocation.routingNodes().hasInactiveShards()
215+
: "expect initializing shard, but got " + allocation.routingNodes();
216+
217+
assert (shardMoved == false && shardBalanced == false) || allocation.routingNodes().getRelocatingShardCount() > 0
218+
: "expect relocating shard, but got " + allocation.routingNodes();
219+
return true;
162220
}
163221

164222
private void collectAndRecordNodeWeightStats(Balancer balancer, BalancingWeights balancingWeights, RoutingAllocation allocation) {
@@ -188,7 +246,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
188246
writeLoadForecaster,
189247
allocation,
190248
balancerSettings.getThreshold(),
191-
balancingWeightsFactory.create()
249+
balancingWeightsFactory.create(),
250+
balancerSettings.completeEarlyOnShardAssignmentChange()
192251
);
193252
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
194253
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
@@ -248,12 +307,14 @@ public static class Balancer {
248307
private final Map<String, ModelNode> nodes;
249308
private final BalancingWeights balancingWeights;
250309
private final NodeSorters nodeSorters;
310+
private final boolean completeEarlyOnShardAssignmentChange;
251311

252312
private Balancer(
253313
WriteLoadForecaster writeLoadForecaster,
254314
RoutingAllocation allocation,
255315
float threshold,
256-
BalancingWeights balancingWeights
316+
BalancingWeights balancingWeights,
317+
boolean completeEarlyOnShardAssignmentChange
257318
) {
258319
this.writeLoadForecaster = writeLoadForecaster;
259320
this.allocation = allocation;
@@ -266,6 +327,7 @@ private Balancer(
266327
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
267328
this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
268329
this.balancingWeights = balancingWeights;
330+
this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange;
269331
}
270332

271333
private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
@@ -358,7 +420,7 @@ private IndexMetadata indexMetadata(ProjectIndex index) {
358420
* Balances the nodes on the cluster model according to the weight function.
359421
* The actual balancing is delegated to {@link #balanceByWeights(NodeSorter)}
360422
*/
361-
private void balance() {
423+
private boolean balance() {
362424
if (logger.isTraceEnabled()) {
363425
logger.trace("Start balancing cluster");
364426
}
@@ -371,21 +433,27 @@ private void balance() {
371433
* Therefore we only do a rebalance if we have fetched all information.
372434
*/
373435
logger.debug("skipping rebalance due to in-flight shard/store fetches");
374-
return;
436+
return false;
375437
}
376438
if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
377439
logger.trace("skipping rebalance as it is disabled");
378-
return;
440+
return false;
379441
}
380442

443+
boolean shardBalanced = false;
381444
// Balance each partition
382445
for (NodeSorter nodeSorter : nodeSorters) {
383446
if (nodeSorter.modelNodes.length < 2) { /* skip if we only have one node */
384447
logger.trace("skipping rebalance as the partition has single node only");
385448
continue;
386449
}
387-
balanceByWeights(nodeSorter);
450+
shardBalanced |= balanceByWeights(nodeSorter);
451+
// TODO: We could choose to account shardBalanced separately for each partition since they do not overlap.
452+
if (shardBalanced && completeEarlyOnShardAssignmentChange) {
453+
return true;
454+
}
388455
}
456+
return shardBalanced;
389457
}
390458

391459
/**
@@ -531,7 +599,8 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin
531599
* only, or in other words relocations that move the weight delta closer
532600
* to {@code 0.0}
533601
*/
534-
private void balanceByWeights(NodeSorter sorter) {
602+
private boolean balanceByWeights(NodeSorter sorter) {
603+
boolean shardBalanced = false;
535604
final AllocationDeciders deciders = allocation.deciders();
536605
final ModelNode[] modelNodes = sorter.modelNodes;
537606
final float[] weights = sorter.weights;
@@ -630,6 +699,15 @@ private void balanceByWeights(NodeSorter sorter) {
630699
sorter.sort(0, relevantNodes);
631700
lowIdx = 0;
632701
highIdx = relevantNodes - 1;
702+
703+
shardBalanced = true;
704+
if (completeEarlyOnShardAssignmentChange && routingNodes.getRelocatingShardCount() > 0) {
705+
// ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
706+
// It should not happen in production, i.e., throttling should not happen unless there is a prior shard
707+
// that is already relocating. But in tests, we have deciders like RandomAllocationDecider that can
708+
// randomly return THROTTLE when there is no existing relocation.
709+
return true;
710+
}
633711
continue;
634712
}
635713
}
@@ -651,6 +729,7 @@ private void balanceByWeights(NodeSorter sorter) {
651729
}
652730
}
653731
}
732+
return shardBalanced;
654733
}
655734

656735
/**
@@ -721,7 +800,8 @@ protected int comparePivot(int j) {
721800
* shard is created with an incremented version in the state
722801
* {@link ShardRoutingState#INITIALIZING}.
723802
*/
724-
public void moveShards() {
803+
public boolean moveShards() {
804+
boolean shardMoved = false;
725805
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
726806
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
727807
// offloading the shards.
@@ -745,10 +825,15 @@ public void moveShards() {
745825
if (logger.isTraceEnabled()) {
746826
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
747827
}
828+
shardMoved = true;
829+
if (completeEarlyOnShardAssignmentChange) {
830+
return true;
831+
}
748832
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
749833
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
750834
}
751835
}
836+
return shardMoved;
752837
}
753838

754839
/**
@@ -880,14 +965,14 @@ private Map<String, ModelNode> buildModelFromAssigned() {
880965
* Allocates all given shards on the minimal eligible node for the shards index
881966
* with respect to the weight function. All given shards must be unassigned.
882967
*/
883-
private void allocateUnassigned() {
968+
private boolean allocateUnassigned() {
884969
RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
885970
assert nodes.isEmpty() == false;
886971
if (logger.isTraceEnabled()) {
887972
logger.trace("Start allocating unassigned shards");
888973
}
889974
if (unassigned.isEmpty()) {
890-
return;
975+
return false;
891976
}
892977

893978
/*
@@ -924,6 +1009,7 @@ private void allocateUnassigned() {
9241009
int secondaryLength = 0;
9251010
int primaryLength = primary.length;
9261011
ArrayUtil.timSort(primary, comparator);
1012+
boolean shardAssignmentChanged = false;
9271013
do {
9281014
for (int i = 0; i < primaryLength; i++) {
9291015
ShardRouting shard = primary[i];
@@ -941,6 +1027,7 @@ private void allocateUnassigned() {
9411027

9421028
final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
9431029
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
1030+
shardAssignmentChanged = true;
9441031
minNode.addShard(index, shard);
9451032
if (shard.primary() == false) {
9461033
// copy over the same replica shards to the secondary array so they will get allocated
@@ -964,6 +1051,9 @@ private void allocateUnassigned() {
9641051
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
9651052
final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
9661053
minNode.addShard(projectIndex(shard), shard.initialize(minNode.getNodeId(), null, shardSize));
1054+
// If we see a throttle decision in simulation, there must be other shards that got assigned before it.
1055+
assert allocation.isSimulating() == false || shardAssignmentChanged
1056+
: "shard " + shard + " was throttled but no other shards were assigned";
9671057
} else {
9681058
if (logger.isTraceEnabled()) {
9691059
logger.trace("No Node found to assign shard [{}]", shard);
@@ -986,6 +1076,7 @@ private void allocateUnassigned() {
9861076
secondaryLength = 0;
9871077
} while (primaryLength > 0);
9881078
// clear everything we have either added it or moved to ignoreUnassigned
1079+
return shardAssignmentChanged;
9891080
}
9901081

9911082
private ProjectIndex projectIndex(ShardRouting shardRouting) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import org.elasticsearch.cluster.ClusterModule;
1213
import org.elasticsearch.common.settings.ClusterSettings;
1314
import org.elasticsearch.common.settings.Settings;
1415

@@ -26,6 +27,7 @@ public class BalancerSettings {
2627
private volatile float writeLoadBalanceFactor;
2728
private volatile float diskUsageBalanceFactor;
2829
private volatile float threshold;
30+
private final boolean completeEarlyOnShardAssignmentChange;
2931

3032
public BalancerSettings(Settings settings) {
3133
this(ClusterSettings.createBuiltInClusterSettings(settings));
@@ -37,6 +39,9 @@ public BalancerSettings(ClusterSettings clusterSettings) {
3739
clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value);
3840
clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value);
3941
clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = value);
42+
this.completeEarlyOnShardAssignmentChange = ClusterModule.DESIRED_BALANCE_ALLOCATOR.equals(
43+
clusterSettings.get(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING)
44+
);
4045
}
4146

4247
/**
@@ -67,4 +72,8 @@ public float getDiskUsageBalanceFactor() {
6772
public float getThreshold() {
6873
return threshold;
6974
}
75+
76+
public boolean completeEarlyOnShardAssignmentChange() {
77+
return completeEarlyOnShardAssignmentChange;
78+
}
7079
}

0 commit comments

Comments
 (0)