2727
2828import java .io .Closeable ;
2929import java .io .IOException ;
30- import java .util .Collections ;
3130import java .util .Comparator ;
3231import java .util .IdentityHashMap ;
3332import java .util .Iterator ;
@@ -265,7 +264,7 @@ private void enqueueMergeTask(MergeTask mergeTask) {
265264 // To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued
266265 // before adding the merge task to the queue. Adding to the queue should not fail.
267266 mergeEventListeners .forEach (l -> l .onMergeQueued (mergeTask .getOnGoingMerge (), mergeTask .getMergeMemoryEstimateBytes ()));
268- boolean added = queuedMergeTasks .add (mergeTask );
267+ boolean added = queuedMergeTasks .enqueue (mergeTask );
269268 assert added ;
270269 }
271270
@@ -283,7 +282,7 @@ private boolean enqueueMergeTaskExecution() {
283282 // one such runnable always executes a SINGLE merge task from the queue
284283 // this is important for merge queue statistics, i.e. the executor's queue size represents the current amount of merges
285284 while (true ) {
286- PriorityBlockingQueueWithBudget <MergeTask >.ElementWithBudgetRelease smallestMergeTaskWithReleasableBudget ;
285+ PriorityBlockingQueueWithBudget <MergeTask >.ElementWithReleasableBudget smallestMergeTaskWithReleasableBudget ;
287286 try {
288287 // will block if there are backlogged merges until they're enqueued again
289288 smallestMergeTaskWithReleasableBudget = queuedMergeTasks .take ();
@@ -430,21 +429,21 @@ private static ByteSizeValue getFreeBytesThreshold(
430429 static class PriorityBlockingQueueWithBudget <E > {
431430 private final ToLongFunction <? super E > budgetFunction ;
432431 private final PriorityQueue <E > enqueuedByBudget ;
433- private final Set < E > tookNotReleased ;
432+ private final IdentityHashMap < E , Long > unreleasedBudgetPerElement ;
434433 private final ReentrantLock lock ;
435434 private final Condition elementAvailable ;
436435 private long availableBudget ;
437436
438437 PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long availableBudget ) {
439438 this .budgetFunction = budgetFunction ;
440439 this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
441- this .tookNotReleased = Collections . newSetFromMap ( new IdentityHashMap <>() );
440+ this .unreleasedBudgetPerElement = new IdentityHashMap <>();
442441 this .lock = new ReentrantLock ();
443442 this .elementAvailable = lock .newCondition ();
444443 this .availableBudget = availableBudget ;
445444 }
446445
447- boolean add (E e ) {
446+ boolean enqueue (E e ) {
448447 final ReentrantLock lock = this .lock ;
449448 lock .lock ();
450449 try {
@@ -456,17 +455,15 @@ boolean add(E e) {
456455 return true ;
457456 }
458457
459- ElementWithBudgetRelease take () throws InterruptedException {
458+ ElementWithReleasableBudget take () throws InterruptedException {
460459 final ReentrantLock lock = this .lock ;
461460 lock .lockInterruptibly ();
462461 try {
463462 E peek ;
464463 long peekBudget ;
465464 while ((peek = enqueuedByBudget .peek ()) == null || (peekBudget = budgetFunction .applyAsLong (peek )) > availableBudget )
466465 elementAvailable .await ();
467- availableBudget -= peekBudget ;
468- assert availableBudget > 0L ;
469- return new ElementWithBudgetRelease (enqueuedByBudget .poll ());
466+ return new ElementWithReleasableBudget (enqueuedByBudget .poll (), peekBudget );
470467 } finally {
471468 lock .unlock ();
472469 }
@@ -477,10 +474,10 @@ void updateAvailableBudget(long availableBudget) {
477474 lock .lock ();
478475 try {
479476 this .availableBudget = availableBudget ;
480- for ( E tookElement : tookNotReleased ) {
481- this . availableBudget -= budgetFunction .applyAsLong (tookElement );
482- }
483- this .availableBudget = Math . max ( 0L , this . availableBudget );
477+ // also update budget per element
478+ unreleasedBudgetPerElement . replaceAll (( e , v ) -> budgetFunction .applyAsLong (e ) );
479+ // update available budget given the per element budget
480+ this .availableBudget -= unreleasedBudgetPerElement . values (). stream (). reduce ( 0L , Long :: sum );
484481 elementAvailable .signalAll ();
485482 } finally {
486483 lock .unlock ();
@@ -495,17 +492,28 @@ int size() {
495492 return enqueuedByBudget .size ();
496493 }
497494
498- class ElementWithBudgetRelease implements Releasable {
495+ class ElementWithReleasableBudget implements Releasable {
499496 private final E element ;
500497
501- private ElementWithBudgetRelease (E element ) {
498+ private ElementWithReleasableBudget (E element , long budget ) {
502499 this .element = element ;
503- tookNotReleased .add (element );
500+ assert PriorityBlockingQueueWithBudget .this .lock .isHeldByCurrentThread ();
501+ var prev = unreleasedBudgetPerElement .put (element , budget );
502+ assert prev == null ;
503+ availableBudget -= budget ;
504+ assert availableBudget >= 0L ;
504505 }
505506
506507 @ Override
507508 public void close () {
508- tookNotReleased .remove (element );
509+ final ReentrantLock lock = PriorityBlockingQueueWithBudget .this .lock ;
510+ lock .lock ();
511+ try {
512+ availableBudget += unreleasedBudgetPerElement .remove (element );
513+ elementAvailable .signalAll ();
514+ } finally {
515+ lock .unlock ();
516+ }
509517 }
510518
511519 public E element () {
0 commit comments