@@ -571,7 +571,7 @@ MergeTask peekQueue() {
571571 static class PriorityBlockingQueueWithBudget <E > {
572572 private final ToLongFunction <? super E > budgetFunction ;
573573 protected final PriorityQueue <E > enqueuedByBudget ;
574- private final IdentityHashMap <Wrap < E > , Long > unreleasedBudgetPerElement ;
574+ private final IdentityHashMap <ElementWithReleasableBudget , Long > unreleasedBudgetPerElement ;
575575 private final ReentrantLock lock ;
576576 private final Condition elementAvailable ;
577577 protected long availableBudget ;
@@ -612,7 +612,7 @@ ElementWithReleasableBudget take() throws InterruptedException {
612612 elementAvailable .await ();
613613 }
614614 // deducts and holds up that element's budget from the available budget
615- return new ElementWithReleasableBudget (enqueuedByBudget .poll (), peekBudget );
615+ return newElementWithReleasableBudget (enqueuedByBudget .poll (), peekBudget );
616616 } finally {
617617 lock .unlock ();
618618 }
@@ -647,23 +647,40 @@ int queueSize() {
647647 return enqueuedByBudget .size ();
648648 }
649649
650+ private ElementWithReleasableBudget newElementWithReleasableBudget (E element , long budget ) {
651+ ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget (element );
652+ assert this .lock .isHeldByCurrentThread ();
653+ // the taken element holds up some budget
654+ var prev = this .unreleasedBudgetPerElement .put (elementWithReleasableBudget , budget );
655+ assert prev == null ;
656+ this .availableBudget -= budget ;
657+ assert this .availableBudget >= 0L ;
658+ return elementWithReleasableBudget ;
659+ }
660+
661+ private void release (ElementWithReleasableBudget elementWithReleasableBudget ) {
662+ final ReentrantLock lock = this .lock ;
663+ lock .lock ();
664+ try {
665+ assert elementWithReleasableBudget .isClosed () == false ;
666+ // when the taken element is not used anymore, it will not influence subsequent computations for available budget,
667+ // but its allotted budget is not yet released
668+ var val = unreleasedBudgetPerElement .remove (elementWithReleasableBudget );
669+ assert val != null ;
670+ } finally {
671+ lock .unlock ();
672+ }
673+ }
674+
675+ private boolean isReleased (ElementWithReleasableBudget elementWithReleasableBudget ) {
676+ return unreleasedBudgetPerElement .containsKey (elementWithReleasableBudget ) == false ;
677+ }
678+
650679 class ElementWithReleasableBudget implements Releasable {
651- private final Wrap <E > wrappedElement ;
652-
653- private ElementWithReleasableBudget (E element , long budget ) {
654- // Wrap the element in a brand-new instance that's used as the key in the
655- // {@link PriorityBlockingQueueWithBudget#unreleasedBudgetPerElement} identity map.
656- // This allows the same exact "element" instance to hold budgets multiple times concurrently.
657- // This way we allow to re-enqueue and re-take an element before a previous take completed and
658- // released the budget.
659- this .wrappedElement = new Wrap <>(element );
660- assert PriorityBlockingQueueWithBudget .this .lock .isHeldByCurrentThread ();
661- // the taken element holds up some budget
662- var prev = PriorityBlockingQueueWithBudget .this .unreleasedBudgetPerElement .put (wrappedElement , budget );
663- assert prev == null ;
664- assert isClosed () == false ;
665- PriorityBlockingQueueWithBudget .this .availableBudget -= budget ;
666- assert PriorityBlockingQueueWithBudget .this .availableBudget >= 0L ;
680+ private final E element ;
681+
682+ private ElementWithReleasableBudget (E element ) {
683+ this .element = element ;
667684 }
668685
669686 /**
@@ -673,27 +690,17 @@ private ElementWithReleasableBudget(E element, long budget) {
673690 */
674691 @ Override
675692 public void close () {
676- final ReentrantLock lock = PriorityBlockingQueueWithBudget .this .lock ;
677- lock .lock ();
678- try {
679- assert isClosed () == false ;
680- // when the taken element is not used anymore, it will not influence subsequent available budget computations
681- unreleasedBudgetPerElement .remove (wrappedElement );
682- } finally {
683- lock .unlock ();
684- }
693+ PriorityBlockingQueueWithBudget .this .release (this );
685694 }
686695
687696 boolean isClosed () {
688- return unreleasedBudgetPerElement . containsKey ( wrappedElement ) == false ;
697+ return PriorityBlockingQueueWithBudget . this . isReleased ( this ) ;
689698 }
690699
691700 E element () {
692- return wrappedElement . element () ;
701+ return element ;
693702 }
694703 }
695-
696- private record Wrap <E >(E element ) {}
697704 }
698705
699706 private static long newTargetIORateBytesPerSec (
0 commit comments