2929
3030import java .io .Closeable ;
3131import java .io .IOException ;
32+ import java .util .ArrayList ;
3233import java .util .Arrays ;
3334import java .util .Comparator ;
3435import java .util .IdentityHashMap ;
3940import java .util .Set ;
4041import java .util .concurrent .CopyOnWriteArrayList ;
4142import java .util .concurrent .ExecutorService ;
43+ import java .util .concurrent .PriorityBlockingQueue ;
4244import java .util .concurrent .RejectedExecutionException ;
4345import java .util .concurrent .atomic .AtomicInteger ;
4446import java .util .concurrent .atomic .AtomicLong ;
@@ -163,9 +165,9 @@ public Iterator<Setting<?>> settings() {
163165 * The merge tasks that are awaiting execution. This does NOT include backlogged or currently executing merge tasks.
164166 * For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing.
165167 */
166- private final PriorityBlockingQueueWithBudget <MergeTask > queuedMergeTasks = new PriorityBlockingQueueWithBudget <>(
167- MergeTask :: estimatedRemainingMergeSize ,
168- Long . MAX_VALUE
168+ private final PriorityBlockingQueue <MergeTask > queuedMergeTasks = new PriorityBlockingQueue <>(
169+ 64 ,
170+ Comparator . comparingLong ( MergeTask :: estimatedMergeSize )
169171 );
170172 /**
171173 * The set of all merge tasks currently being executed by merge threads from the pool.
@@ -184,6 +186,7 @@ public Iterator<Setting<?>> settings() {
184186 private final int maxConcurrentMerges ;
185187 private final int concurrentMergesFloorLimitForThrottling ;
186188 private final int concurrentMergesCeilLimitForThrottling ;
189+ private final BudgetTracker <MergeTask > budgetTracker ;
187190 private final AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor ;
188191
189192 private final List <MergeEventListener > mergeEventListeners = new CopyOnWriteArrayList <>();
@@ -213,13 +216,14 @@ private ThreadPoolMergeExecutorService(
213216 this .concurrentMergesFloorLimitForThrottling = 2 ;
214217 this .concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2 ;
215218 assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling ;
219+ this .budgetTracker = new BudgetTracker <>(MergeTask ::estimatedRemainingMergeSize , Long .MAX_VALUE , queuedMergeTasks ::add );
216220 this .availableDiskSpacePeriodicMonitor = new AvailableDiskSpacePeriodicMonitor (
217221 nodeEnvironment .dataPaths (),
218222 threadPool ,
219223 INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .get (settings ),
220224 INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING .get (settings ),
221225 INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING .get (settings ),
222- (availableDiskSpaceByteSize ) -> queuedMergeTasks . updateAvailableBudget (availableDiskSpaceByteSize .getBytes ())
226+ (availableDiskSpaceByteSize ) -> budgetTracker . updateBudget (availableDiskSpaceByteSize .getBytes ())
223227 );
224228 clusterSettings .addSettingsUpdateConsumer (
225229 INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING ,
@@ -230,8 +234,8 @@ private ThreadPoolMergeExecutorService(
230234 this .availableDiskSpacePeriodicMonitor ::setHighStageMaxHeadroom
231235 );
232236 clusterSettings .addSettingsUpdateConsumer (
233- INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ,
234- this .availableDiskSpacePeriodicMonitor ::setCheckInterval
237+ INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ,
238+ this .availableDiskSpacePeriodicMonitor ::setCheckInterval
235239 );
236240 }
237241
@@ -282,7 +286,7 @@ private void enqueueMergeTask(MergeTask mergeTask) {
282286 // To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued
283287 // before adding the merge task to the queue. Adding to the queue should not fail.
284288 mergeEventListeners .forEach (l -> l .onMergeQueued (mergeTask .getOnGoingMerge (), mergeTask .getMergeMemoryEstimateBytes ()));
285- boolean added = queuedMergeTasks .enqueue (mergeTask );
289+ boolean added = queuedMergeTasks .add (mergeTask );
286290 assert added ;
287291 }
288292
@@ -300,10 +304,10 @@ private boolean enqueueMergeTaskExecution() {
300304 // one such runnable always executes a SINGLE merge task from the queue
301305 // this is important for merge queue statistics, i.e. the executor's queue size represents the current amount of merges
302306 while (true ) {
303- PriorityBlockingQueueWithBudget < MergeTask >. ElementWithReleasableBudget smallestMergeTaskWithReleasableBudget ;
307+ MergeTask smallestMergeTask ;
304308 try {
305- // will block if there are backlogged merges until they're enqueued again
306- smallestMergeTaskWithReleasableBudget = queuedMergeTasks .take ();
309+ // will block if there are backlogged or over-budget merges until they're enqueued again
310+ smallestMergeTask = queuedMergeTasks .take ();
307311 } catch (InterruptedException e ) {
308312 // An active worker thread has been interrupted while waiting for backlogged merges to be re-enqueued.
309313 // In this case, we terminate the worker thread promptly and forget about the backlogged merges.
@@ -313,9 +317,7 @@ private boolean enqueueMergeTaskExecution() {
313317 // is also drained, so any queued merge tasks are also forgotten.
314318 break ;
315319 }
316- try {
317- // let the task's scheduler decide if it can actually run the merge task now
318- MergeTask smallestMergeTask = smallestMergeTaskWithReleasableBudget .element ();
320+ try (var budgetRelease = budgetTracker .checkBudget (smallestMergeTask )) {
319321 ThreadPoolMergeScheduler .Schedule schedule = smallestMergeTask .schedule ();
320322 if (schedule == RUN ) {
321323 runMergeTask (smallestMergeTask );
@@ -328,8 +330,8 @@ private boolean enqueueMergeTaskExecution() {
328330 // the merge task is backlogged by the merge scheduler, try to get the next smallest one
329331 // it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run
330332 }
331- } finally {
332- smallestMergeTaskWithReleasableBudget . close ();
333+ } catch ( BudgetTracker . BudgetOverflowException e ) {
334+ // continue to poll for new merge tasks, over-budget merge task will be reenqueued
333335 }
334336 }
335337 });
@@ -482,6 +484,75 @@ private static ByteSizeValue getFreeBytesThreshold(
482484 }
483485 }
484486
487+ static class BudgetTracker <E > {
488+ private final ToLongFunction <? super E > budgetFunction ;
489+ private long availableBudget ;
490+ private final IdentityHashMap <E , Long > commitedBudgetPerElement ;
491+ private final List <E > budgetOverflow ;
492+ private final Consumer <? super E > budgetOverflowConsumer ;
493+ private final Object mutex = new Object ();
494+
495+ BudgetTracker (ToLongFunction <? super E > budgetFunction , long availableBudget , Consumer <? super E > budgetOverflowConsumer ) {
496+ this .budgetFunction = budgetFunction ;
497+ this .availableBudget = availableBudget ;
498+ this .commitedBudgetPerElement = new IdentityHashMap <>();
499+ this .budgetOverflow = new ArrayList <>();
500+ this .budgetOverflowConsumer = budgetOverflowConsumer ;
501+ }
502+
503+ Releasable checkBudget (E element ) throws BudgetOverflowException {
504+ long elementBudget = budgetFunction .applyAsLong (element );
505+ synchronized (mutex ) {
506+ if (elementBudget > availableBudget ) {
507+ budgetOverflow .add (element );
508+ throw new BudgetOverflowException ();
509+ } else {
510+ assert commitedBudgetPerElement .containsKey (element ) == false ;
511+ commitedBudgetPerElement .put (element , elementBudget );
512+ availableBudget -= elementBudget ;
513+ assert availableBudget > 0 ;
514+ return () -> {
515+ synchronized (mutex ) {
516+ assert commitedBudgetPerElement .containsKey (element );
517+ availableBudget += commitedBudgetPerElement .remove (element );
518+ dropBudgetOverflow ();
519+ }
520+ };
521+ }
522+ }
523+ }
524+
525+ void updateBudget (long availableBudget ) {
526+ assert availableBudget > 0 ;
527+ synchronized (mutex ) {
528+ long previouslyAvailableBudget = this .availableBudget ;
529+ this .availableBudget = availableBudget ;
530+ // update the per-element budget
531+ commitedBudgetPerElement .replaceAll ((e , v ) -> budgetFunction .applyAsLong (e ));
532+ // update available budget given the per-element budget
533+ this .availableBudget -= commitedBudgetPerElement .values ().stream ().reduce (0L , Long ::sum );
534+ if (this .availableBudget > previouslyAvailableBudget ) {
535+ dropBudgetOverflow ();
536+ }
537+ }
538+ }
539+
540+ private void dropBudgetOverflow () {
541+ synchronized (mutex ) {
542+ for (E element : budgetOverflow ) {
543+ budgetOverflowConsumer .accept (element );
544+ }
545+ budgetOverflow .clear ();
546+ }
547+ }
548+
549+ static final class BudgetOverflowException extends Exception {
550+ public BudgetOverflowException () {
551+ super ("budget overflow" );
552+ }
553+ }
554+ }
555+
485556 static class PriorityBlockingQueueWithBudget <E > {
486557 private final ToLongFunction <? super E > budgetFunction ;
487558 private final PriorityQueue <E > enqueuedByBudget ;
0 commit comments