3030import java .util .Iterator ;
3131import java .util .List ;
3232import java .util .Map ;
33+ import java .util .PriorityQueue ;
3334import java .util .Set ;
3435import java .util .concurrent .ExecutorService ;
3536import java .util .concurrent .PriorityBlockingQueue ;
3637import java .util .concurrent .RejectedExecutionException ;
3738import java .util .concurrent .atomic .AtomicInteger ;
3839import java .util .concurrent .atomic .AtomicLong ;
40+ import java .util .concurrent .locks .Condition ;
41+ import java .util .concurrent .locks .ReentrantLock ;
3942import java .util .function .LongUnaryOperator ;
43+ import java .util .function .ToLongFunction ;
4044
4145import static org .elasticsearch .index .engine .ThreadPoolMergeScheduler .Schedule .ABORT ;
4246import static org .elasticsearch .index .engine .ThreadPoolMergeScheduler .Schedule .BACKLOG ;
@@ -150,9 +154,9 @@ public Iterator<Setting<?>> settings() {
150154 * The merge tasks that are waiting for execution. This does NOT include backlogged or currently executing merge tasks.
151155 * For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing.
152156 */
153- private final PriorityBlockingQueue <MergeTask > queuedMergeTasks = new PriorityBlockingQueue <>(
154- 64 ,
155- Comparator . comparingLong ( MergeTask :: estimatedMergeSize )
157+ private final PriorityBlockingQueueWithMaxLimit <MergeTask > queuedMergeTasks = new PriorityBlockingQueueWithMaxLimit <>(
158+ MergeTask :: estimatedMergeSize ,
159+ Long . MAX_VALUE
156160 );
157161 /**
158162 * The set of all merge tasks currently being executed by merge threads from the pool.
@@ -172,7 +176,6 @@ public Iterator<Setting<?>> settings() {
172176 private final int concurrentMergesFloorLimitForThrottling ;
173177 private final int concurrentMergesCeilLimitForThrottling ;
174178 private final AtomicLong leastAvailableDiskSpaceBytes ;
175- private final NodeEnvironment .DataPath [] dataPaths ;
176179 private final Scheduler .Cancellable diskSpaceMonitor ;
177180
178181 public static @ Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService (
@@ -195,7 +198,6 @@ private ThreadPoolMergeExecutorService(ThreadPool threadPool, Settings settings,
195198 this .concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2 ;
196199 assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling ;
197200 this .leastAvailableDiskSpaceBytes = new AtomicLong ();
198- this .dataPaths = nodeEnvironment .dataPaths ();
199201 this .diskSpaceMonitor = threadPool .scheduleWithFixedDelay (
200202 new DiskSpaceMonitor (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING .get (settings ),
201203 INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING .get (settings ), nodeEnvironment .dataPaths ()),
@@ -328,7 +330,6 @@ private void abortMergeTask(MergeTask mergeTask) {
328330 }
329331
330332 class DiskSpaceMonitor implements Runnable {
331-
332333 private final RelativeByteSizeValue highStageWatermark ;
333334 private final ByteSizeValue highStageMaxHeadroom ;
334335 private final NodeEnvironment .DataPath [] dataPaths ;
@@ -373,13 +374,12 @@ public void run() {
373374 for (MergeTask mergeTask : runningMergeTasks ) {
374375 leastAvailableDiskSpaceBytes -= mergeTask .estimatedRemainingMergeSize ();
375376 }
376- // also subtract the configured headroom space
377+ // also subtract the configured free disk space threshold
377378 leastAvailableDiskSpaceBytes -= getFreeBytesThreshold (leastAvailablePath .getTotal (), highStageWatermark , highStageMaxHeadroom )
378379 .getBytes ();
379- // this is the maximum disk space available for a new merge task
380- leastAvailableDiskSpaceBytes = Math .max (0L , leastAvailableDiskSpaceBytes );
381- // TODO update the priority queue
382- ThreadPoolMergeExecutorService .this .leastAvailableDiskSpaceBytes .set (leastAvailableDiskSpaceBytes );
380+ // the rest is the maximum disk space available for a new merge task
381+ long maxMergeSizeLimit = Math .max (0L , leastAvailableDiskSpaceBytes );
382+ queuedMergeTasks .updateMaxPriorityLimit (maxMergeSizeLimit );
383383 }
384384
385385 private static ByteSizeValue getFreeBytesThreshold (
@@ -396,6 +396,62 @@ private static ByteSizeValue getFreeBytesThreshold(
396396 }
397397 }
398398
399+ static class PriorityBlockingQueueWithMaxLimit <E > {
400+ private final ToLongFunction <? super E > priorityFunction ;
401+ private final PriorityQueue <E > priorityQueue ;
402+ private final ReentrantLock lock ;
403+ private final Condition elementAvailable ;
404+ private long maxPriorityLimit ;
405+
406+ PriorityBlockingQueueWithMaxLimit (ToLongFunction <? super E > priorityFunction , long maxPriorityLimit ) {
407+ this .priorityFunction = priorityFunction ;
408+ this .priorityQueue = new PriorityQueue <E >(64 , Comparator .comparingLong (priorityFunction ));
409+ this .lock = new ReentrantLock ();
410+ this .elementAvailable = lock .newCondition ();
411+ this .maxPriorityLimit = maxPriorityLimit ;
412+ }
413+
414+ boolean add (E e ) {
415+ final ReentrantLock lock = this .lock ;
416+ lock .lock ();
417+ try {
418+ priorityQueue .offer (e );
419+ elementAvailable .signal ();
420+ } finally {
421+ lock .unlock ();
422+ }
423+ return true ;
424+ }
425+
426+ E take () throws InterruptedException {
427+ final ReentrantLock lock = this .lock ;
428+ lock .lockInterruptibly ();
429+ E peek ;
430+ try {
431+ while ((peek = priorityQueue .peek ()) == null || priorityFunction .applyAsLong (peek ) > maxPriorityLimit )
432+ elementAvailable .await ();
433+ return priorityQueue .poll ();
434+ } finally {
435+ lock .unlock ();
436+ }
437+ }
438+
439+ void updateMaxPriorityLimit (long maxPriorityLimit ) {
440+ final ReentrantLock lock = this .lock ;
441+ lock .lock ();
442+ try {
443+ this .maxPriorityLimit = maxPriorityLimit ;
444+ elementAvailable .signalAll ();
445+ } finally {
446+ lock .unlock ();
447+ }
448+ }
449+
450+ boolean isEmpty () {
451+ return priorityQueue .isEmpty ();
452+ }
453+ }
454+
399455 @ Override
400456 public void close () throws IOException {
401457 diskSpaceMonitor .cancel ();
0 commit comments