@@ -550,103 +550,10 @@ static final class BudgetOverflowException extends Exception {
550550 public BudgetOverflowException () {
551551 super ("budget overflow" );
552552 }
553- }
554- }
555-
556- static class PriorityBlockingQueueWithBudget <E > {
557- private final ToLongFunction <? super E > budgetFunction ;
558- private final PriorityQueue <E > enqueuedByBudget ;
559- private final IdentityHashMap <E , Long > unreleasedBudgetPerElement ;
560- private final ReentrantLock lock ;
561- private final Condition elementAvailable ;
562- private long availableBudget ;
563-
564- PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long availableBudget ) {
565- this .budgetFunction = budgetFunction ;
566- this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
567- this .unreleasedBudgetPerElement = new IdentityHashMap <>();
568- this .lock = new ReentrantLock ();
569- this .elementAvailable = lock .newCondition ();
570- this .availableBudget = availableBudget ;
571- }
572-
573- boolean enqueue (E e ) {
574- final ReentrantLock lock = this .lock ;
575- lock .lock ();
576- try {
577- enqueuedByBudget .offer (e );
578- elementAvailable .signal ();
579- } finally {
580- lock .unlock ();
581- }
582- return true ;
583- }
584-
585- ElementWithReleasableBudget take () throws InterruptedException {
586- final ReentrantLock lock = this .lock ;
587- lock .lockInterruptibly ();
588- try {
589- E peek ;
590- long peekBudget ;
591- while ((peek = enqueuedByBudget .peek ()) == null || (peekBudget = budgetFunction .applyAsLong (peek )) > availableBudget ) {
592- elementAvailable .await ();
593- }
594- return new ElementWithReleasableBudget (enqueuedByBudget .poll (), peekBudget );
595- } finally {
596- lock .unlock ();
597- }
598- }
599-
600- void updateAvailableBudget (long availableBudget ) {
601- final ReentrantLock lock = this .lock ;
602- lock .lock ();
603- try {
604- this .availableBudget = availableBudget ;
605- // update the per-element budget
606- unreleasedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e ));
607- // update available budget given the per-element budget
608- this .availableBudget -= unreleasedBudgetPerElement .values ().stream ().reduce (0L , Long ::sum );
609- elementAvailable .signalAll ();
610- } finally {
611- lock .unlock ();
612- }
613- }
614-
615- boolean isEmpty () {
616- return enqueuedByBudget .isEmpty ();
617- }
618-
619- int size () {
620- return enqueuedByBudget .size ();
621- }
622-
623- class ElementWithReleasableBudget implements Releasable {
624- private final E element ;
625-
626- private ElementWithReleasableBudget (E element , long budget ) {
627- this .element = element ;
628- assert PriorityBlockingQueueWithBudget .this .lock .isHeldByCurrentThread ();
629- var prev = unreleasedBudgetPerElement .put (element , budget );
630- assert prev == null ;
631- availableBudget -= budget ;
632- assert availableBudget >= 0L ;
633- }
634553
635554 @ Override
636- public void close () {
637- final ReentrantLock lock = PriorityBlockingQueueWithBudget .this .lock ;
638- lock .lock ();
639- try {
640- assert unreleasedBudgetPerElement .containsKey (element );
641- availableBudget += unreleasedBudgetPerElement .remove (element );
642- elementAvailable .signalAll ();
643- } finally {
644- lock .unlock ();
645- }
646- }
647-
648- public E element () {
649- return element ;
555+ public Throwable fillInStackTrace () {
556+ return this ; // this exception doesn't imply a bug, no need for a stack trace
650557 }
651558 }
652559 }
0 commit comments