@@ -140,83 +140,25 @@ public BalancedShardsAllocator(

    @Override
    public void allocate(RoutingAllocation allocation) {
-        assert allocation.isSimulating() == false || balancerSettings.completeEarlyOnShardAssignmentChange()
-            : "inconsistent states: isSimulating ["
-                + allocation.isSimulating()
-                + "] vs completeEarlyOnShardAssignmentChange ["
-                + balancerSettings.completeEarlyOnShardAssignmentChange()
-                + "]";
        if (allocation.metadata().hasAnyIndices()) {
            // must not use licensed features when just starting up
            writeLoadForecaster.refreshLicense();
        }

        assert allocation.ignoreDisable() == false;
-        assert allocation.isSimulating() == false || allocation.routingNodes().hasInactiveShards() == false
-            : "expect no initializing shard, but got " + allocation.routingNodes();
-        // TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
-        // assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
-        //     : "expect no relocating shard, but got " + allocation.routingNodes();

        if (allocation.routingNodes().size() == 0) {
            failAllocationOfNewPrimaries(allocation);
            return;
        }
        final BalancingWeights balancingWeights = balancingWeightsFactory.create();
-        final Balancer balancer = new Balancer(
-            writeLoadForecaster,
-            allocation,
-            balancerSettings.getThreshold(),
-            balancingWeights,
-            balancerSettings.completeEarlyOnShardAssignmentChange()
-        );
-
-        boolean shardAssigned = false, shardMoved = false, shardBalanced = false;
-        try {
-            shardAssigned = balancer.allocateUnassigned();
-            if (shardAssigned && balancerSettings.completeEarlyOnShardAssignmentChange()) {
-                return;
-            }
-
-            shardMoved = balancer.moveShards();
-            if (shardMoved && balancerSettings.completeEarlyOnShardAssignmentChange()) {
-                return;
-            }
-
-            shardBalanced = balancer.balance();
-        } finally {
-            if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "shards assigned: {}, shards moved: {}, shards balanced: {}, "
-                        + "routingNodes hasInactiveShards [{}], relocation count [{}]",
-                    shardAssigned,
-                    shardMoved,
-                    shardBalanced,
-                    allocation.routingNodes().hasInactiveShards(),
-                    allocation.routingNodes().getRelocatingShardCount()
-                );
-            }
-            assert assertShardAssignmentChanges(allocation, shardAssigned, shardMoved, shardBalanced);
-            // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
-            collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
-        }
-    }
+        final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
+        balancer.allocateUnassigned();
+        balancer.moveShards();
+        balancer.balance();

-    private boolean assertShardAssignmentChanges(
-        RoutingAllocation allocation,
-        boolean shardAssigned,
-        boolean shardMoved,
-        boolean shardBalanced
-    ) {
-        if (allocation.isSimulating() == false) {
-            return true;
-        }
-        assert shardAssigned == false || allocation.routingNodes().hasInactiveShards()
-            : "expect initializing shard, but got " + allocation.routingNodes();
-
-        assert (shardMoved == false && shardBalanced == false) || allocation.routingNodes().getRelocatingShardCount() > 0
-            : "expect relocating shard, but got " + allocation.routingNodes();
-        return true;
+        // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
+        collectAndRecordNodeWeightStats(balancer, balancingWeights, allocation);
    }

    private void collectAndRecordNodeWeightStats(Balancer balancer, BalancingWeights balancingWeights, RoutingAllocation allocation) {
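
For context on what the removed branch did: each phase (assign, move, balance) reported whether it changed any shard assignment, and when early completion was enabled the allocator returned as soon as one phase made a change, while a finally block still recorded stats. Below is a minimal, self-contained sketch of that control-flow pattern only; every name in it is invented for illustration and nothing here is an Elasticsearch API.

import java.util.List;
import java.util.function.BooleanSupplier;

// Sketch of the "complete early on first assignment change" pattern removed above.
// Each phase returns true if it changed at least one assignment.
final class EarlyCompletionSketch {
    static void runPhases(List<BooleanSupplier> phases, boolean completeEarly, Runnable recordStats) {
        try {
            for (BooleanSupplier phase : phases) {
                boolean changed = phase.getAsBoolean();
                if (changed && completeEarly) {
                    return; // stop after the first phase that changed an assignment
                }
            }
        } finally {
            recordStats.run(); // stats are recorded whether or not we returned early
        }
    }

    public static void main(String[] args) {
        runPhases(
            List.of(() -> false /* allocate unassigned */, () -> true /* move shards */, () -> false /* rebalance */),
            true,
            () -> System.out.println("stats recorded")
        );
    }
}
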
@@ -246,8 +188,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
            writeLoadForecaster,
            allocation,
            balancerSettings.getThreshold(),
-            balancingWeightsFactory.create(),
-            balancerSettings.completeEarlyOnShardAssignmentChange()
+            balancingWeightsFactory.create()
        );
        AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
        MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
@@ -307,14 +248,12 @@ public static class Balancer {
        private final Map<String, ModelNode> nodes;
        private final BalancingWeights balancingWeights;
        private final NodeSorters nodeSorters;
-        private final boolean completeEarlyOnShardAssignmentChange;

        private Balancer(
            WriteLoadForecaster writeLoadForecaster,
            RoutingAllocation allocation,
            float threshold,
-            BalancingWeights balancingWeights,
-            boolean completeEarlyOnShardAssignmentChange
+            BalancingWeights balancingWeights
        ) {
            this.writeLoadForecaster = writeLoadForecaster;
            this.allocation = allocation;
@@ -327,7 +266,6 @@ private Balancer(
            nodes = Collections.unmodifiableMap(buildModelFromAssigned());
            this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
            this.balancingWeights = balancingWeights;
-            this.completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange;
        }

        private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
@@ -420,7 +358,7 @@ private IndexMetadata indexMetadata(ProjectIndex index) {
         * Balances the nodes on the cluster model according to the weight function.
         * The actual balancing is delegated to {@link #balanceByWeights(NodeSorter)}
         */
-        private boolean balance() {
+        private void balance() {
            if (logger.isTraceEnabled()) {
                logger.trace("Start balancing cluster");
            }
@@ -433,27 +371,21 @@ private boolean balance() {
                 * Therefore we only do a rebalance if we have fetched all information.
                 */
                logger.debug("skipping rebalance due to in-flight shard/store fetches");
-                return false;
+                return;
            }
            if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
                logger.trace("skipping rebalance as it is disabled");
-                return false;
+                return;
            }

-            boolean shardBalanced = false;
            // Balance each partition
            for (NodeSorter nodeSorter : nodeSorters) {
                if (nodeSorter.modelNodes.length < 2) { /* skip if we only have one node */
                    logger.trace("skipping rebalance as the partition has single node only");
                    continue;
                }
-                shardBalanced |= balanceByWeights(nodeSorter);
-                // TODO: We could choose to account shardBalanced separately for each partition since they do not overlap.
-                if (shardBalanced && completeEarlyOnShardAssignmentChange) {
-                    return true;
-                }
+                balanceByWeights(nodeSorter);
            }
-            return shardBalanced;
        }

        /**
@@ -599,8 +531,7 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin
         * only, or in other words relocations that move the weight delta closer
         * to {@code 0.0}
         */
-        private boolean balanceByWeights(NodeSorter sorter) {
-            boolean shardBalanced = false;
+        private void balanceByWeights(NodeSorter sorter) {
            final AllocationDeciders deciders = allocation.deciders();
            final ModelNode[] modelNodes = sorter.modelNodes;
            final float[] weights = sorter.weights;
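
As a rough illustration of the weight-delta idea described in the javadoc above (this is not the actual Elasticsearch algorithm, and every name below is made up): rebalancing only relocates shards while the gap between the heaviest and lightest node exceeds the threshold, and each relocation should move that gap closer to zero.

import java.util.Arrays;

// Toy weight-delta rebalancer: "weight" here is simply the shard count per node.
final class WeightDeltaSketch {
    static int[] rebalance(int[] shardsPerNode, float threshold) {
        int[] nodes = shardsPerNode.clone();
        while (true) {
            int low = 0, high = 0;
            for (int i = 1; i < nodes.length; i++) {
                if (nodes[i] < nodes[low]) low = i;
                if (nodes[i] > nodes[high]) high = i;
            }
            float delta = nodes[high] - nodes[low];
            if (delta <= threshold || delta < 2) {
                return nodes; // stop: balanced enough, or a single-shard move cannot improve the spread
            }
            nodes[high]--; // relocate one shard from the heaviest node ...
            nodes[low]++;  // ... to the lightest node, shrinking the delta
        }
    }

    public static void main(String[] args) {
        // prints [5, 5, 5]
        System.out.println(Arrays.toString(rebalance(new int[] { 9, 2, 4 }, 1.0f)));
    }
}
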
@@ -699,15 +630,6 @@ private boolean balanceByWeights(NodeSorter sorter) {
                        sorter.sort(0, relevantNodes);
                        lowIdx = 0;
                        highIdx = relevantNodes - 1;
-
-                        shardBalanced = true;
-                        if (completeEarlyOnShardAssignmentChange && routingNodes.getRelocatingShardCount() > 0) {
-                            // ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
-                            // It should not happen in production, i.e, throttling should not happen unless there is a prior shard
-                            // that is already relocating. But in tests, we have decider like RandomAllocationDecider that can
-                            // randomly return THROTTLE when there is no existing relocation.
-                            return true;
-                        }
                        continue;
                    }
                }
@@ -729,7 +651,6 @@ private boolean balanceByWeights(NodeSorter sorter) {
                    }
                }
            }
-            return shardBalanced;
        }

        /**
@@ -800,8 +721,7 @@ protected int comparePivot(int j) {
         * shard is created with an incremented version in the state
         * {@link ShardRoutingState#INITIALIZING}.
         */
-        public boolean moveShards() {
-            boolean shardMoved = false;
+        public void moveShards() {
            // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
            // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
            // offloading the shards.
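
The comment above describes interleaving the started shards across nodes so that no single node's shards are all considered first. A small sketch of that iteration order follows, using invented types rather than the iterator Elasticsearch actually uses.

import java.util.ArrayList;
import java.util.List;

// Round-robin interleaving: take the first shard of each node, then the second of each node, and so on.
final class InterleavedIterationSketch {
    static List<String> interleave(List<List<String>> shardsByNode) {
        List<String> order = new ArrayList<>();
        int round = 0;
        boolean tookAny = true;
        while (tookAny) {
            tookAny = false;
            for (List<String> nodeShards : shardsByNode) {
                if (round < nodeShards.size()) {
                    order.add(nodeShards.get(round));
                    tookAny = true;
                }
            }
            round++;
        }
        return order;
    }

    public static void main(String[] args) {
        // node-1 has three shards, node-2 has one, node-3 has two
        System.out.println(interleave(List.of(List.of("a1", "a2", "a3"), List.of("b1"), List.of("c1", "c2"))));
        // prints [a1, b1, c1, a2, c2, a3]
    }
}
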
@@ -825,15 +745,10 @@ public boolean moveShards() {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
                    }
-                    shardMoved = true;
-                    if (completeEarlyOnShardAssignmentChange) {
-                        return true;
-                    }
                } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
                    logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
                }
            }
-            return shardMoved;
        }

        /**
@@ -973,14 +888,14 @@ private Map<String, ModelNode> buildModelFromAssigned() {
         * Allocates all given shards on the minimal eligible node for the shards index
         * with respect to the weight function. All given shards must be unassigned.
         */
-        private boolean allocateUnassigned() {
+        private void allocateUnassigned() {
            RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
            assert nodes.isEmpty() == false;
            if (logger.isTraceEnabled()) {
                logger.trace("Start allocating unassigned shards");
            }
            if (unassigned.isEmpty()) {
-                return false;
+                return;
            }

            /*
@@ -1017,7 +932,6 @@ private boolean allocateUnassigned() {
            int secondaryLength = 0;
            int primaryLength = primary.length;
            ArrayUtil.timSort(primary, comparator);
-            boolean shardAssignmentChanged = false;
            do {
                for (int i = 0; i < primaryLength; i++) {
                    ShardRouting shard = primary[i];
@@ -1035,7 +949,6 @@ private boolean allocateUnassigned() {

                            final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
                            shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
-                            shardAssignmentChanged = true;
                            minNode.addShard(index, shard);
                            if (shard.primary() == false) {
                                // copy over the same replica shards to the secondary array so they will get allocated
@@ -1059,9 +972,6 @@ private boolean allocateUnassigned() {
                            assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
                            final long shardSize = getExpectedShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation);
                            minNode.addShard(projectIndex(shard), shard.initialize(minNode.getNodeId(), null, shardSize));
-                            // If we see a throttle decision in simulation, there must be other shards that got assigned before it.
-                            assert allocation.isSimulating() == false || shardAssignmentChanged
-                                : "shard " + shard + " was throttled but no other shards were assigned";
                        } else {
                            if (logger.isTraceEnabled()) {
                                logger.trace("No Node found to assign shard [{}]", shard);
@@ -1084,7 +994,6 @@ private boolean allocateUnassigned() {
                secondaryLength = 0;
            } while (primaryLength > 0);
            // clear everything we have either added it or moved to ignoreUnassigned
-            return shardAssignmentChanged;
        }

        private ProjectIndex projectIndex(ShardRouting shardRouting) {