@@ -489,7 +489,7 @@ private static ByteSizeValue getFreeBytesThreshold(
489489 static class PriorityBlockingQueueWithBudget <E > {
490490 private final ToLongFunction <? super E > budgetFunction ;
491491 private final PriorityQueue <E > enqueuedByBudget ;
492- private final IdentityHashMap <E , Long > unreleasedBudgetPerElement ;
492+ private final IdentityHashMap <Wrap < E > , Long > unreleasedBudgetPerElement ;
493493 private final ReentrantLock lock ;
494494 private final Condition elementAvailable ;
495495 private long availableBudget ;
@@ -540,7 +540,7 @@ void updateBudget(long availableBudget) {
540540 try {
541541 this .availableBudget = availableBudget ;
542542 // update the per-element budget (these are all the elements that are using any budget)
543- unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e ));
543+ unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e . element () ));
544544 // available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
545545 this .availableBudget -= unreleasedBudgetPerElement .values ().stream ().reduce (0L , Long ::sum );
546546 elementAvailable .signalAll ();
@@ -558,13 +558,13 @@ int size() {
558558 }
559559
560560 class ElementWithReleasableBudget implements Releasable {
561- private final E element ;
561+ private final Wrap < E > wrappedElement ;
562562
563563 private ElementWithReleasableBudget (E element , long budget ) {
564- this .element = element ;
564+ this .wrappedElement = new Wrap <>( element ) ;
565565 assert PriorityBlockingQueueWithBudget .this .lock .isHeldByCurrentThread ();
566566 // the taken element holds up some budget
567- var prev = unreleasedBudgetPerElement .put (element , budget );
567+ var prev = unreleasedBudgetPerElement .put (wrappedElement , budget );
568568 assert prev == null ;
569569 availableBudget -= budget ;
570570 assert availableBudget >= 0L ;
@@ -575,19 +575,21 @@ public void close() {
575575 final ReentrantLock lock = PriorityBlockingQueueWithBudget .this .lock ;
576576 lock .lock ();
577577 try {
578- assert unreleasedBudgetPerElement .containsKey (element );
578+ assert unreleasedBudgetPerElement .containsKey (wrappedElement );
579579 // when the taken element is not used anymore, the budget it hold is released
580- availableBudget += unreleasedBudgetPerElement .remove (element );
580+ availableBudget += unreleasedBudgetPerElement .remove (wrappedElement );
581581 elementAvailable .signalAll ();
582582 } finally {
583583 lock .unlock ();
584584 }
585585 }
586586
587587 E element () {
588- return element ;
588+ return wrappedElement . element () ;
589589 }
590590 }
591+
592+ private record Wrap <E >(E element ) {}
591593 }
592594
593595 @ Override
0 commit comments