@@ -140,25 +140,83 @@ public BalancedShardsAllocator(
140140
141141 @ Override
142142 public void allocate (RoutingAllocation allocation ) {
143+ assert allocation .isSimulating () == false || balancerSettings .completeEarlyOnShardAssignmentChange ()
144+ : "inconsistent states: isSimulating ["
145+ + allocation .isSimulating ()
146+ + "] vs completeEarlyOnShardAssignmentChange ["
147+ + balancerSettings .completeEarlyOnShardAssignmentChange ()
148+ + "]" ;
143149 if (allocation .metadata ().hasAnyIndices ()) {
144150 // must not use licensed features when just starting up
145151 writeLoadForecaster .refreshLicense ();
146152 }
147153
148154 assert allocation .ignoreDisable () == false ;
155+ assert allocation .isSimulating () == false || allocation .routingNodes ().hasInactiveShards () == false
156+ : "expect no initializing shard, but got " + allocation .routingNodes ();
157+ // TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
158+ // assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
159+ // : "expect no relocating shard, but got " + allocation.routingNodes();
149160
150161 if (allocation .routingNodes ().size () == 0 ) {
151162 failAllocationOfNewPrimaries (allocation );
152163 return ;
153164 }
154165 final BalancingWeights balancingWeights = balancingWeightsFactory .create ();
155- final Balancer balancer = new Balancer (writeLoadForecaster , allocation , balancerSettings .getThreshold (), balancingWeights );
156- balancer .allocateUnassigned ();
157- balancer .moveShards ();
158- balancer .balance ();
166+ final Balancer balancer = new Balancer (
167+ writeLoadForecaster ,
168+ allocation ,
169+ balancerSettings .getThreshold (),
170+ balancingWeights ,
171+ balancerSettings .completeEarlyOnShardAssignmentChange ()
172+ );
173+
174+ boolean shardAssigned = false , shardMoved = false , shardBalanced = false ;
175+ try {
176+ shardAssigned = balancer .allocateUnassigned ();
177+ if (shardAssigned && balancerSettings .completeEarlyOnShardAssignmentChange ()) {
178+ return ;
179+ }
180+
181+ shardMoved = balancer .moveShards ();
182+ if (shardMoved && balancerSettings .completeEarlyOnShardAssignmentChange ()) {
183+ return ;
184+ }
185+
186+ shardBalanced = balancer .balance ();
187+ } finally {
188+ if (logger .isDebugEnabled ()) {
189+ logger .debug (
190+ "shards assigned: {}, shards moved: {}, shards balanced: {}, "
191+ + "routingNodes hasInactiveShards [{}], relocation count [{}]" ,
192+ shardAssigned ,
193+ shardMoved ,
194+ shardBalanced ,
195+ allocation .routingNodes ().hasInactiveShards (),
196+ allocation .routingNodes ().getRelocatingShardCount ()
197+ );
198+ }
199+ assert assertShardAssignmentChanges (allocation , shardAssigned , shardMoved , shardBalanced );
200+ // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
201+ collectAndRecordNodeWeightStats (balancer , balancingWeights , allocation );
202+ }
203+ }
159204
160- // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
161- collectAndRecordNodeWeightStats (balancer , balancingWeights , allocation );
205+ private boolean assertShardAssignmentChanges (
206+ RoutingAllocation allocation ,
207+ boolean shardAssigned ,
208+ boolean shardMoved ,
209+ boolean shardBalanced
210+ ) {
211+ if (allocation .isSimulating () == false ) {
212+ return true ;
213+ }
214+ assert shardAssigned == false || allocation .routingNodes ().hasInactiveShards ()
215+ : "expect initializing shard, but got " + allocation .routingNodes ();
216+
217+ assert (shardMoved == false && shardBalanced == false ) || allocation .routingNodes ().getRelocatingShardCount () > 0
218+ : "expect relocating shard, but got " + allocation .routingNodes ();
219+ return true ;
162220 }
163221
164222 private void collectAndRecordNodeWeightStats (Balancer balancer , BalancingWeights balancingWeights , RoutingAllocation allocation ) {
@@ -188,7 +246,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
188246 writeLoadForecaster ,
189247 allocation ,
190248 balancerSettings .getThreshold (),
191- balancingWeightsFactory .create ()
249+ balancingWeightsFactory .create (),
250+ balancerSettings .completeEarlyOnShardAssignmentChange ()
192251 );
193252 AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision .NOT_TAKEN ;
194253 MoveDecision moveDecision = MoveDecision .NOT_TAKEN ;
@@ -248,12 +307,14 @@ public static class Balancer {
248307 private final Map <String , ModelNode > nodes ;
249308 private final BalancingWeights balancingWeights ;
250309 private final NodeSorters nodeSorters ;
310+ private final boolean completeEarlyOnShardAssignmentChange ;
251311
252312 private Balancer (
253313 WriteLoadForecaster writeLoadForecaster ,
254314 RoutingAllocation allocation ,
255315 float threshold ,
256- BalancingWeights balancingWeights
316+ BalancingWeights balancingWeights ,
317+ boolean completeEarlyOnShardAssignmentChange
257318 ) {
258319 this .writeLoadForecaster = writeLoadForecaster ;
259320 this .allocation = allocation ;
@@ -266,6 +327,7 @@ private Balancer(
266327 nodes = Collections .unmodifiableMap (buildModelFromAssigned ());
267328 this .nodeSorters = balancingWeights .createNodeSorters (nodesArray (), this );
268329 this .balancingWeights = balancingWeights ;
330+ this .completeEarlyOnShardAssignmentChange = completeEarlyOnShardAssignmentChange ;
269331 }
270332
271333 private static long getShardDiskUsageInBytes (ShardRouting shardRouting , IndexMetadata indexMetadata , ClusterInfo clusterInfo ) {
@@ -358,7 +420,7 @@ private IndexMetadata indexMetadata(ProjectIndex index) {
358420 * Balances the nodes on the cluster model according to the weight function.
359421 * The actual balancing is delegated to {@link #balanceByWeights(NodeSorter)}
360422 */
361- private void balance () {
423+ private boolean balance () {
362424 if (logger .isTraceEnabled ()) {
363425 logger .trace ("Start balancing cluster" );
364426 }
@@ -371,21 +433,27 @@ private void balance() {
371433 * Therefore we only do a rebalance if we have fetched all information.
372434 */
373435 logger .debug ("skipping rebalance due to in-flight shard/store fetches" );
374- return ;
436+ return false ;
375437 }
376438 if (allocation .deciders ().canRebalance (allocation ).type () != Type .YES ) {
377439 logger .trace ("skipping rebalance as it is disabled" );
378- return ;
440+ return false ;
379441 }
380442
443+ boolean shardBalanced = false ;
381444 // Balance each partition
382445 for (NodeSorter nodeSorter : nodeSorters ) {
383446 if (nodeSorter .modelNodes .length < 2 ) { /* skip if we only have one node */
384447 logger .trace ("skipping rebalance as the partition has single node only" );
385448 continue ;
386449 }
387- balanceByWeights (nodeSorter );
450+ shardBalanced |= balanceByWeights (nodeSorter );
451+ // TODO: We could choose to account for shardBalanced separately for each partition since they do not overlap.
452+ if (shardBalanced && completeEarlyOnShardAssignmentChange ) {
453+ return true ;
454+ }
388455 }
456+ return shardBalanced ;
389457 }
390458
391459 /**
@@ -531,7 +599,8 @@ private MoveDecision decideRebalance(final ProjectIndex index, final ShardRoutin
531599 * only, or in other words relocations that move the weight delta closer
532600 * to {@code 0.0}
533601 */
534- private void balanceByWeights (NodeSorter sorter ) {
602+ private boolean balanceByWeights (NodeSorter sorter ) {
603+ boolean shardBalanced = false ;
535604 final AllocationDeciders deciders = allocation .deciders ();
536605 final ModelNode [] modelNodes = sorter .modelNodes ;
537606 final float [] weights = sorter .weights ;
@@ -630,6 +699,15 @@ private void balanceByWeights(NodeSorter sorter) {
630699 sorter .sort (0 , relevantNodes );
631700 lowIdx = 0 ;
632701 highIdx = relevantNodes - 1 ;
702+
703+ shardBalanced = true ;
704+ if (completeEarlyOnShardAssignmentChange && routingNodes .getRelocatingShardCount () > 0 ) {
705+ // ES-12955: Check routingNodes.getRelocatingShardCount() > 0 in case the first relocation is a THROTTLE.
706+ // It should not happen in production, i.e., throttling should not happen unless there is a prior shard
707+ // that is already relocating. But in tests, we have deciders like RandomAllocationDecider that can
708+ // randomly return THROTTLE when there is no existing relocation.
709+ return true ;
710+ }
633711 continue ;
634712 }
635713 }
@@ -651,6 +729,7 @@ private void balanceByWeights(NodeSorter sorter) {
651729 }
652730 }
653731 }
732+ return shardBalanced ;
654733 }
655734
656735 /**
@@ -721,7 +800,8 @@ protected int comparePivot(int j) {
721800 * shard is created with an incremented version in the state
722801 * {@link ShardRoutingState#INITIALIZING}.
723802 */
724- public void moveShards () {
803+ public boolean moveShards () {
804+ boolean shardMoved = false ;
725805 // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
726806 // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
727807 // offloading the shards.
@@ -745,10 +825,15 @@ public void moveShards() {
745825 if (logger .isTraceEnabled ()) {
746826 logger .trace ("Moved shard [{}] to node [{}]" , shardRouting , targetNode .getRoutingNode ());
747827 }
828+ shardMoved = true ;
829+ if (completeEarlyOnShardAssignmentChange ) {
830+ return true ;
831+ }
748832 } else if (moveDecision .isDecisionTaken () && moveDecision .canRemain () == false ) {
749833 logger .trace ("[{}][{}] can't move" , shardRouting .index (), shardRouting .id ());
750834 }
751835 }
836+ return shardMoved ;
752837 }
753838
754839 /**
@@ -888,14 +973,14 @@ private Map<String, ModelNode> buildModelFromAssigned() {
888973 * Allocates all given shards on the minimal eligible node for the shards index
889974 * with respect to the weight function. All given shards must be unassigned.
890975 */
891- private void allocateUnassigned () {
976+ private boolean allocateUnassigned () {
892977 RoutingNodes .UnassignedShards unassigned = routingNodes .unassigned ();
893978 assert nodes .isEmpty () == false ;
894979 if (logger .isTraceEnabled ()) {
895980 logger .trace ("Start allocating unassigned shards" );
896981 }
897982 if (unassigned .isEmpty ()) {
898- return ;
983+ return false ;
899984 }
900985
901986 /*
@@ -932,6 +1017,7 @@ private void allocateUnassigned() {
9321017 int secondaryLength = 0 ;
9331018 int primaryLength = primary .length ;
9341019 ArrayUtil .timSort (primary , comparator );
1020+ boolean shardAssignmentChanged = false ;
9351021 do {
9361022 for (int i = 0 ; i < primaryLength ; i ++) {
9371023 ShardRouting shard = primary [i ];
@@ -949,6 +1035,7 @@ private void allocateUnassigned() {
9491035
9501036 final long shardSize = getExpectedShardSize (shard , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE , allocation );
9511037 shard = routingNodes .initializeShard (shard , minNode .getNodeId (), null , shardSize , allocation .changes ());
1038+ shardAssignmentChanged = true ;
9521039 minNode .addShard (index , shard );
9531040 if (shard .primary () == false ) {
9541041 // copy over the same replica shards to the secondary array so they will get allocated
@@ -972,6 +1059,9 @@ private void allocateUnassigned() {
9721059 assert allocationDecision .getAllocationStatus () == AllocationStatus .DECIDERS_THROTTLED ;
9731060 final long shardSize = getExpectedShardSize (shard , ShardRouting .UNAVAILABLE_EXPECTED_SHARD_SIZE , allocation );
9741061 minNode .addShard (projectIndex (shard ), shard .initialize (minNode .getNodeId (), null , shardSize ));
1062+ // If we see a throttle decision in simulation, there must be other shards that got assigned before it.
1063+ assert allocation .isSimulating () == false || shardAssignmentChanged
1064+ : "shard " + shard + " was throttled but no other shards were assigned" ;
9751065 } else {
9761066 if (logger .isTraceEnabled ()) {
9771067 logger .trace ("No Node found to assign shard [{}]" , shard );
@@ -994,6 +1084,7 @@ private void allocateUnassigned() {
9941084 secondaryLength = 0 ;
9951085 } while (primaryLength > 0 );
9961086 // clear everything: we have either added it or moved it to ignoreUnassigned
1087+ return shardAssignmentChanged ;
9971088 }
9981089
9991090 private ProjectIndex projectIndex (ShardRouting shardRouting ) {
0 commit comments