@@ -179,9 +179,7 @@ public Iterator<Setting<?>> settings() {
179179 * The budget (estimation) for a merge task is the disk space (still) required for it to complete. As the merge progresses,
180180 * its budget decreases (as the bytes already written have been incorporated into the filesystem stats about the used disk space).
181181 */
182- private final PriorityBlockingQueueWithBudget <MergeTask > queuedMergeTasks = new PriorityBlockingQueueWithBudget <>(
183- MergeTask ::estimatedRemainingMergeSize
184- );
182+ private final MergeTaskPriorityBlockingQueue queuedMergeTasks ;
185183 /**
186184 * The set of all merge tasks currently being executed by merge threads from the pool.
187185 * These are tracked notably in order to be able to update their disk IO throttle rate, after they have started, while executing.
@@ -209,24 +207,35 @@ public Iterator<Setting<?>> settings() {
209207 NodeEnvironment nodeEnvironment
210208 ) {
211209 if (clusterSettings .get (USE_THREAD_POOL_MERGE_SCHEDULER_SETTING )) {
212- ThreadPoolMergeExecutorService threadPoolMergeExecutorService = new ThreadPoolMergeExecutorService (
210+ MergeTaskPriorityBlockingQueue mergeTaskPriorityBlockingQueue = new MergeTaskPriorityBlockingQueue ();
211+ // start monitoring the available disk space, and update the available budget for running merge tasks
212+ // Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
213+ // available disk space. In this case, merges will NOT be blocked for shards on data paths with insufficient available
214+ // disk space, as long as a single data path has enough available disk space to run merges for any shards that it stores
215+ // (i.e. multiple data path is not really supported when blocking merges due to insufficient available disk space
216+ // (but nothing blows up either, if using multiple data paths))
217+ AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor = new AvailableDiskSpacePeriodicMonitor (
218+ nodeEnvironment .dataPaths (),
213219 threadPool ,
214- clusterSettings ,
215- nodeEnvironment
220+ clusterSettings .get (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING ),
221+ clusterSettings .get (INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING ),
222+ clusterSettings .get (INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ),
223+ (availableDiskSpaceByteSize ) -> mergeTaskPriorityBlockingQueue .updateBudget (availableDiskSpaceByteSize .getBytes ())
216224 );
217225 clusterSettings .addSettingsUpdateConsumer (
218226 INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING ,
219- threadPoolMergeExecutorService . getAvailableDiskSpacePeriodicMonitor () ::setHighStageWatermark
227+ availableDiskSpacePeriodicMonitor ::setHighStageWatermark
220228 );
221229 clusterSettings .addSettingsUpdateConsumer (
222230 INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING ,
223- threadPoolMergeExecutorService . getAvailableDiskSpacePeriodicMonitor () ::setHighStageMaxHeadroom
231+ availableDiskSpacePeriodicMonitor ::setHighStageMaxHeadroom
224232 );
225233 clusterSettings .addSettingsUpdateConsumer (
226234 INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ,
227- threadPoolMergeExecutorService . getAvailableDiskSpacePeriodicMonitor () ::setCheckInterval
235+ availableDiskSpacePeriodicMonitor ::setCheckInterval
228236 );
229- return threadPoolMergeExecutorService ;
237+ // owns and closes the disk space monitor
238+ return new ThreadPoolMergeExecutorService (threadPool , mergeTaskPriorityBlockingQueue , availableDiskSpacePeriodicMonitor );
230239 } else {
231240 // register no-op setting update consumers so that setting validations work properly
232241 // (some validations are bypassed if there are no update consumers registered),
@@ -238,27 +247,19 @@ public Iterator<Setting<?>> settings() {
238247 }
239248 }
240249
241- private ThreadPoolMergeExecutorService (ThreadPool threadPool , ClusterSettings clusterSettings , NodeEnvironment nodeEnvironment ) {
250+ private ThreadPoolMergeExecutorService (
251+ ThreadPool threadPool ,
252+ MergeTaskPriorityBlockingQueue mergeTaskPriorityBlockingQueue ,
253+ AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor
254+ ) {
242255 this .executorService = threadPool .executor (ThreadPool .Names .MERGE );
243256 this .maxConcurrentMerges = threadPool .info (ThreadPool .Names .MERGE ).getMax ();
244257 // the intent here is to throttle down whenever we submit a task and no other task is running
245258 this .concurrentMergesFloorLimitForThrottling = 2 ;
246259 this .concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2 ;
247260 assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling ;
248- // start monitoring the available disk space, and update the available budget for running merge tasks
249- // Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
250- // available disk space. In this case, merges will NOT be blocked for shards on data paths with insufficient available
251- // disk space, as long as a single data path has enough available disk space to run merges for any shards that it stores
252- // (i.e. multiple data path is not really supported when blocking merges due to insufficient available disk space
253- // (but nothing blows up either, if using multiple data paths))
254- this .availableDiskSpacePeriodicMonitor = new AvailableDiskSpacePeriodicMonitor (
255- nodeEnvironment .dataPaths (),
256- threadPool ,
257- clusterSettings .get (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING ),
258- clusterSettings .get (INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING ),
259- clusterSettings .get (INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING ),
260- (availableDiskSpaceByteSize ) -> queuedMergeTasks .updateBudget (availableDiskSpaceByteSize .getBytes ())
261- );
261+ this .queuedMergeTasks = mergeTaskPriorityBlockingQueue ;
262+ this .availableDiskSpacePeriodicMonitor = availableDiskSpacePeriodicMonitor ;
262263 }
263264
264265 boolean submitMergeTask (MergeTask mergeTask ) {
@@ -305,10 +306,6 @@ void reEnqueueBackloggedMergeTask(MergeTask mergeTask) {
305306 enqueueMergeTask (mergeTask );
306307 }
307308
308- AvailableDiskSpacePeriodicMonitor getAvailableDiskSpacePeriodicMonitor () {
309- return this .availableDiskSpacePeriodicMonitor ;
310- }
311-
312309 private void enqueueMergeTask (MergeTask mergeTask ) {
313310 // To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued
314311 // before adding the merge task to the queue. Adding to the queue should not fail.
@@ -318,7 +315,7 @@ private void enqueueMergeTask(MergeTask mergeTask) {
318315 }
319316
320317 public boolean allDone () {
321- return queuedMergeTasks .isEmpty () && runningMergeTasks .isEmpty () && ioThrottledMergeTasksCount .get () == 0L ;
318+ return queuedMergeTasks .isQueueEmpty () && runningMergeTasks .isEmpty () && ioThrottledMergeTasksCount .get () == 0L ;
322319 }
323320
324321 /**
@@ -361,7 +358,8 @@ private boolean enqueueMergeTaskExecution() {
361358 assert schedule == BACKLOG ;
362359 // The merge task is backlogged by the merge scheduler, try to get the next smallest one.
363360 // It's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when
364- // itself decides that the merge task could be run.
361+ // itself decides that the merge task could be run. Note that it is possible that this merge
362+ // task is re-enqueued and re-took before the budget hold-up here is released below.
365363 }
366364 } finally {
367365 // releases any budget that is still being allocated for the merge task
@@ -516,6 +514,19 @@ private static ByteSizeValue getFreeBytesThreshold(
516514 }
517515 }
518516
517+ static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget <MergeTask > {
518+ MergeTaskPriorityBlockingQueue () {
519+ // start with unlimited budget (so this will behave like a regular priority queue until {@link #updateBudget} is invoked)
520+ // use the "remaining" merge size as the budget function so that the "budget" of taken elements is updated according
521+ // to the remaining disk space requirements of currently running merge tasks
522+ super (MergeTask ::estimatedRemainingMergeSize , Long .MAX_VALUE );
523+ }
524+ }
525+
526+ /**
527+ * Similar to a regular priority queue, but the {@link #take()} operation will also block if the smallest element
528+ * (according to the specified "budget" function) is larger than an updatable limit budget.
529+ */
519530 static class PriorityBlockingQueueWithBudget <E > {
520531 private final ToLongFunction <? super E > budgetFunction ;
521532 private final PriorityQueue <E > enqueuedByBudget ;
@@ -524,17 +535,13 @@ static class PriorityBlockingQueueWithBudget<E> {
524535 private final Condition elementAvailable ;
525536 private long availableBudget ;
526537
527- PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction ) {
528- this (budgetFunction , Long .MAX_VALUE );
529- }
530-
531- PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long availableBudget ) {
538+ PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long initialAvailableBudget ) {
532539 this .budgetFunction = budgetFunction ;
533540 this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
534541 this .unreleasedBudgetPerElement = new IdentityHashMap <>();
535542 this .lock = new ReentrantLock ();
536543 this .elementAvailable = lock .newCondition ();
537- this .availableBudget = availableBudget ;
544+ this .availableBudget = initialAvailableBudget ;
538545 }
539546
540547 boolean enqueue (E e ) {
@@ -549,21 +556,33 @@ boolean enqueue(E e) {
549556 return true ;
550557 }
551558
559+ /**
560+ * Dequeues the smallest element (according to the specified "budget" function) if its budget is below the available limit.
561+ * This method invocation blocks if the queue is empty or the element's budget is above the available limit.
562+ */
552563 ElementWithReleasableBudget take () throws InterruptedException {
553564 final ReentrantLock lock = this .lock ;
554565 lock .lockInterruptibly ();
555566 try {
556567 E peek ;
557568 long peekBudget ;
569+ // blocks until the smallest budget element fits the currently available budget
558570 while ((peek = enqueuedByBudget .peek ()) == null || (peekBudget = budgetFunction .applyAsLong (peek )) > availableBudget ) {
559571 elementAvailable .await ();
560572 }
573+ // deducts and holds up that element's budget from the available budget
561574 return new ElementWithReleasableBudget (enqueuedByBudget .poll (), peekBudget );
562575 } finally {
563576 lock .unlock ();
564577 }
565578 }
566579
580+ /**
581+ * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
582+ * that are still in use. The budget of in-use elements is also updated (by re-applying the budget function).
583+ * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
584+ * is over this newly computed available budget.
585+ */
567586 void updateBudget (long availableBudget ) {
568587 final ReentrantLock lock = this .lock ;
569588 lock .lock ();
@@ -579,18 +598,23 @@ void updateBudget(long availableBudget) {
579598 }
580599 }
581600
582- boolean isEmpty () {
601+ boolean isQueueEmpty () {
583602 return enqueuedByBudget .isEmpty ();
584603 }
585604
586- int size () {
605+ int queueSize () {
587606 return enqueuedByBudget .size ();
588607 }
589608
590609 class ElementWithReleasableBudget implements Releasable {
591610 private final Wrap <E > wrappedElement ;
592611
593612 private ElementWithReleasableBudget (E element , long budget ) {
613+ // Wrap the element in a brand-new instance that's used as the key in the
614+ // {@link PriorityBlockingQueueWithBudget#unreleasedBudgetPerElement} identity map.
615+ // This allows the same exact "element" instance to hold budgets multiple times concurrently.
616+ // This way we allow to re-enqueue and re-take an element before a previous take completed and
617+ // released the budget.
594618 this .wrappedElement = new Wrap <>(element );
595619 assert PriorityBlockingQueueWithBudget .this .lock .isHeldByCurrentThread ();
596620 // the taken element holds up some budget
@@ -600,13 +624,16 @@ private ElementWithReleasableBudget(E element, long budget) {
600624 assert availableBudget >= 0L ;
601625 }
602626
627+ /**
628+ * Must be invoked when the caller is done with the element that it took from the queue.
629+ */
603630 @ Override
604631 public void close () {
605632 final ReentrantLock lock = PriorityBlockingQueueWithBudget .this .lock ;
606633 lock .lock ();
607634 try {
608635 assert unreleasedBudgetPerElement .containsKey (wrappedElement );
609- // when the taken element is not used anymore, the budget it hold is released
636+ // when the taken element is not used anymore, the budget it holds is released
610637 availableBudget += unreleasedBudgetPerElement .remove (wrappedElement );
611638 elementAvailable .signalAll ();
612639 } finally {
@@ -700,7 +727,7 @@ Set<MergeTask> getRunningMergeTasks() {
700727
701728 // exposed for tests
702729 int getMergeTasksQueueLength () {
703- return queuedMergeTasks .size ();
730+ return queuedMergeTasks .queueSize ();
704731 }
705732
706733 // exposed for tests and stats
0 commit comments