2929import  org .elasticsearch .cluster .service .MasterService ;
3030import  org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
3131import  org .elasticsearch .common .Priority ;
32+ import  org .elasticsearch .common .Strings ;
3233import  org .elasticsearch .common .metrics .CounterMetric ;
3334import  org .elasticsearch .common .metrics .MeanMetric ;
3435import  org .elasticsearch .common .settings .ClusterSettings ;
4344import  java .util .Set ;
4445import  java .util .concurrent .ConcurrentLinkedQueue ;
4546import  java .util .concurrent .atomic .AtomicLong ;
47+ import  java .util .concurrent .atomic .AtomicReference ;
4648
4749/** 
4850 * A {@link ShardsAllocator} which asynchronously refreshes the desired balance held by the {@link DesiredBalanceComputer} and then takes 
@@ -62,7 +64,7 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator {
6264    private  final  AtomicLong  indexGenerator  = new  AtomicLong (-1 );
6365    private  final  ConcurrentLinkedQueue <List <MoveAllocationCommand >> pendingDesiredBalanceMoves  = new  ConcurrentLinkedQueue <>();
6466    private  final  MasterServiceTaskQueue <ReconcileDesiredBalanceTask > masterServiceTaskQueue ;
65-     private  volatile   DesiredBalance   currentDesiredBalance  = DesiredBalance .INITIAL ;
67+     private  final   AtomicReference < DesiredBalance >  currentDesiredBalanceRef  = new   AtomicReference <>( DesiredBalance .NOT_MASTER ) ;
6668    private  volatile  boolean  resetCurrentDesiredBalance  = false ;
6769    private  final  Set <String > processedNodeShutdowns  = new  HashSet <>();
6870    private  final  DesiredBalanceMetrics  desiredBalanceMetrics ;
@@ -129,6 +131,12 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
129131                long  index  = desiredBalanceInput .index ();
130132                logger .debug ("Starting desired balance computation for [{}]" , index );
131133
134+                 final  DesiredBalance  initialDesiredBalance  = getInitialDesiredBalance ();
135+                 if  (initialDesiredBalance  == DesiredBalance .NOT_MASTER ) {
136+                     logger .debug ("Abort desired balance computation because node is no longer master" );
137+                     return ;
138+                 }
139+ 
132140                recordTime (
133141                    cumulativeComputationTime ,
134142                    // We set currentDesiredBalance back to INITIAL when the node stands down as master in onNoLongerMaster. 
@@ -137,7 +145,7 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
137145                    // lead to unexpected behaviours for tests. See also https://github.com/elastic/elasticsearch/pull/116904 
138146                    () -> setCurrentDesiredBalance (
139147                        desiredBalanceComputer .compute (
140-                             getInitialDesiredBalance () ,
148+                             initialDesiredBalance ,
141149                            desiredBalanceInput ,
142150                            pendingDesiredBalanceMoves ,
143151                            this ::isFresh 
@@ -146,7 +154,17 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
146154                );
147155                computationsExecuted .inc ();
148156
149-                 if  (currentDesiredBalance .finishReason () == DesiredBalance .ComputationFinishReason .STOP_EARLY ) {
157+                 final  DesiredBalance  currentDesiredBalance  = currentDesiredBalanceRef .get ();
158+                 if  (currentDesiredBalance  == DesiredBalance .NOT_MASTER  || currentDesiredBalance  == DesiredBalance .BECOME_MASTER_INITIAL ) {
159+                     logger .debug (
160+                         () -> Strings .format (
161+                             "Desired balance computation for [%s] is discarded since master has concurrently changed. " 
162+                                 + "Current desiredBalance=[%s]" ,
163+                             index ,
164+                             currentDesiredBalance 
165+                         )
166+                     );
167+                 } else  if  (currentDesiredBalance .finishReason () == DesiredBalance .ComputationFinishReason .STOP_EARLY ) {
150168                    logger .debug (
151169                        "Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation" ,
152170                        index 
@@ -164,10 +182,13 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
164182            }
165183
166184            private  DesiredBalance  getInitialDesiredBalance () {
185+                 final  DesiredBalance  currentDesiredBalance  = currentDesiredBalanceRef .get ();
167186                if  (resetCurrentDesiredBalance ) {
168187                    logger .info ("Resetting current desired balance" );
169188                    resetCurrentDesiredBalance  = false ;
170-                     return  new  DesiredBalance (currentDesiredBalance .lastConvergedIndex (), Map .of ());
189+                     return  currentDesiredBalance  == DesiredBalance .NOT_MASTER 
190+                         ? DesiredBalance .NOT_MASTER 
191+                         : new  DesiredBalance (currentDesiredBalance .lastConvergedIndex (), Map .of ());
171192                } else  {
172193                    return  currentDesiredBalance ;
173194                }
@@ -215,6 +236,10 @@ public void allocate(RoutingAllocation allocation, ActionListener<Void> listener
215236        var  index  = indexGenerator .incrementAndGet ();
216237        logger .debug ("Executing allocate for [{}]" , index );
217238        queue .add (index , listener );
239+         // This can only run on master, so unset not-master if exists 
240+         if  (currentDesiredBalanceRef .compareAndSet (DesiredBalance .NOT_MASTER , DesiredBalance .BECOME_MASTER_INITIAL )) {
241+             logger .debug ("initialized desired balance for becoming master" );
242+         }
218243        desiredBalanceComputation .onNewInput (DesiredBalanceInput .create (index , allocation ));
219244
220245        if  (allocation .routingTable ().indicesRouting ().isEmpty ()) {
@@ -224,7 +249,7 @@ public void allocate(RoutingAllocation allocation, ActionListener<Void> listener
224249        // Starts reconciliation towards desired balance that might have not been updated with a recent calculation yet. 
225250        // This is fine as balance should have incremental rather than radical changes. 
226251        // This should speed up achieving the desired balance in cases current state is still different from it (due to THROTTLING). 
227-         reconcile (currentDesiredBalance , allocation );
252+         reconcile (currentDesiredBalanceRef . get () , allocation );
228253    }
229254
230255    private  void  processNodeShutdowns (ClusterState  clusterState ) {
@@ -267,16 +292,26 @@ private static List<MoveAllocationCommand> getMoveCommands(AllocationCommands co
267292    }
268293
269294    private  void  setCurrentDesiredBalance (DesiredBalance  newDesiredBalance ) {
270-         if  (logger .isTraceEnabled ()) {
271-             var  diff  = DesiredBalance .hasChanges (currentDesiredBalance , newDesiredBalance )
272-                 ? "Diff: "  + DesiredBalance .humanReadableDiff (currentDesiredBalance , newDesiredBalance )
273-                 : "No changes" ;
274-             logger .trace ("Desired balance updated: {}. {}" , newDesiredBalance , diff );
275-         } else  {
276-             logger .debug ("Desired balance updated for [{}]" , newDesiredBalance .lastConvergedIndex ());
295+         while  (true ) {
296+             final  var  oldDesiredBalance  = currentDesiredBalanceRef .get ();
297+             if  (oldDesiredBalance  == DesiredBalance .NOT_MASTER ) {
298+                 logger .debug ("discard desired balance for [{}] since node is no longer master" , newDesiredBalance .lastConvergedIndex ());
299+                 return ;
300+             }
301+ 
302+             if  (currentDesiredBalanceRef .compareAndSet (oldDesiredBalance , newDesiredBalance )) {
303+                 if  (logger .isTraceEnabled ()) {
304+                     var  diff  = DesiredBalance .hasChanges (oldDesiredBalance , newDesiredBalance )
305+                         ? "Diff: "  + DesiredBalance .humanReadableDiff (oldDesiredBalance , newDesiredBalance )
306+                         : "No changes" ;
307+                     logger .trace ("Desired balance updated: {}. {}" , newDesiredBalance , diff );
308+                 } else  {
309+                     logger .debug ("Desired balance updated for [{}]" , newDesiredBalance .lastConvergedIndex ());
310+                 }
311+                 computedShardMovements .inc (DesiredBalance .shardMovements (oldDesiredBalance , newDesiredBalance ));
312+                 break ;
313+             }
277314        }
278-         computedShardMovements .inc (DesiredBalance .shardMovements (currentDesiredBalance , newDesiredBalance ));
279-         currentDesiredBalance  = newDesiredBalance ;
280315    }
281316
282317    protected  void  submitReconcileTask (DesiredBalance  desiredBalance ) {
@@ -316,7 +351,7 @@ public void execute(RoutingAllocation allocation) {
316351    }
317352
318353    public  DesiredBalance  getDesiredBalance () {
319-         return  currentDesiredBalance ;
354+         return  currentDesiredBalanceRef . get () ;
320355    }
321356
322357    public  void  resetDesiredBalance () {
@@ -325,7 +360,7 @@ public void resetDesiredBalance() {
325360
326361    public  DesiredBalanceStats  getStats () {
327362        return  new  DesiredBalanceStats (
328-             Math .max (currentDesiredBalance .lastConvergedIndex (), 0L ),
363+             Math .max (currentDesiredBalanceRef . get () .lastConvergedIndex (), 0L ),
329364            desiredBalanceComputation .isActive (),
330365            computationsSubmitted .count (),
331366            computationsExecuted .count (),
@@ -342,7 +377,7 @@ public DesiredBalanceStats getStats() {
342377
343378    private  void  onNoLongerMaster () {
344379        if  (indexGenerator .getAndSet (-1 ) != -1 ) {
345-             currentDesiredBalance  =  DesiredBalance .INITIAL ;
380+             currentDesiredBalanceRef . set ( DesiredBalance .NOT_MASTER ) ;
346381            queue .completeAllAsNotMaster ();
347382            pendingDesiredBalanceMoves .clear ();
348383            desiredBalanceReconciler .clear ();
@@ -412,7 +447,7 @@ private static void discardSupersededTasks(
412447
413448    // only for tests - in production, this happens after reconciliation 
414449    protected  final  void  completeToLastConvergedIndex () {
415-         queue .complete (currentDesiredBalance .lastConvergedIndex ());
450+         queue .complete (currentDesiredBalanceRef . get () .lastConvergedIndex ());
416451    }
417452
418453    private  void  recordTime (CounterMetric  metric , Runnable  action ) {
0 commit comments