@@ -161,10 +161,11 @@ public Iterator<Setting<?>> settings() {
161161 /**
162162 * The merge tasks that are waiting execution. This does NOT include backlogged or currently executing merge tasks.
163163 * For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing.
164+ * The budget (estimation) for a merge task is the disk space (still) required for it to complete. As the merge progresses,
165+ * its budget decreases (as the bytes already written have been incorporated into the filesystem stats about the used disk space).
164166 */
165167 private final PriorityBlockingQueueWithBudget <MergeTask > queuedMergeTasks = new PriorityBlockingQueueWithBudget <>(
166- MergeTask ::estimatedRemainingMergeSize ,
167- Long .MAX_VALUE
168+ MergeTask ::estimatedRemainingMergeSize
168169 );
169170 /**
170171 * The set of all merge tasks currently being executed by merge threads from the pool.
@@ -207,9 +208,11 @@ private ThreadPoolMergeExecutorService(ThreadPool threadPool, ClusterSettings cl
207208 this .concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2 ;
208209 assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling ;
209210 // start monitoring the available disk space, and update the available budget for running merge tasks
210- // note that this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
211- // available disk space, so merges will be blocked for shards on data paths with no available disk space, as long as there is
212- // one data path that has enough disk space to run merges for the shards that it stores
211+ // Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
212+ // available disk space. In this case, merges will NOT be blocked for shards on data paths with insufficient available
213+ // disk space, as long as a single data path has enough available disk space to run merges for any shards that it stores
214+ // (i.e. multiple data paths are not really supported when blocking merges due to insufficient available disk space,
215+ // but nothing blows up either when using multiple data paths)
213216 this .availableDiskSpacePeriodicMonitor = new AvailableDiskSpacePeriodicMonitor (
214217 nodeEnvironment .dataPaths (),
215218 threadPool ,
@@ -300,7 +303,10 @@ private boolean enqueueMergeTaskExecution() {
300303 while (true ) {
301304 PriorityBlockingQueueWithBudget <MergeTask >.ElementWithReleasableBudget smallestMergeTaskWithReleasableBudget ;
302305 try {
303- // will block if there are backlogged merges until they're enqueued again
306+ // Will block if there are backlogged merges until they're enqueued again
307+ // (e.g. if the per-shard concurrent merges count limit is reached).
308+ // Will also block if there is insufficient budget (i.e. the estimated available disk space
309+ // is not enough for the smallest merge task to run to completion)
304310 smallestMergeTaskWithReleasableBudget = queuedMergeTasks .take ();
305311 } catch (InterruptedException e ) {
306312 // An active worker thread has been interrupted while waiting for backlogged merges to be re-enqueued.
@@ -312,8 +318,8 @@ private boolean enqueueMergeTaskExecution() {
312318 break ;
313319 }
314320 try {
315- // let the task's scheduler decide if it can actually run the merge task now
316321 MergeTask smallestMergeTask = smallestMergeTaskWithReleasableBudget .element ();
322+ // let the task's scheduler decide if it can actually run the merge task now
317323 ThreadPoolMergeScheduler .Schedule schedule = smallestMergeTask .schedule ();
318324 if (schedule == RUN ) {
319325 runMergeTask (smallestMergeTask );
@@ -323,10 +329,12 @@ private boolean enqueueMergeTaskExecution() {
323329 break ;
324330 } else {
325331 assert schedule == BACKLOG ;
326- // the merge task is backlogged by the merge scheduler, try to get the next smallest one
327- // it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run
332+ // The merge task is backlogged by the merge scheduler, try to get the next smallest one.
333+ // It's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when
334+ // it decides that the merge task can be run.
328335 }
329336 } finally {
337+ // releases any budget that is still allocated to the merge task
330338 smallestMergeTaskWithReleasableBudget .close ();
331339 }
332340 }
@@ -486,6 +494,10 @@ static class PriorityBlockingQueueWithBudget<E> {
486494 private final Condition elementAvailable ;
487495 private long availableBudget ;
488496
497+ PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction ) {
498+ this (budgetFunction , Long .MAX_VALUE );
499+ }
500+
489501 PriorityBlockingQueueWithBudget (ToLongFunction <? super E > budgetFunction , long availableBudget ) {
490502 this .budgetFunction = budgetFunction ;
491503 this .enqueuedByBudget = new PriorityQueue <>(64 , Comparator .comparingLong (budgetFunction ));
0 commit comments