2020import org .elasticsearch .core .Nullable ;
2121import org .elasticsearch .core .Releasable ;
2222import org .elasticsearch .core .TimeValue ;
23+ import org .elasticsearch .core .Tuple ;
2324import org .elasticsearch .env .NodeEnvironment ;
2425import org .elasticsearch .index .engine .ThreadPoolMergeScheduler .MergeTask ;
2526import org .elasticsearch .monitor .fs .FsInfo ;
2829
2930import java .io .Closeable ;
3031import java .io .IOException ;
32+ import java .util .ArrayList ;
3133import java .util .Arrays ;
3234import java .util .Comparator ;
3335import java .util .IdentityHashMap ;
@@ -550,9 +552,8 @@ private static ByteSizeValue getFreeBytesThreshold(
550552
551553 static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget <MergeTask > {
552554 MergeTaskPriorityBlockingQueue () {
553- // start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
554- // use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
555- // updated according to the remaining disk space requirements of the currently running merge tasks
555+ // by default, start with 0 budget (so takes on this queue will always block until the first {@link #updateBudget} is invoked)
556+ // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
556557 super (MergeTask ::estimatedRemainingMergeSize , 0L );
557558 }
558559
@@ -563,7 +564,7 @@ long getAvailableBudget() {
563564
564565 // exposed for tests
565566 MergeTask peekQueue () {
566- return enqueuedByBudget .peek ();
567+ return enqueuedByBudget .peek (). v1 () ;
567568 }
568569 }
569570
@@ -573,15 +574,15 @@ MergeTask peekQueue() {
573574 */
574575 static class PriorityBlockingQueueWithBudget <E > {
575576 private final ToLongFunction <? super E > budgetFunction ;
576- protected final PriorityQueue <E > enqueuedByBudget ;
577+ protected final PriorityQueue <Tuple < E , Long > > enqueuedByBudget ;
577578 private final IdentityHashMap <ElementWithReleasableBudget , Long > unreleasedBudgetPerElement ;
578579 private final ReentrantLock lock ;
579580 private final Condition elementAvailable ;
580581 protected long availableBudget ;
581582
582583 PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long initialAvailableBudget ) {
583584 this .budgetFunction = budgetFunction ;
584- this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
585+ this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (Tuple :: v2 ));
585586 this .unreleasedBudgetPerElement = new IdentityHashMap <>();
586587 this .lock = new ReentrantLock ();
587588 this .elementAvailable = lock .newCondition ();
@@ -592,7 +593,7 @@ boolean enqueue(E e) {
592593 final ReentrantLock lock = this .lock ;
593594 lock .lock ();
594595 try {
595- enqueuedByBudget .offer (e );
596+ enqueuedByBudget .offer (new Tuple <>( e , budgetFunction . applyAsLong ( e )) );
596597 elementAvailable .signal ();
597598 } finally {
598599 lock .unlock ();
@@ -608,22 +609,22 @@ ElementWithReleasableBudget take() throws InterruptedException {
608609 final ReentrantLock lock = this .lock ;
609610 lock .lockInterruptibly ();
610611 try {
611- E peek ;
612- long peekBudget ;
612+ Tuple <E , Long > head ;
613613 // blocks until the smallest budget element fits the currently available budget
614- while ((peek = enqueuedByBudget .peek ()) == null || ( peekBudget = budgetFunction . applyAsLong ( peek ) ) > availableBudget ) {
614+ while ((head = enqueuedByBudget .peek ()) == null || head . v2 ( ) > availableBudget ) {
615615 elementAvailable .await ();
616616 }
617+ head = enqueuedByBudget .poll ();
617618 // deducts and holds up that element's budget from the available budget
618- return newElementWithReleasableBudget (enqueuedByBudget . poll (), peekBudget );
619+ return newElementWithReleasableBudget (head . v1 (), head . v2 () );
619620 } finally {
620621 lock .unlock ();
621622 }
622623 }
623624
624625 /**
625626 * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
626- * that are still in use. The budget of in-use elements is also updated ( by re-applying the budget function) .
627+ * that are still in use. The elements budget is also updated by re-applying the budget function.
627628 * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
628629 * is over this newly computed available budget.
629630 */
@@ -632,16 +633,41 @@ void updateBudget(long availableBudget) {
632633 lock .lock ();
633634 try {
634635 this .availableBudget = availableBudget ;
635- // update the per-element budget (these are all the elements that are using any budget)
636+ // updates the budget of enqueued elements (and possibly reorders the priority queue)
637+ updateBudgetOfEnqueuedElementsAndReorderQueue ();
638+ // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
636639 unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e .element ()));
637- // available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
640+ // the available budget is decreased by the budget of still in-use elements ( dequeued elements that are still in- use)
638641 this .availableBudget -= unreleasedBudgetPerElement .values ().stream ().mapToLong (i -> i ).sum ();
639642 elementAvailable .signalAll ();
640643 } finally {
641644 lock .unlock ();
642645 }
643646 }
644647
648+ private void updateBudgetOfEnqueuedElementsAndReorderQueue () {
649+ assert this .lock .isHeldByCurrentThread ();
650+ int queueSizeBefore = enqueuedByBudget .size ();
651+ var it = enqueuedByBudget .iterator ();
652+ List <Tuple <E , Long >> elementsToReorder = new ArrayList <>();
653+ while (it .hasNext ()) {
654+ var elementWithBudget = it .next ();
655+ Long previousBudget = elementWithBudget .v2 ();
656+ long latestBudget = budgetFunction .applyAsLong (elementWithBudget .v1 ());
657+ if (previousBudget .equals (latestBudget ) == false ) {
658+ // the budget (estimation) of an enqueued element has changed
659+ // this element will be reordered by removing and reinserting using the latest budget (estimation)
660+ it .remove ();
661+ elementsToReorder .add (new Tuple <>(elementWithBudget .v1 (), latestBudget ));
662+ }
663+ }
664+ // reinsert elements based on the latest budget (estimation)
665+ for (var reorderedElement : elementsToReorder ) {
666+ enqueuedByBudget .offer (reorderedElement );
667+ }
668+ assert queueSizeBefore == enqueuedByBudget .size ();
669+ }
670+
645671 boolean isQueueEmpty () {
646672 return enqueuedByBudget .isEmpty ();
647673 }
0 commit comments