2020import org .elasticsearch .core .Nullable ;
2121import org .elasticsearch .core .Releasable ;
2222import org .elasticsearch .core .TimeValue ;
23+ import org .elasticsearch .core .Tuple ;
2324import org .elasticsearch .env .NodeEnvironment ;
2425import org .elasticsearch .index .engine .ThreadPoolMergeScheduler .MergeTask ;
2526import org .elasticsearch .monitor .fs .FsInfo ;
2829
2930import java .io .Closeable ;
3031import java .io .IOException ;
32+ import java .util .ArrayList ;
3133import java .util .Arrays ;
3234import java .util .Comparator ;
3335import java .util .IdentityHashMap ;
@@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
5961 /** How frequently we check disk usage (default: 5 seconds). */
6062 public static final Setting <TimeValue > INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting .positiveTimeSetting (
6163 "indices.merge.disk.check_interval" ,
62- // disabled by default
63- // there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient)
64- // see: https://github.com/elastic/elasticsearch/issues/129335
65- TimeValue .timeValueSeconds (0 ),
64+ TimeValue .timeValueSeconds (5 ),
6665 Property .Dynamic ,
6766 Property .NodeScope
6867 );
@@ -294,6 +293,10 @@ public boolean allDone() {
294293 return queuedMergeTasks .isQueueEmpty () && runningMergeTasks .isEmpty () && ioThrottledMergeTasksCount .get () == 0L ;
295294 }
296295
296+ public boolean isMergingBlockedDueToInsufficientDiskSpace () {
297+ return availableDiskSpacePeriodicMonitor .isScheduled () && queuedMergeTasks .queueHeadIsOverTheAvailableBudget ();
298+ }
299+
297300 /**
298301 * Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time.
299302 * A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level.
@@ -550,9 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(
550553
551554 static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget <MergeTask > {
552555 MergeTaskPriorityBlockingQueue () {
553- // start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
554- // use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
555- // updated according to the remaining disk space requirements of the currently running merge tasks
556+ // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
557+ // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
556558 super (MergeTask ::estimatedRemainingMergeSize , 0L );
557559 }
558560
@@ -563,7 +565,7 @@ long getAvailableBudget() {
563565
564566 // exposed for tests
565567 MergeTask peekQueue () {
566- return enqueuedByBudget .peek ();
568+ return enqueuedByBudget .peek (). v1 () ;
567569 }
568570 }
569571
@@ -573,15 +575,15 @@ MergeTask peekQueue() {
573575 */
574576 static class PriorityBlockingQueueWithBudget <E > {
575577 private final ToLongFunction <? super E > budgetFunction ;
576- protected final PriorityQueue <E > enqueuedByBudget ;
578+ protected final PriorityQueue <Tuple < E , Long > > enqueuedByBudget ;
577579 private final IdentityHashMap <ElementWithReleasableBudget , Long > unreleasedBudgetPerElement ;
578580 private final ReentrantLock lock ;
579581 private final Condition elementAvailable ;
580582 protected long availableBudget ;
581583
582584 PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long initialAvailableBudget ) {
583585 this .budgetFunction = budgetFunction ;
584- this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
586+ this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (Tuple :: v2 ));
585587 this .unreleasedBudgetPerElement = new IdentityHashMap <>();
586588 this .lock = new ReentrantLock ();
587589 this .elementAvailable = lock .newCondition ();
@@ -592,7 +594,7 @@ boolean enqueue(E e) {
592594 final ReentrantLock lock = this .lock ;
593595 lock .lock ();
594596 try {
595- enqueuedByBudget .offer (e );
597+ enqueuedByBudget .offer (new Tuple <>( e , budgetFunction . applyAsLong ( e )) );
596598 elementAvailable .signal ();
597599 } finally {
598600 lock .unlock ();
@@ -608,22 +610,22 @@ ElementWithReleasableBudget take() throws InterruptedException {
608610 final ReentrantLock lock = this .lock ;
609611 lock .lockInterruptibly ();
610612 try {
611- E peek ;
612- long peekBudget ;
613+ Tuple <E , Long > head ;
613614 // blocks until the smallest budget element fits the currently available budget
614- while ((peek = enqueuedByBudget .peek ()) == null || ( peekBudget = budgetFunction . applyAsLong ( peek ) ) > availableBudget ) {
615+ while ((head = enqueuedByBudget .peek ()) == null || head . v2 ( ) > availableBudget ) {
615616 elementAvailable .await ();
616617 }
618+ head = enqueuedByBudget .poll ();
617619 // deducts and holds up that element's budget from the available budget
618- return newElementWithReleasableBudget (enqueuedByBudget . poll (), peekBudget );
620+ return newElementWithReleasableBudget (head . v1 (), head . v2 () );
619621 } finally {
620622 lock .unlock ();
621623 }
622624 }
623625
624626 /**
625627 * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
626- * that are still in use. The budget of in-use elements is also updated ( by re-applying the budget function) .
628+ * that are still in use. The elements budget is also updated by re-applying the budget function.
627629 * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
628630 * is over this newly computed available budget.
629631 */
@@ -632,20 +634,50 @@ void updateBudget(long availableBudget) {
632634 lock .lock ();
633635 try {
634636 this .availableBudget = availableBudget ;
635- // update the per-element budget (these are all the elements that are using any budget)
637+ // updates the budget of enqueued elements (and possibly reorders the priority queue)
638+ updateBudgetOfEnqueuedElementsAndReorderQueue ();
639+ // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
636640 unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e .element ()));
637- // available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
641+ // the available budget is decreased by the budget of still in-use elements ( dequeued elements that are still in- use)
638642 this .availableBudget -= unreleasedBudgetPerElement .values ().stream ().mapToLong (i -> i ).sum ();
639643 elementAvailable .signalAll ();
640644 } finally {
641645 lock .unlock ();
642646 }
643647 }
644648
649+ private void updateBudgetOfEnqueuedElementsAndReorderQueue () {
650+ assert this .lock .isHeldByCurrentThread ();
651+ int queueSizeBefore = enqueuedByBudget .size ();
652+ var it = enqueuedByBudget .iterator ();
653+ List <Tuple <E , Long >> elementsToReorder = new ArrayList <>();
654+ while (it .hasNext ()) {
655+ var elementWithBudget = it .next ();
656+ Long previousBudget = elementWithBudget .v2 ();
657+ long latestBudget = budgetFunction .applyAsLong (elementWithBudget .v1 ());
658+ if (previousBudget .equals (latestBudget ) == false ) {
659+ // the budget (estimation) of an enqueued element has changed
660+ // this element will be reordered by removing and reinserting using the latest budget (estimation)
661+ it .remove ();
662+ elementsToReorder .add (new Tuple <>(elementWithBudget .v1 (), latestBudget ));
663+ }
664+ }
665+ // reinsert elements based on the latest budget (estimation)
666+ for (var reorderedElement : elementsToReorder ) {
667+ enqueuedByBudget .offer (reorderedElement );
668+ }
669+ assert queueSizeBefore == enqueuedByBudget .size ();
670+ }
671+
645672 boolean isQueueEmpty () {
646673 return enqueuedByBudget .isEmpty ();
647674 }
648675
676+ boolean queueHeadIsOverTheAvailableBudget () {
677+ var head = enqueuedByBudget .peek ();
678+ return head != null && head .v2 () > availableBudget ;
679+ }
680+
649681 int queueSize () {
650682 return enqueuedByBudget .size ();
651683 }
0 commit comments