3535import java .util .IdentityHashMap ;
3636import java .util .Iterator ;
3737import java .util .List ;
38+ import java .util .Locale ;
3839import java .util .Map ;
3940import java .util .PriorityQueue ;
4041import java .util .Set ;
@@ -552,6 +553,8 @@ private static ByteSizeValue getFreeBytesThreshold(
552553 }
553554
554555 static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget <MergeTask > {
556+ private static final Logger LOGGER = LogManager .getLogger (MergeTaskPriorityBlockingQueue .class );
557+
555558 MergeTaskPriorityBlockingQueue () {
556559 // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
557560 // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
@@ -567,6 +570,55 @@ long getAvailableBudget() {
567570 MergeTask peekQueue () {
568571 return enqueuedByBudget .peek ().v1 ();
569572 }
573+
574+ @ Override
575+ void postBudgetUpdate () {
576+ assert super .lock .isHeldByCurrentThread ();
577+ Tuple <MergeTask , Long > head = enqueuedByBudget .peek ();
578+ if (head != null && head .v2 () > availableBudget ) {
579+ LOGGER .warn (
580+ String .format (
581+ Locale .ROOT ,
582+ "There are merge tasks enqueued but there's insufficient disk space available to execute them "
583+ + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)" ,
584+ head .v2 (),
585+ availableBudget
586+ )
587+ );
588+ if (LOGGER .isDebugEnabled ()) {
589+ if (unreleasedBudgetPerElement .isEmpty ()) {
590+ LOGGER .debug (
591+ String .format (
592+ Locale .ROOT ,
593+ "There are no merge tasks currently running, "
594+ + "but there are [%d] enqueued ones that are blocked because of insufficient disk space "
595+ + "(the smallest merge task requires [%d] bytes, but the available disk space is only [%d] bytes)" ,
596+ enqueuedByBudget .size (),
597+ head .v2 (),
598+ availableBudget
599+ )
600+ );
601+ } else {
602+ StringBuilder messageBuilder = new StringBuilder ();
603+ messageBuilder .append ("The following merge tasks are currently running [" );
604+ for (var runningMergeTask : super .unreleasedBudgetPerElement .entrySet ()) {
605+ messageBuilder .append (runningMergeTask .getKey ().element ().toString ());
606+ messageBuilder .append (" with disk space budgets in bytes " ).append (runningMergeTask .getValue ()).append (" , " );
607+ }
608+ messageBuilder .delete (messageBuilder .length () - 3 , messageBuilder .length ());
609+ messageBuilder .append ("], and there are [" )
610+ .append (enqueuedByBudget .size ())
611+ .append ("] additional enqueued ones that are blocked because of insufficient disk space" );
612+ messageBuilder .append (" (the smallest merge task requires [" )
613+ .append (head .v2 ())
614+ .append ("] bytes, but the available disk space is only [" )
615+ .append (availableBudget )
616+ .append ("] bytes)" );
617+ LOGGER .debug (messageBuilder .toString ());
618+ }
619+ }
620+ }
621+ }
570622 }
571623
572624 /**
@@ -576,7 +628,7 @@ MergeTask peekQueue() {
576628 static class PriorityBlockingQueueWithBudget <E > {
577629 private final ToLongFunction <? super E > budgetFunction ;
578630 protected final PriorityQueue <Tuple <E , Long >> enqueuedByBudget ;
579- private final IdentityHashMap <ElementWithReleasableBudget , Long > unreleasedBudgetPerElement ;
631+ protected final IdentityHashMap <ElementWithReleasableBudget , Budgets > unreleasedBudgetPerElement ;
580632 private final ReentrantLock lock ;
581633 private final Condition elementAvailable ;
582634 protected long availableBudget ;
@@ -637,15 +689,23 @@ void updateBudget(long availableBudget) {
637689 // updates the budget of enqueued elements (and possibly reorders the priority queue)
638690 updateBudgetOfEnqueuedElementsAndReorderQueue ();
639691 // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
640- unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e .element ()));
692+ unreleasedBudgetPerElement .replaceAll ((e , v ) -> v . updateBudgetEstimation ( budgetFunction .applyAsLong (e .element () )));
641693 // the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
642- this .availableBudget -= unreleasedBudgetPerElement .values ().stream ().mapToLong (i -> i ).sum ();
694+ this .availableBudget -= unreleasedBudgetPerElement .values ()
695+ .stream ()
696+ .mapToLong (i -> i .latestBudgetEstimationForElement )
697+ .sum ();
643698 elementAvailable .signalAll ();
699+ postBudgetUpdate ();
644700 } finally {
645701 lock .unlock ();
646702 }
647703 }
648704
705+ void postBudgetUpdate () {
706+ assert lock .isHeldByCurrentThread ();
707+ };
708+
649709 private void updateBudgetOfEnqueuedElementsAndReorderQueue () {
650710 assert this .lock .isHeldByCurrentThread ();
651711 int queueSizeBefore = enqueuedByBudget .size ();
@@ -686,7 +746,7 @@ private ElementWithReleasableBudget newElementWithReleasableBudget(E element, lo
686746 ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget (element );
687747 assert this .lock .isHeldByCurrentThread ();
688748 // the taken element holds up some budget
689- var prev = this .unreleasedBudgetPerElement .put (elementWithReleasableBudget , budget );
749+ var prev = this .unreleasedBudgetPerElement .put (elementWithReleasableBudget , new Budgets ( budget , budget , this . availableBudget ) );
690750 assert prev == null ;
691751 this .availableBudget -= budget ;
692752 assert this .availableBudget >= 0L ;
@@ -736,6 +796,16 @@ E element() {
736796 return element ;
737797 }
738798 }
799+
800+ record Budgets (long initialBudgetEstimationForElement , long latestBudgetEstimationForElement , long initialTotalAvailableBudget ) {
801+ Budgets updateBudgetEstimation (long latestBudgetEstimationForElement ) {
802+ return new Budgets (
803+ this .initialBudgetEstimationForElement ,
804+ latestBudgetEstimationForElement ,
805+ this .initialTotalAvailableBudget
806+ );
807+ }
808+ }
739809 }
740810
741811 private static long newTargetIORateBytesPerSec (
0 commit comments