1717import org .elasticsearch .common .unit .RelativeByteSizeValue ;
1818import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
1919import org .elasticsearch .core .Nullable ;
20+ import org .elasticsearch .core .Releasable ;
2021import org .elasticsearch .core .TimeValue ;
2122import org .elasticsearch .env .NodeEnvironment ;
2223import org .elasticsearch .index .engine .ThreadPoolMergeScheduler .MergeTask ;
2627
2728import java .io .Closeable ;
2829import java .io .IOException ;
30+ import java .util .Collections ;
2931import java .util .Comparator ;
32+ import java .util .IdentityHashMap ;
3033import java .util .Iterator ;
3134import java .util .List ;
3235import java .util .Map ;
@@ -158,8 +161,8 @@ public Iterator<Setting<?>> settings() {
158161 * The merge tasks that are waiting execution. This does NOT include backlogged or currently executing merge tasks.
159162 * For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing.
160163 */
161- private final PriorityBlockingQueueWithMaxLimit <MergeTask > queuedMergeTasks = new PriorityBlockingQueueWithMaxLimit <>(
162- MergeTask ::estimatedMergeSize ,
164+ private final PriorityBlockingQueueWithBudget <MergeTask > queuedMergeTasks = new PriorityBlockingQueueWithBudget <>(
165+ MergeTask ::estimatedRemainingMergeSize ,
163166 Long .MAX_VALUE
164167 );
165168 /**
@@ -280,10 +283,10 @@ private boolean enqueueMergeTaskExecution() {
280283 // one such runnable always executes a SINGLE merge task from the queue
281284 // this is important for merge queue statistics, i.e. the executor's queue size represents the current amount of merges
282285 while (true ) {
283- MergeTask smallestMergeTask ;
286+ PriorityBlockingQueueWithBudget < MergeTask >. ElementWithBudgetRelease smallestMergeTaskWithReleasableBudget ;
284287 try {
285288 // will block if there are backlogged merges until they're enqueued again
286- smallestMergeTask = queuedMergeTasks .take ();
289+ smallestMergeTaskWithReleasableBudget = queuedMergeTasks .take ();
287290 } catch (InterruptedException e ) {
288291 // An active worker thread has been interrupted while waiting for backlogged merges to be re-enqueued.
289292 // In this case, we terminate the worker thread promptly and forget about the backlogged merges.
@@ -293,18 +296,23 @@ private boolean enqueueMergeTaskExecution() {
293296 // is also drained, so any queued merge tasks are also forgotten.
294297 break ;
295298 }
296- // let the task's scheduler decide if it can actually run the merge task now
297- ThreadPoolMergeScheduler .Schedule schedule = smallestMergeTask .schedule ();
298- if (schedule == RUN ) {
299- runMergeTask (smallestMergeTask );
300- break ;
301- } else if (schedule == ABORT ) {
302- abortMergeTask (smallestMergeTask );
303- break ;
304- } else {
305- assert schedule == BACKLOG ;
306- // the merge task is backlogged by the merge scheduler, try to get the next smallest one
307- // it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run
299+ try {
300+ // let the task's scheduler decide if it can actually run the merge task now
301+ MergeTask smallestMergeTask = smallestMergeTaskWithReleasableBudget .element ();
302+ ThreadPoolMergeScheduler .Schedule schedule = smallestMergeTask .schedule ();
303+ if (schedule == RUN ) {
304+ runMergeTask (smallestMergeTask );
305+ break ;
306+ } else if (schedule == ABORT ) {
307+ abortMergeTask (smallestMergeTask );
308+ break ;
309+ } else {
310+ assert schedule == BACKLOG ;
311+ // the merge task is backlogged by the merge scheduler, try to get the next smallest one
312+ // it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run
313+ }
314+ } finally {
315+ smallestMergeTaskWithReleasableBudget .close ();
308316 }
309317 }
310318 });
@@ -402,7 +410,7 @@ public void run() {
402410 .getBytes ();
403411 // the rest is the maximum disk space available for a new merge task
404412 long maxMergeSizeLimit = Math .max (0L , leastAvailableDiskSpaceBytes );
405- queuedMergeTasks .updateMaxPriorityLimit (maxMergeSizeLimit );
413+ queuedMergeTasks .updateAvailableBudget (maxMergeSizeLimit );
406414 }
407415
408416 private static ByteSizeValue getFreeBytesThreshold (
@@ -419,63 +427,90 @@ private static ByteSizeValue getFreeBytesThreshold(
419427 }
420428 }
421429
422- static class PriorityBlockingQueueWithMaxLimit <E > {
423- private final ToLongFunction <? super E > priorityFunction ;
424- private final PriorityQueue <E > priorityQueue ;
430+ static class PriorityBlockingQueueWithBudget <E > {
431+ private final ToLongFunction <? super E > budgetFunction ;
432+ private final PriorityQueue <E > enqueuedByBudget ;
433+ private final Set <E > tookNotReleased ;
425434 private final ReentrantLock lock ;
426435 private final Condition elementAvailable ;
427- private long maxPriorityLimit ;
436+ private long availableBudget ;
428437
429- PriorityBlockingQueueWithMaxLimit (ToLongFunction <? super E > priorityFunction , long maxPriorityLimit ) {
430- this .priorityFunction = priorityFunction ;
431- this .priorityQueue = new PriorityQueue <E >(64 , Comparator .comparingLong (priorityFunction ));
438+ PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long availableBudget ) {
439+ this .budgetFunction = budgetFunction ;
440+ this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
441+ this .tookNotReleased = Collections .newSetFromMap (new IdentityHashMap <>());
432442 this .lock = new ReentrantLock ();
433443 this .elementAvailable = lock .newCondition ();
434- this .maxPriorityLimit = maxPriorityLimit ;
444+ this .availableBudget = availableBudget ;
435445 }
436446
437447 boolean add (E e ) {
438448 final ReentrantLock lock = this .lock ;
439449 lock .lock ();
440450 try {
441- priorityQueue .offer (e );
451+ enqueuedByBudget .offer (e );
442452 elementAvailable .signal ();
443453 } finally {
444454 lock .unlock ();
445455 }
446456 return true ;
447457 }
448458
449- E take () throws InterruptedException {
459+ ElementWithBudgetRelease take () throws InterruptedException {
450460 final ReentrantLock lock = this .lock ;
451461 lock .lockInterruptibly ();
452- E peek ;
453462 try {
454- while ((peek = priorityQueue .peek ()) == null || priorityFunction .applyAsLong (peek ) > maxPriorityLimit )
463+ E peek ;
464+ long peekBudget ;
465+ while ((peek = enqueuedByBudget .peek ()) == null || (peekBudget = budgetFunction .applyAsLong (peek )) > availableBudget )
455466 elementAvailable .await ();
456- return priorityQueue .poll ();
467+ availableBudget -= peekBudget ;
468+ assert availableBudget > 0L ;
469+ return new ElementWithBudgetRelease (enqueuedByBudget .poll ());
457470 } finally {
458471 lock .unlock ();
459472 }
460473 }
461474
462- void updateMaxPriorityLimit (long maxPriorityLimit ) {
475+ void updateAvailableBudget (long availableBudget ) {
463476 final ReentrantLock lock = this .lock ;
464477 lock .lock ();
465478 try {
466- this .maxPriorityLimit = maxPriorityLimit ;
479+ this .availableBudget = availableBudget ;
480+ for (E tookElement : tookNotReleased ) {
481+ this .availableBudget -= budgetFunction .applyAsLong (tookElement );
482+ }
483+ this .availableBudget = Math .max (0L , this .availableBudget );
467484 elementAvailable .signalAll ();
468485 } finally {
469486 lock .unlock ();
470487 }
471488 }
472489
473490 boolean isEmpty () {
474- return priorityQueue .isEmpty ();
491+ return enqueuedByBudget .isEmpty ();
475492 }
476493
477494 int size () {
478- return priorityQueue .size ();
495+ return enqueuedByBudget .size ();
496+ }
497+
498+ class ElementWithBudgetRelease implements Releasable {
499+ private final E element ;
500+
501+ private ElementWithBudgetRelease (E element ) {
502+ this .element = element ;
503+ tookNotReleased .add (element );
504+ }
505+
506+ @ Override
507+ public void close () {
508+ tookNotReleased .remove (element );
509+ }
510+
511+ public E element () {
512+ return element ;
513+ }
479514 }
480515 }
481516
0 commit comments