2020import  org .elasticsearch .core .Nullable ;
2121import  org .elasticsearch .core .Releasable ;
2222import  org .elasticsearch .core .TimeValue ;
23+ import  org .elasticsearch .core .Tuple ;
2324import  org .elasticsearch .env .NodeEnvironment ;
2425import  org .elasticsearch .index .engine .ThreadPoolMergeScheduler .MergeTask ;
2526import  org .elasticsearch .monitor .fs .FsInfo ;
2829
2930import  java .io .Closeable ;
3031import  java .io .IOException ;
32+ import  java .util .ArrayList ;
3133import  java .util .Arrays ;
3234import  java .util .Comparator ;
3335import  java .util .IdentityHashMap ;
@@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
5961    /** How frequently we check disk usage (default: 5 seconds). */ 
6062    public  static  final  Setting <TimeValue > INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING  = Setting .positiveTimeSetting (
6163        "indices.merge.disk.check_interval" ,
62-         // disabled by default 
63-         // there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient) 
64-         // see: https://github.com/elastic/elasticsearch/issues/129335 
65-         TimeValue .timeValueSeconds (0 ),
64+         TimeValue .timeValueSeconds (5 ),
6665        Property .Dynamic ,
6766        Property .NodeScope 
6867    );
@@ -294,6 +293,10 @@ public boolean allDone() {
294293        return  queuedMergeTasks .isQueueEmpty () && runningMergeTasks .isEmpty () && ioThrottledMergeTasksCount .get () == 0L ;
295294    }
296295
296+     public  boolean  isMergingBlockedDueToInsufficientDiskSpace () {
297+         return  availableDiskSpacePeriodicMonitor .isScheduled () && queuedMergeTasks .queueHeadIsOverTheAvailableBudget ();
298+     }
299+ 
297300    /** 
298301     * Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time. 
299302     * A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level. 
@@ -550,9 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(
550553
551554    static  class  MergeTaskPriorityBlockingQueue  extends  PriorityBlockingQueueWithBudget <MergeTask > {
552555        MergeTaskPriorityBlockingQueue () {
553-             // start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked) 
554-             // use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is 
555-             // updated according to the remaining disk space requirements of the currently running merge tasks 
556+             // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked) 
557+             // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated 
556558            super (MergeTask ::estimatedRemainingMergeSize , 0L );
557559        }
558560
@@ -563,7 +565,7 @@ long getAvailableBudget() {
563565
564566        // exposed for tests 
565567        MergeTask  peekQueue () {
566-             return  enqueuedByBudget .peek ();
568+             return  enqueuedByBudget .peek (). v1 () ;
567569        }
568570    }
569571
@@ -573,15 +575,15 @@ MergeTask peekQueue() {
573575     */ 
574576    static  class  PriorityBlockingQueueWithBudget <E > {
575577        private  final  ToLongFunction <? super  E > budgetFunction ;
576-         protected  final  PriorityQueue <E > enqueuedByBudget ;
578+         protected  final  PriorityQueue <Tuple < E ,  Long > > enqueuedByBudget ;
577579        private  final  IdentityHashMap <ElementWithReleasableBudget , Long > unreleasedBudgetPerElement ;
578580        private  final  ReentrantLock  lock ;
579581        private  final  Condition  elementAvailable ;
580582        protected  long  availableBudget ;
581583
582584        PriorityBlockingQueueWithBudget (ToLongFunction <? super  E > budgetFunction , long  initialAvailableBudget ) {
583585            this .budgetFunction  = budgetFunction ;
584-             this .enqueuedByBudget  = new  PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
586+             this .enqueuedByBudget  = new  PriorityQueue <>(64 , Comparator .comparingLong (Tuple :: v2 ));
585587            this .unreleasedBudgetPerElement  = new  IdentityHashMap <>();
586588            this .lock  = new  ReentrantLock ();
587589            this .elementAvailable  = lock .newCondition ();
@@ -592,7 +594,7 @@ boolean enqueue(E e) {
592594            final  ReentrantLock  lock  = this .lock ;
593595            lock .lock ();
594596            try  {
595-                 enqueuedByBudget .offer (e );
597+                 enqueuedByBudget .offer (new   Tuple <>( e ,  budgetFunction . applyAsLong ( e )) );
596598                elementAvailable .signal ();
597599            } finally  {
598600                lock .unlock ();
@@ -608,22 +610,22 @@ ElementWithReleasableBudget take() throws InterruptedException {
608610            final  ReentrantLock  lock  = this .lock ;
609611            lock .lockInterruptibly ();
610612            try  {
611-                 E  peek ;
612-                 long  peekBudget ;
613+                 Tuple <E , Long > head ;
613614                // blocks until the smallest budget element fits the currently available budget 
614-                 while  ((peek  = enqueuedByBudget .peek ()) == null  || ( peekBudget  =  budgetFunction . applyAsLong ( peek ) ) > availableBudget ) {
615+                 while  ((head  = enqueuedByBudget .peek ()) == null  || head . v2 ( ) > availableBudget ) {
615616                    elementAvailable .await ();
616617                }
618+                 head  = enqueuedByBudget .poll ();
617619                // deducts and holds up that element's budget from the available budget 
618-                 return  newElementWithReleasableBudget (enqueuedByBudget . poll (), peekBudget );
620+                 return  newElementWithReleasableBudget (head . v1 (), head . v2 () );
619621            } finally  {
620622                lock .unlock ();
621623            }
622624        }
623625
624626        /** 
625627         * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements 
626-          * that are still in use. The budget of in-use  elements is also updated ( by re-applying the budget function) . 
628+          * that are still in use. The elements budget  is also updated by re-applying the budget function. 
627629         * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element 
628630         * is over this newly computed available budget. 
629631         */ 
@@ -632,20 +634,50 @@ void updateBudget(long availableBudget) {
632634            lock .lock ();
633635            try  {
634636                this .availableBudget  = availableBudget ;
635-                 // update the per-element budget (these are all the elements that are using any budget) 
637+                 // updates the budget of enqueued elements (and possibly reorders the priority queue) 
638+                 updateBudgetOfEnqueuedElementsAndReorderQueue ();
639+                 // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget) 
636640                unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e .element ()));
637-                 // available budget is decreased by the used per-element budget (for all  dequeued elements that are still in  use) 
641+                 // the  available budget is decreased by the budget of still in-use elements ( dequeued elements that are still in- use) 
638642                this .availableBudget  -= unreleasedBudgetPerElement .values ().stream ().mapToLong (i  -> i ).sum ();
639643                elementAvailable .signalAll ();
640644            } finally  {
641645                lock .unlock ();
642646            }
643647        }
644648
649+         private  void  updateBudgetOfEnqueuedElementsAndReorderQueue () {
650+             assert  this .lock .isHeldByCurrentThread ();
651+             int  queueSizeBefore  = enqueuedByBudget .size ();
652+             var  it  = enqueuedByBudget .iterator ();
653+             List <Tuple <E , Long >> elementsToReorder  = new  ArrayList <>();
654+             while  (it .hasNext ()) {
655+                 var  elementWithBudget  = it .next ();
656+                 Long  previousBudget  = elementWithBudget .v2 ();
657+                 long  latestBudget  = budgetFunction .applyAsLong (elementWithBudget .v1 ());
658+                 if  (previousBudget .equals (latestBudget ) == false ) {
659+                     // the budget (estimation) of an enqueued element has changed 
660+                     // this element will be reordered by removing and reinserting using the latest budget (estimation) 
661+                     it .remove ();
662+                     elementsToReorder .add (new  Tuple <>(elementWithBudget .v1 (), latestBudget ));
663+                 }
664+             }
665+             // reinsert elements based on the latest budget (estimation) 
666+             for  (var  reorderedElement  : elementsToReorder ) {
667+                 enqueuedByBudget .offer (reorderedElement );
668+             }
669+             assert  queueSizeBefore  == enqueuedByBudget .size ();
670+         }
671+ 
645672        boolean  isQueueEmpty () {
646673            return  enqueuedByBudget .isEmpty ();
647674        }
648675
676+         boolean  queueHeadIsOverTheAvailableBudget () {
677+             var  head  = enqueuedByBudget .peek ();
678+             return  head  != null  && head .v2 () > availableBudget ;
679+         }
680+ 
649681        int  queueSize () {
650682            return  enqueuedByBudget .size ();
651683        }
0 commit comments