diff --git a/docs/changelog/127613.yaml b/docs/changelog/127613.yaml new file mode 100644 index 0000000000000..de043e209b32e --- /dev/null +++ b/docs/changelog/127613.yaml @@ -0,0 +1,5 @@ +pr: 127613 +summary: Threadpool merge executor is aware of available disk space +area: Engine +type: feature +issues: [] diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index a15d4f3049528..dea61e770b2f6 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -88,6 +88,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.indices.IndexingMemoryController; @@ -629,6 +630,9 @@ public void apply(Settings value, Settings current, Settings previous) { MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING, MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING, ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING, + ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING, + ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING, + ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING, TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE, DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING, DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index d3dff8c30449a..9e74c19d8a85e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -9,29 +9,154 @@ package org.elasticsearch.index.engine; -import org.elasticsearch.common.settings.Settings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.RelativeByteSizeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; +import org.elasticsearch.monitor.fs.FsInfo; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; -import 
java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.LongUnaryOperator; +import java.util.function.ToLongFunction; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING; +import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING; import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT; import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG; import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.RUN; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING; +import static org.elasticsearch.monitor.fs.FsProbe.getFSInfo; -public class ThreadPoolMergeExecutorService { +public class ThreadPoolMergeExecutorService implements Closeable { + /** How frequently we check disk usage (default: 5 seconds). */ + public static final Setting INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting( + "indices.merge.disk.check_interval", + TimeValue.timeValueSeconds(5), + Property.Dynamic, + Property.NodeScope + ); + /** + * The occupied disk space threshold beyond which NO new merges are started. + * Conservatively, the estimated temporary disk space required for the to-be-started merge is counted as occupied disk space. + * Defaults to the routing allocation flood stage limit value (beyond which shards are toggled read-only). + */ + public static final Setting INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING = new Setting<>( + "indices.merge.disk.watermark.high", + CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, + (s) -> RelativeByteSizeValue.parseRelativeByteSizeValue(s, "indices.merge.disk.watermark.high"), + new Setting.Validator<>() { + @Override + public void validate(RelativeByteSizeValue value) {} + + @Override + public void validate(RelativeByteSizeValue value, Map, Object> settings, boolean isPresent) { + if (isPresent && settings.get(USE_THREAD_POOL_MERGE_SCHEDULER_SETTING).equals(Boolean.FALSE)) { + throw new IllegalArgumentException( + "indices merge watermark setting is only effective when [" + + USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey() + + "] is set to [true]" + ); + } + } + + @Override + public Iterator> settings() { + List> res = List.of(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING, USE_THREAD_POOL_MERGE_SCHEDULER_SETTING); + return res.iterator(); + } + }, + Property.Dynamic, + Property.NodeScope + ); + /** + * The available disk space headroom below which NO new merges are started. + * Conservatively, the estimated temporary disk space required for the to-be-started merge is NOT counted as available disk space. + * Defaults to the routing allocation flood stage headroom value (below which shards are toggled read-only), + * unless the merge occupied disk space threshold is specified, in which case the default headroom value here is unset. 
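+ * For example, with an illustrative configuration (not part of this change) of
+ * {@code indices.merge.disk.watermark.high: 95%} and
+ * {@code indices.merge.disk.watermark.high.max_headroom: 100gb},
+ * a 10tb disk would nominally need 5% (~500gb) kept free, but the max headroom caps
+ * that requirement at 100gb, so new merges are blocked only when less than 100gb remains available.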
+ */ + public static final Setting INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING = new Setting<>( + "indices.merge.disk.watermark.high.max_headroom", + (settings) -> { + // if the user explicitly set a value for the occupied disk space threshold, disable the implicit headroom value + if (INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.exists(settings)) { + return "-1"; + } else { + return CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING.get(settings).toString(); + } + }, + (s) -> ByteSizeValue.parseBytesSizeValue(s, "indices.merge.disk.watermark.high.max_headroom"), + new Setting.Validator<>() { + @Override + public void validate(ByteSizeValue value) {} + + @Override + public void validate(final ByteSizeValue value, final Map, Object> settings, boolean isPresent) { + if (isPresent) { + if (value.equals(ByteSizeValue.MINUS_ONE)) { + throw new IllegalArgumentException( + "setting a headroom value to less than 0 is not supported, use [null] value to unset" + ); + } + if (settings.get(USE_THREAD_POOL_MERGE_SCHEDULER_SETTING).equals(Boolean.FALSE)) { + throw new IllegalArgumentException( + "indices merge max headroom setting is only effective when [" + + USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey() + + "] is set to [true]" + ); + } + } + final RelativeByteSizeValue highWatermark = (RelativeByteSizeValue) settings.get(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING); + final ByteSizeValue highHeadroom = (ByteSizeValue) settings.get(INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING); + if (highWatermark.isAbsolute() && highHeadroom.equals(ByteSizeValue.MINUS_ONE) == false) { + throw new IllegalArgumentException( + "indices merge max headroom setting is set, but indices merge disk watermark value is not a relative value [" + + highWatermark.getStringRep() + + "]" + ); + } + } + + @Override + public Iterator> settings() { + List> res = List.of( + INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING, + INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING, + USE_THREAD_POOL_MERGE_SCHEDULER_SETTING + ); + return res.iterator(); + } + }, + Property.Dynamic, + Property.NodeScope + ); /** * Floor for IO write rate limit of individual merge tasks (we will never go any lower than this) */ @@ -52,11 +177,10 @@ public class ThreadPoolMergeExecutorService { /** * The merge tasks that are waiting execution. This does NOT include backlogged or currently executing merge tasks. * For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing. + * The budget (estimation) for a merge task is the disk space (still) required for it to complete. As the merge progresses, + * its budget decreases (as the bytes already written have been incorporated into the filesystem stats about the used disk space). */ - private final PriorityBlockingQueue queuedMergeTasks = new PriorityBlockingQueue<>( - 64, - Comparator.comparingLong(MergeTask::estimatedMergeSize) - ); + private final MergeTaskPriorityBlockingQueue queuedMergeTasks = new MergeTaskPriorityBlockingQueue(); /** * The set of all merge tasks currently being executed by merge threads from the pool. * These are tracked notably in order to be able to update their disk IO throttle rate, after they have started, while executing. 
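To make the disk accounting above concrete, here is a minimal standalone sketch (plain Java; the method name and parameters are hypothetical, not part of this change) of the threshold arithmetic that getFreeBytesThreshold() below implements; the production code does this with RelativeByteSizeValue rather than double arithmetic:

static long availableMergeBudgetBytes(
    long totalBytes,
    long availableBytes,
    Double watermarkRatio,       // e.g. 0.95 for a "95%" watermark; null for an absolute watermark
    long absoluteWatermarkBytes, // bytes that must stay free, used when watermarkRatio is null
    long maxHeadroomBytes        // -1 when unset; only meaningful for relative watermarks
) {
    long freeBytesThreshold;
    if (watermarkRatio == null) {
        // an absolute watermark directly states how many bytes must remain free
        freeBytesThreshold = absoluteWatermarkBytes;
    } else {
        // a relative watermark reserves a fraction of the total disk as free space...
        freeBytesThreshold = (long) (totalBytes * (1.0 - watermarkRatio));
        // ...optionally capped by the max headroom, which matters on very large disks
        if (maxHeadroomBytes >= 0) {
            freeBytesThreshold = Math.min(freeBytesThreshold, maxHeadroomBytes);
        }
    }
    // whatever is available above the reserved threshold is budget for new merges, floored at zero
    return Math.max(0L, availableBytes - freeBytesThreshold);
}

For instance, availableMergeBudgetBytes(1_100_000L, 1_000_000L, 0.95, 0L, -1L) returns 945_000L, the same figure the settings-update test below expects for the 95% default.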
@@ -74,31 +198,45 @@ public class ThreadPoolMergeExecutorService { private final int maxConcurrentMerges; private final int concurrentMergesFloorLimitForThrottling; private final int concurrentMergesCeilLimitForThrottling; + private final AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor; private final List mergeEventListeners = new CopyOnWriteArrayList<>(); public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService( ThreadPool threadPool, - Settings settings + ClusterSettings clusterSettings, + NodeEnvironment nodeEnvironment ) { - if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) { - return new ThreadPoolMergeExecutorService(threadPool); + if (clusterSettings.get(USE_THREAD_POOL_MERGE_SCHEDULER_SETTING)) { + return new ThreadPoolMergeExecutorService(threadPool, clusterSettings, nodeEnvironment); } else { + // register no-op setting update consumers so that setting validations work properly + // (some validations are bypassed if there are no update consumers registered), + // i.e. to reject watermark and max headroom updates if the thread pool merge scheduler is disabled + clusterSettings.addSettingsUpdateConsumer(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING, (ignored) -> {}); + clusterSettings.addSettingsUpdateConsumer(INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING, (ignored) -> {}); + clusterSettings.addSettingsUpdateConsumer(INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING, (ignored) -> {}); return null; } } - private ThreadPoolMergeExecutorService(ThreadPool threadPool) { + private ThreadPoolMergeExecutorService(ThreadPool threadPool, ClusterSettings clusterSettings, NodeEnvironment nodeEnvironment) { this.executorService = threadPool.executor(ThreadPool.Names.MERGE); this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax(); // the intent here is to throttle down whenever we submit a task and no other task is running this.concurrentMergesFloorLimitForThrottling = 2; this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2; assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling; + this.availableDiskSpacePeriodicMonitor = startDiskSpaceMonitoring( + threadPool, + nodeEnvironment.dataPaths(), + clusterSettings, + (availableDiskSpaceByteSize) -> this.queuedMergeTasks.updateBudget(availableDiskSpaceByteSize.getBytes()) + ); } boolean submitMergeTask(MergeTask mergeTask) { - assert mergeTask.isRunning() == false; + assert mergeTask.hasStartedRunning() == false; // first enqueue the runnable that runs exactly one merge task (the smallest it can find) if (enqueueMergeTaskExecution() == false) { // if the thread pool cannot run the merge, just abort it @@ -137,6 +275,7 @@ boolean submitMergeTask(MergeTask mergeTask) { } void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { + assert mergeTask.hasStartedRunning() == false; enqueueMergeTask(mergeTask); } @@ -144,12 +283,12 @@ private void enqueueMergeTask(MergeTask mergeTask) { // To ensure that for a given merge onMergeQueued is called before onMergeAborted or onMergeCompleted, we call onMergeQueued // before adding the merge task to the queue. Adding to the queue should not fail. 
mergeEventListeners.forEach(l -> l.onMergeQueued(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes())); - boolean added = queuedMergeTasks.add(mergeTask); + boolean added = queuedMergeTasks.enqueue(mergeTask); assert added; } public boolean allDone() { - return queuedMergeTasks.isEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L; + return queuedMergeTasks.isQueueEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L; } /** @@ -162,10 +301,13 @@ private boolean enqueueMergeTaskExecution() { // one such runnable always executes a SINGLE merge task from the queue // this is important for merge queue statistics, i.e. the executor's queue size represents the current amount of merges while (true) { - MergeTask smallestMergeTask; + PriorityBlockingQueueWithBudget<MergeTask>.ElementWithReleasableBudget smallestMergeTaskWithReleasableBudget; try { - // will block if there are backlogged merges until they're enqueued again - smallestMergeTask = queuedMergeTasks.take(); + // Will block if there are backlogged merges until they're enqueued again + // (e.g. if the per-shard concurrent merges count limit is reached). + // Will also block if there is insufficient budget (i.e. estimated available disk space + // for the smallest merge task to run to completion). + smallestMergeTaskWithReleasableBudget = queuedMergeTasks.take(); } catch (InterruptedException e) { // An active worker thread has been interrupted while waiting for backlogged merges to be re-enqueued. // In this case, we terminate the worker thread promptly and forget about the backlogged merges. @@ -175,18 +317,24 @@ private boolean enqueueMergeTaskExecution() { // is also drained, so any queued merge tasks are also forgotten. break; } - // let the task's scheduler decide if it can actually run the merge task now - ThreadPoolMergeScheduler.Schedule schedule = smallestMergeTask.schedule(); - if (schedule == RUN) { - runMergeTask(smallestMergeTask); - break; - } else if (schedule == ABORT) { - abortMergeTask(smallestMergeTask); - break; - } else { - assert schedule == BACKLOG; - // the merge task is backlogged by the merge scheduler, try to get the next smallest one - // it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run + try (var ignored = smallestMergeTaskWithReleasableBudget) { + MergeTask smallestMergeTask = smallestMergeTaskWithReleasableBudget.element(); + // let the task's scheduler decide if it can actually run the merge task now + ThreadPoolMergeScheduler.Schedule schedule = smallestMergeTask.schedule(); + if (schedule == RUN) { + runMergeTask(smallestMergeTask); + break; + } else if (schedule == ABORT) { + abortMergeTask(smallestMergeTask); + break; + } else { + assert schedule == BACKLOG; + // The merge task is backlogged by the merge scheduler, try to get the next smallest one. + // It is then the duty of said merge scheduler to re-enqueue the backlogged merge task when + // it decides that the merge task can be run. Note that it is possible for this merge + // task to be re-enqueued and re-taken before the budget held up here is released upon the next + // {@link PriorityBlockingQueueWithBudget#updateBudget} invocation.
+ } } } }); @@ -199,7 +347,7 @@ private boolean enqueueMergeTaskExecution() { } private void runMergeTask(MergeTask mergeTask) { - assert mergeTask.isRunning() == false; + assert mergeTask.hasStartedRunning() == false; boolean added = runningMergeTasks.add(mergeTask); assert added : "starting merge task [" + mergeTask + "] registered as already running"; try { @@ -218,7 +366,7 @@ private void runMergeTask(MergeTask mergeTask) { } private void abortMergeTask(MergeTask mergeTask) { - assert mergeTask.isRunning() == false; + assert mergeTask.hasStartedRunning() == false; assert runningMergeTasks.contains(mergeTask) == false; try { mergeTask.abort(); @@ -230,6 +378,331 @@ private void abortMergeTask(MergeTask mergeTask) { } } + /** + * Start monitoring the available disk space, and update the available budget for running merge tasks. + * Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST + * available disk space. In this case, merges will NOT be blocked for shards on data paths with insufficient available + * disk space, as long as a single data path has enough available disk space to run merges for any shards that it stores + * (i.e. multiple data paths are not really supported when it comes to blocking merges due to insufficient available disk space, + * but nothing blows up either if multiple data paths are used). + */ + static AvailableDiskSpacePeriodicMonitor startDiskSpaceMonitoring( + ThreadPool threadPool, + NodeEnvironment.DataPath[] dataPaths, + ClusterSettings clusterSettings, + Consumer<ByteSizeValue> availableDiskSpaceUpdateConsumer + ) { + AvailableDiskSpacePeriodicMonitor availableDiskSpacePeriodicMonitor = new AvailableDiskSpacePeriodicMonitor( + dataPaths, + threadPool, + clusterSettings.get(INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING), + clusterSettings.get(INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING), + clusterSettings.get(INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING), + availableDiskSpaceByteSize -> { + if (availableDiskSpaceByteSize.equals(ByteSizeValue.MINUS_ONE)) { + // The merge executor is currently unaware of the available disk space because of an error. + // Merges are NOT blocked if the available disk space is insufficient.
+ availableDiskSpaceUpdateConsumer.accept(ByteSizeValue.ofBytes(Long.MAX_VALUE)); + } else { + availableDiskSpaceUpdateConsumer.accept(availableDiskSpaceByteSize); + } + } + ); + if (availableDiskSpacePeriodicMonitor.isScheduled() == false) { + // in case the disk space monitor starts off as disabled, then make sure that merging is NOT blocked + // (in the other case, merging IS blocked until the first update for the available disk space) + availableDiskSpaceUpdateConsumer.accept(ByteSizeValue.ofBytes(Long.MAX_VALUE)); + } + clusterSettings.addSettingsUpdateConsumer( + INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING, + availableDiskSpacePeriodicMonitor::setHighStageWatermark + ); + clusterSettings.addSettingsUpdateConsumer( + INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING, + availableDiskSpacePeriodicMonitor::setHighStageMaxHeadroom + ); + clusterSettings.addSettingsUpdateConsumer( + INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING, + availableDiskSpacePeriodicMonitor::setCheckInterval + ); + return availableDiskSpacePeriodicMonitor; + } + + static class AvailableDiskSpacePeriodicMonitor implements Closeable { + private static final Logger LOGGER = LogManager.getLogger(AvailableDiskSpacePeriodicMonitor.class); + private final NodeEnvironment.DataPath[] dataPaths; + private final ThreadPool threadPool; + private volatile RelativeByteSizeValue highStageWatermark; + private volatile ByteSizeValue highStageMaxHeadroom; + private volatile TimeValue checkInterval; + private final Consumer updateConsumer; + private volatile boolean closed; + private volatile Scheduler.Cancellable monitor; + + AvailableDiskSpacePeriodicMonitor( + NodeEnvironment.DataPath[] dataPaths, + ThreadPool threadPool, + RelativeByteSizeValue highStageWatermark, + ByteSizeValue highStageMaxHeadroom, + TimeValue checkInterval, + Consumer updateConsumer + ) { + this.dataPaths = dataPaths; + this.threadPool = threadPool; + this.highStageWatermark = highStageWatermark; + this.highStageMaxHeadroom = highStageMaxHeadroom; + this.checkInterval = checkInterval; + this.updateConsumer = updateConsumer; + this.closed = false; + reschedule(); + } + + void setCheckInterval(TimeValue checkInterval) { + this.checkInterval = checkInterval; + reschedule(); + } + + void setHighStageWatermark(RelativeByteSizeValue highStageWatermark) { + this.highStageWatermark = highStageWatermark; + } + + void setHighStageMaxHeadroom(ByteSizeValue highStageMaxHeadroom) { + this.highStageMaxHeadroom = highStageMaxHeadroom; + } + + private synchronized void reschedule() { + if (monitor != null) { + monitor.cancel(); + } + if (closed == false && checkInterval.duration() > 0) { + // do an eager run, + // in order to increase responsiveness in case the period is long and something blocks waiting for the first update + threadPool.generic().execute(this::run); + monitor = threadPool.scheduleWithFixedDelay(this::run, checkInterval, threadPool.generic()); + } else { + monitor = null; + } + } + + boolean isScheduled() { + return monitor != null && closed == false; + } + + @Override + public void close() throws IOException { + closed = true; + reschedule(); + } + + private void run() { + if (closed) { + return; + } + FsInfo.Path mostAvailablePath = null; + IOException fsInfoException = null; + for (NodeEnvironment.DataPath dataPath : dataPaths) { + try { + FsInfo.Path fsInfo = getFSInfo(dataPath); // uncached + if (mostAvailablePath == null || mostAvailablePath.getAvailable().getBytes() < fsInfo.getAvailable().getBytes()) { + mostAvailablePath = fsInfo; + } + } catch (IOException e) 
{ + if (fsInfoException == null) { + fsInfoException = e; + } else { + fsInfoException.addSuppressed(e); + } + } + } + if (fsInfoException != null) { + LOGGER.warn("unexpected exception reading filesystem info", fsInfoException); + } + if (mostAvailablePath == null) { + LOGGER.error("Cannot read filesystem info for node data paths " + Arrays.toString(dataPaths)); + updateConsumer.accept(ByteSizeValue.MINUS_ONE); + return; + } + long mostAvailableDiskSpaceBytes = mostAvailablePath.getAvailable().getBytes(); + // subtract the configured free disk space threshold + mostAvailableDiskSpaceBytes -= getFreeBytesThreshold(mostAvailablePath.getTotal(), highStageWatermark, highStageMaxHeadroom) + .getBytes(); + // clamp available space to 0 + long maxMergeSizeLimit = Math.max(0L, mostAvailableDiskSpaceBytes); + updateConsumer.accept(ByteSizeValue.ofBytes(maxMergeSizeLimit)); + } + + private static ByteSizeValue getFreeBytesThreshold( + ByteSizeValue total, + RelativeByteSizeValue watermark, + ByteSizeValue maxHeadroom + ) { + // If bytes are given, they can be readily returned as free bytes. + // If percentages are given, we need to calculate the free bytes. + if (watermark.isAbsolute()) { + return watermark.getAbsolute(); + } + return ByteSizeValue.subtract(total, watermark.calculateValue(total, maxHeadroom)); + } + } + + static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget { + MergeTaskPriorityBlockingQueue() { + // start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked) + // use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is + // updated according to the remaining disk space requirements of the currently running merge tasks + super(MergeTask::estimatedRemainingMergeSize, 0L); + } + + // exposed for tests + long getAvailableBudget() { + return super.availableBudget; + } + + // exposed for tests + MergeTask peekQueue() { + return enqueuedByBudget.peek(); + } + } + + /** + * Similar to a regular priority queue, but the {@link #take()} operation will also block if the smallest element + * (according to the specified "budget" function) is larger than an updatable limit budget. + */ + static class PriorityBlockingQueueWithBudget { + private final ToLongFunction budgetFunction; + protected final PriorityQueue enqueuedByBudget; + private final IdentityHashMap unreleasedBudgetPerElement; + private final ReentrantLock lock; + private final Condition elementAvailable; + protected long availableBudget; + + PriorityBlockingQueueWithBudget(ToLongFunction budgetFunction, long initialAvailableBudget) { + this.budgetFunction = budgetFunction; + this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction)); + this.unreleasedBudgetPerElement = new IdentityHashMap<>(); + this.lock = new ReentrantLock(); + this.elementAvailable = lock.newCondition(); + this.availableBudget = initialAvailableBudget; + } + + boolean enqueue(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + enqueuedByBudget.offer(e); + elementAvailable.signal(); + } finally { + lock.unlock(); + } + return true; + } + + /** + * Dequeues the smallest element (according to the specified "budget" function) if its budget is below the available limit. + * This method invocation blocks if the queue is empty or the element's budget is above the available limit. 
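+ * A usage sketch (hypothetical element type and sizes; the element's length stands in for its disk budget):
+ * <pre>{@code
+ * var queue = new PriorityBlockingQueueWithBudget<String>(String::length, 0L);
+ * queue.enqueue("1234");     // the element's budget is 4
+ * queue.updateBudget(3L);    // take() would still block: 4 > 3
+ * queue.updateBudget(10L);   // take() may now return the element, holding up 4 of the 10
+ * try (var taken = queue.take()) {
+ *     // use taken.element(); 6 budget units remain for other take() calls
+ * }
+ * }</pre>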
+ */ + ElementWithReleasableBudget take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + E peek; + long peekBudget; + // blocks until the smallest budget element fits the currently available budget + while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) { + elementAvailable.await(); + } + // deducts and holds up that element's budget from the available budget + return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget); + } finally { + lock.unlock(); + } + } + + /** + * Updates the available budget given the passed-in argument, from which it deducts the budget held up by taken elements + * that are still in use. The budget of in-use elements is also updated (by re-applying the budget function). + * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element + * is over this newly computed available budget. + */ + void updateBudget(long availableBudget) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + this.availableBudget = availableBudget; + // update the per-element budget (these are all the elements that are using any budget) + unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element())); + // available budget is decreased by the used per-element budget (for all dequeued elements that are still in use) + this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum(); + elementAvailable.signalAll(); + } finally { + lock.unlock(); + } + } + + boolean isQueueEmpty() { + return enqueuedByBudget.isEmpty(); + } + + int queueSize() { + return enqueuedByBudget.size(); + } + + private ElementWithReleasableBudget newElementWithReleasableBudget(E element, long budget) { + ElementWithReleasableBudget elementWithReleasableBudget = new ElementWithReleasableBudget(element); + assert this.lock.isHeldByCurrentThread(); + // the taken element holds up some budget + var prev = this.unreleasedBudgetPerElement.put(elementWithReleasableBudget, budget); + assert prev == null; + this.availableBudget -= budget; + assert this.availableBudget >= 0L; + return elementWithReleasableBudget; + } + + private void release(ElementWithReleasableBudget elementWithReleasableBudget) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + assert elementWithReleasableBudget.isClosed() == false; + // when the taken element is not used anymore, it will not influence subsequent computations for available budget, + // but its allotted budget is only returned to the available budget at the next {@link #updateBudget(long)} invocation + var val = unreleasedBudgetPerElement.remove(elementWithReleasableBudget); + assert val != null; + } finally { + lock.unlock(); + } + } + + private boolean isReleased(ElementWithReleasableBudget elementWithReleasableBudget) { + return unreleasedBudgetPerElement.containsKey(elementWithReleasableBudget) == false; + } + + class ElementWithReleasableBudget implements Releasable { + private final E element; + + private ElementWithReleasableBudget(E element) { + this.element = element; + } + + /** + * Must be invoked when the caller is done with the element that it previously took from the queue. + * The budget it's holding is not immediately released, but the next time {@link #updateBudget(long)} + * is invoked this element's budget won't be deducted from the total available.
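+ * For example:
+ * <pre>{@code
+ * var taken = queue.take(); // deducts the element's budget from the available budget
+ * // ... execute the merge task ...
+ * taken.close();            // drops the hold; reflected at the next updateBudget(long) call
+ * }</pre>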
+ */ + @Override + public void close() { + PriorityBlockingQueueWithBudget.this.release(this); + } + + boolean isClosed() { + return PriorityBlockingQueueWithBudget.this.isReleased(this); + } + + E element() { + return element; + } + } + } + private static long newTargetIORateBytesPerSec( long currentTargetIORateBytesPerSec, int currentlySubmittedIOThrottledMergeTasks, @@ -302,8 +775,13 @@ Set getRunningMergeTasks() { } // exposed for tests - PriorityBlockingQueue getQueuedMergeTasks() { - return queuedMergeTasks; + int getMergeTasksQueueLength() { + return queuedMergeTasks.queueSize(); + } + + // exposed for tests + long getDiskSpaceAvailableForNewMergeTasks() { + return queuedMergeTasks.getAvailableBudget(); } // exposed for tests and stats @@ -315,4 +793,9 @@ long getTargetIORateBytesPerSec() { int getMaxConcurrentMerges() { return maxConcurrentMerges; } + + @Override + public void close() throws IOException { + availableDiskSpacePeriodicMonitor.close(); + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 33ef06699c8c7..78a9695bea540 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -55,7 +55,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final PriorityQueue backloggedMergeTasks = new PriorityQueue<>( 16, - Comparator.comparingLong(MergeTask::estimatedMergeSize) + Comparator.comparingLong(MergeTask::estimatedRemainingMergeSize) ); private final Map runningMergeTasks = new HashMap<>(); // set when incoming merges should be throttled (i.e. restrict the indexing rate) @@ -266,7 +266,7 @@ private void checkMergeTaskThrottling() { // exposed for tests // synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically synchronized Schedule schedule(MergeTask mergeTask) { - assert mergeTask.isRunning() == false; + assert mergeTask.hasStartedRunning() == false; if (closed) { // do not run or backlog tasks when closing the merge scheduler, instead abort them return Schedule.ABORT; @@ -280,6 +280,7 @@ synchronized Schedule schedule(MergeTask mergeTask) { assert added : "starting merge task [" + mergeTask + "] registered as already running"; return Schedule.RUN; } else { + assert mergeTask.hasStartedRunning() == false; backloggedMergeTasks.add(mergeTask); return Schedule.BACKLOG; } @@ -403,8 +404,14 @@ public void setIORateLimit(long ioRateLimitBytesPerSec) { this.rateLimiter.setMBPerSec(ByteSizeValue.ofBytes(ioRateLimitBytesPerSec).getMbFrac()); } - public boolean isRunning() { - return mergeStartTimeNS.get() > 0L; + /** + * Returns {@code true} if this task is currently running, or was run in the past. + * An aborted task (see {@link #abort()}) is considered as NOT run. 
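+ * For example (a sketch of the expected transitions):
+ * <pre>{@code
+ * task.hasStartedRunning(); // false before run() is invoked
+ * task.run();
+ * task.hasStartedRunning(); // true, and it remains true after the merge completes
+ * }</pre>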
+ */ + public boolean hasStartedRunning() { + boolean isRunning = mergeStartTimeNS.get() > 0L; + assert isRunning || rateLimiter.getTotalBytesWritten() == 0L; + return isRunning; + } /** @@ -415,7 +422,7 @@ public boolean isRunning() { */ @Override public void run() { - assert isRunning() == false; + assert hasStartedRunning() == false; assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) : "runNowOrBacklog must be invoked before actually running the merge task"; try { @@ -480,7 +487,7 @@ public void run() { * (by the {@link org.apache.lucene.index.IndexWriter}) to any subsequent merges. */ void abort() { - assert isRunning() == false; + assert hasStartedRunning() == false; assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) == false : "cannot abort a merge task that's already running"; if (verbose()) { @@ -509,10 +516,17 @@ void abort() { } } - long estimatedMergeSize() { + /** + * Before the merge task started running, this returns the estimated required disk space for the merge to complete + * (i.e. the estimated disk space size of the resulting segment following the merge). + * While the merge is running, the returned estimation is updated to take into account the data that's already been written. + * After the merge completes, the estimation returned here should ideally be close to "0". + */ + long estimatedRemainingMergeSize() { // TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialized merges, // or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized? - return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); + long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); + return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten()); } public long getMergeMemoryEstimateBytes() { diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 73747bc798d30..7825b3513a6b3 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -295,10 +295,6 @@ protected void doStart() { IndicesService(IndicesServiceBuilder builder) { this.settings = builder.settings; this.threadPool = builder.threadPool; - this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( - threadPool, - settings - ); this.pluginsService = builder.pluginsService; this.nodeEnv = builder.nodeEnv; this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE) @@ -321,6 +317,11 @@ protected void doStart() { this.bigArrays = builder.bigArrays; this.scriptService = builder.scriptService; this.clusterService = builder.clusterService; + this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( + threadPool, + clusterService.getClusterSettings(), + nodeEnv + ); this.projectResolver = builder.projectResolver; this.client = builder.client; this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings()); @@ -368,7 +369,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon indicesFieldDataCache, cacheCleaner, indicesRequestCache, - indicesQueryCache + indicesQueryCache, +
threadPoolMergeExecutorService ); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 043b982ad4344..f53d01d4772a1 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -194,13 +194,17 @@ public void setUp() throws Exception { emptyMap() ); threadPool = new TestThreadPool("test"); - threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); circuitBreakerService = new NoneCircuitBreakerService(); PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L); - clusterService = ClusterServiceUtils.createClusterService(threadPool); + clusterService = ClusterServiceUtils.createClusterService(threadPool, ClusterSettings.createBuiltInClusterSettings(settings)); nodeEnvironment = new NodeEnvironment(settings, environment); + threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( + threadPool, + clusterService.getClusterSettings(), + nodeEnvironment + ); mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext()); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java new file mode 100644 index 0000000000000..97943101758fe --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java @@ -0,0 +1,1023 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.tests.mockfile.FilterFileSystemProvider; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.PathUtils; +import org.elasticsearch.core.PathUtilsForTesting; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.nio.file.attribute.FileAttributeView; +import java.nio.file.attribute.FileStoreAttributeView; +import java.nio.file.spi.FileSystemProvider; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.RUN; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase { + + private static TestMockFileStore aFileStore = new TestMockFileStore("mocka"); + private static TestMockFileStore bFileStore = new TestMockFileStore("mockb"); + private static String aPathPart; + private static String bPathPart; + private static int mergeExecutorThreadCount; + private static Settings settings; + private static TestCapturingThreadPool testThreadPool; + private static NodeEnvironment nodeEnvironment; + + @BeforeClass + public static void installMockUsableSpaceFS() throws Exception { + FileSystem current = PathUtils.getDefaultFileSystem(); + aPathPart = "a-" + randomUUID(); + bPathPart = "b-" + randomUUID(); + FileSystemProvider mock = new TestMockUsableSpaceFileSystemProvider(current); + PathUtilsForTesting.installMock(mock.getFileSystem(null)); + Path path = PathUtils.get(createTempDir().toString()); + // use 2 data paths + String[] paths = new String[] { path.resolve(aPathPart).toString(), path.resolve(bPathPart).toString() }; + // some tests hold one merge thread blocked, and need at least one other runnable + mergeExecutorThreadCount = randomIntBetween(2, 9); + Settings.Builder settingsBuilder = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), path) + .putList(Environment.PATH_DATA_SETTING.getKey(), paths) + // the default of "5s" 
slows down testing + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "50ms") + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount); + if (randomBoolean()) { + settingsBuilder.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true); + } + settings = settingsBuilder.build(); + testThreadPool = new TestCapturingThreadPool("test", settings); + nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); + } + + @AfterClass + public static void removeMockUsableSpaceFS() { + PathUtilsForTesting.teardown(); + aFileStore = null; + bFileStore = null; + testThreadPool.close(); + nodeEnvironment.close(); + } + + @After + public void cleanupThreadPool() { + testThreadPool.scheduledTasks.clear(); + } + + static class TestCapturingThreadPool extends TestThreadPool { + final List> scheduledTasks = new ArrayList<>(); + + TestCapturingThreadPool(String name, Settings settings) { + super(name, settings); + } + + @Override + public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, Executor executor) { + Cancellable cancellable = super.scheduleWithFixedDelay(command, interval, executor); + scheduledTasks.add(new Tuple<>(interval, cancellable)); + return cancellable; + } + } + + static class TestMockUsableSpaceFileSystemProvider extends FilterFileSystemProvider { + + TestMockUsableSpaceFileSystemProvider(FileSystem inner) { + super("mockusablespace://", inner); + } + + @Override + public FileStore getFileStore(Path path) { + if (path.toString().contains(path.getFileSystem().getSeparator() + aPathPart)) { + return aFileStore; + } else { + assert path.toString().contains(path.getFileSystem().getSeparator() + bPathPart); + return bFileStore; + } + } + } + + static class TestMockFileStore extends FileStore { + + public volatile long totalSpace; + public volatile long freeSpace; + public volatile long usableSpace; + public volatile boolean throwIoException; + + private final String desc; + + TestMockFileStore(String desc) { + this.desc = desc; + } + + @Override + public String type() { + return "mock"; + } + + @Override + public String name() { + return desc; + } + + @Override + public String toString() { + return desc; + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public long getTotalSpace() throws IOException { + if (throwIoException) { + throw new IOException("Test IO Exception"); + } + return totalSpace; + } + + @Override + public long getUnallocatedSpace() throws IOException { + if (throwIoException) { + throw new IOException("Test IO Exception"); + } + return freeSpace; + } + + @Override + public long getUsableSpace() throws IOException { + if (throwIoException) { + throw new IOException("Test IO Exception"); + } + return usableSpace; + } + + @Override + public boolean supportsFileAttributeView(Class type) { + return false; + } + + @Override + public boolean supportsFileAttributeView(String name) { + return false; + } + + @Override + public V getFileStoreAttributeView(Class type) { + return null; + } + + @Override + public Object getAttribute(String attribute) { + return null; + } + } + + public void testAvailableDiskSpaceMonitorWithDefaultSettings() throws Exception { + // path "a" has lots of free space, and "b" has little + aFileStore.usableSpace = 100_000L; + aFileStore.totalSpace = aFileStore.usableSpace * 2; + bFileStore.usableSpace = 1_000L; + bFileStore.totalSpace = bFileStore.usableSpace * 2; + LinkedHashSet 
availableDiskSpaceUpdates = new LinkedHashSet<>(); + try ( + var diskSpacePeriodicMonitor = ThreadPoolMergeExecutorService.startDiskSpaceMonitoring( + testThreadPool, + nodeEnvironment.dataPaths(), + ClusterSettings.createBuiltInClusterSettings(settings), + (availableDiskSpace) -> { + synchronized (availableDiskSpaceUpdates) { + availableDiskSpaceUpdates.add(availableDiskSpace); + } + } + ) + ) { + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(1)); + // 100_000 (available) - 5% (default flood stage level) * 200_000 (total space) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(90_000L)); + } + }); + // "b" now has more available space + bFileStore.usableSpace = 110_000L; + bFileStore.totalSpace = 130_000L; + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(2)); + // 110_000 (available) - 5% (default flood stage level) * 130_000 (total space) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(103_500L)); + } + }); + // available space for "a" and "b" is below the limit => it's clamped down to "0" + aFileStore.usableSpace = 100L; + bFileStore.usableSpace = 1_000L; + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(3)); + // 1_000 (available) - 5% (default flood stage level) * 130_000 (total space) < 0 + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(0L)); + } + }); + } + } + + public void testDiskSpaceMonitorStartsAsDisabled() throws Exception { + aFileStore.usableSpace = randomLongBetween(1L, 100L); + aFileStore.totalSpace = randomLongBetween(1L, 100L); + aFileStore.throwIoException = randomBoolean(); + bFileStore.usableSpace = randomLongBetween(1L, 100L); + bFileStore.totalSpace = randomLongBetween(1L, 100L); + bFileStore.throwIoException = randomBoolean(); + Settings.Builder settingsBuilder = Settings.builder().put(settings); + if (randomBoolean()) { + settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0"); + } else { + settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s"); + } + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); + LinkedHashSet<ByteSizeValue> availableDiskSpaceUpdates = new LinkedHashSet<>(); + try ( + var diskSpacePeriodicMonitor = ThreadPoolMergeExecutorService.startDiskSpaceMonitoring( + testThreadPool, + nodeEnvironment.dataPaths(), + clusterSettings, + (availableDiskSpace) -> { + synchronized (availableDiskSpaceUpdates) { + availableDiskSpaceUpdates.add(availableDiskSpace); + } + } + ) + ) { + assertThat(diskSpacePeriodicMonitor.isScheduled(), is(false)); + assertThat(availableDiskSpaceUpdates.size(), is(1)); + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(Long.MAX_VALUE)); + // updating the monitoring interval should enable the monitor + String intervalSettingValue = randomFrom("1s", "123ms", "5nanos", "2h"); + clusterSettings.applySettings( + Settings.builder() + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), intervalSettingValue) + .build() + ); + assertThat(diskSpacePeriodicMonitor.isScheduled(), is(true)); + assertThat(testThreadPool.scheduledTasks.size(), is(1)); + assertThat( + testThreadPool.scheduledTasks.getLast().v1(), + is( + TimeValue.parseTimeValue( + intervalSettingValue,
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey() + ) + ) + ); + } + aFileStore.throwIoException = false; + bFileStore.throwIoException = false; + } + + public void testAvailableDiskSpaceMonitorWhenFileSystemStatErrors() throws Exception { + aFileStore.usableSpace = randomLongBetween(1L, 100L); + aFileStore.totalSpace = randomLongBetween(1L, 100L); + bFileStore.usableSpace = randomLongBetween(1L, 100L); + bFileStore.totalSpace = randomLongBetween(1L, 100L); + boolean aErrorsFirst = randomBoolean(); + if (aErrorsFirst) { + // the "a" file system will error when collecting stats + aFileStore.throwIoException = true; + bFileStore.throwIoException = false; + } else { + aFileStore.throwIoException = false; + bFileStore.throwIoException = true; + } + LinkedHashSet availableDiskSpaceUpdates = new LinkedHashSet<>(); + try ( + var diskSpacePeriodicMonitor = ThreadPoolMergeExecutorService.startDiskSpaceMonitoring( + testThreadPool, + nodeEnvironment.dataPaths(), + ClusterSettings.createBuiltInClusterSettings(settings), + (availableDiskSpace) -> { + synchronized (availableDiskSpaceUpdates) { + availableDiskSpaceUpdates.add(availableDiskSpace); + } + } + ) + ) { + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(1)); + if (aErrorsFirst) { + // uses the stats from "b" + assertThat( + availableDiskSpaceUpdates.getLast().getBytes(), + // the default 5% (same as flood stage level) + is(Math.max(bFileStore.usableSpace - bFileStore.totalSpace / 20, 0L)) + ); + } else { + // uses the stats from "a" + assertThat( + availableDiskSpaceUpdates.getLast().getBytes(), + // the default 5% (same as flood stage level) + is(Math.max(aFileStore.usableSpace - aFileStore.totalSpace / 20, 0L)) + ); + } + } + }); + if (aErrorsFirst) { + // the "b" file system will also now error when collecting stats + bFileStore.throwIoException = true; + } else { + // the "a" file system will also now error when collecting stats + aFileStore.throwIoException = true; + } + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(2)); + // consider the available disk space as unlimited when no fs stats can be collected + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(Long.MAX_VALUE)); + } + }); + if (aErrorsFirst) { + // "a" fs stats collection recovered + aFileStore.throwIoException = false; + } else { + // "b" fs stats collection recovered + bFileStore.throwIoException = false; + } + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(3)); + if (aErrorsFirst) { + // uses the stats from "a" + assertThat( + availableDiskSpaceUpdates.getLast().getBytes(), + // the default 5% (same as flood stage level) + is(Math.max(aFileStore.usableSpace - aFileStore.totalSpace / 20, 0L)) + ); + } else { + // uses the stats from "b" + assertThat( + availableDiskSpaceUpdates.getLast().getBytes(), + // the default 5% (same as flood stage level) + is(Math.max(bFileStore.usableSpace - bFileStore.totalSpace / 20, 0L)) + ); + } + } + }); + } + aFileStore.throwIoException = false; + bFileStore.throwIoException = false; + } + + public void testAvailableDiskSpaceMonitorSettingsUpdate() throws Exception { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); + // path "b" has more usable (available) space, but path "a" has more total space + aFileStore.usableSpace = 900_000L; + aFileStore.totalSpace = 
1_200_000L; + bFileStore.usableSpace = 1_000_000L; + bFileStore.totalSpace = 1_100_000L; + LinkedHashSet<ByteSizeValue> availableDiskSpaceUpdates = new LinkedHashSet<>(); + try ( + var diskSpacePeriodicMonitor = ThreadPoolMergeExecutorService.startDiskSpaceMonitoring( + testThreadPool, + nodeEnvironment.dataPaths(), + clusterSettings, + (availableDiskSpace) -> { + synchronized (availableDiskSpaceUpdates) { + availableDiskSpaceUpdates.add(availableDiskSpace); + } + } + ) + ) { + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(1)); + // 1_000_000 (available) - 5% (default flood stage level) * 1_100_000 (total space) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(945_000L)); + } + }, 5, TimeUnit.SECONDS); + // update the ratio for the watermark + clusterSettings.applySettings( + Settings.builder().put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "90%").build() + ); + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(2)); + // 1_000_000 (available) - 10% (indices.merge.disk.watermark.high) * 1_100_000 (total space) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(890_000L)); + } + }, 5, TimeUnit.SECONDS); + // absolute value for the watermark limit + clusterSettings.applySettings( + Settings.builder().put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "3000b").build() + ); + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(3)); + // 1_000_000 (available) - 3_000 (indices.merge.disk.watermark.high) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(997_000L)); + } + }, 5, TimeUnit.SECONDS); + // headroom value that takes priority over the watermark + clusterSettings.applySettings( + Settings.builder() + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "50%") + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.getKey(), "11111b") + .build() + ); + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(4)); + // 1_000_000 (available) - 11_111 (indices.merge.disk.watermark.high) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(988_889L)); + } + }, 5, TimeUnit.SECONDS); + // watermark limit that takes priority over the headroom + clusterSettings.applySettings( + Settings.builder() + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "98%") + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.getKey(), "22222b") + .build() + ); + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(5)); + // 1_000_000 (available) - 2% (indices.merge.disk.watermark.high) * 1_100_000 (total space) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(978_000L)); + } + }, 5, TimeUnit.SECONDS); + // headroom takes priority over the default watermark of 95% + clusterSettings.applySettings( + Settings.builder() + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.getKey(), "22222b") + .build() + ); + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(6)); + // 1_000_000 (available) - 22_222 + assertThat(availableDiskSpaceUpdates.getLast().getBytes(),
is(977_778L)); + } + }, 5, TimeUnit.SECONDS); + // watermark from routing allocation takes priority + clusterSettings.applySettings( + Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "99%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING.getKey(), "2b") + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING.getKey(), "22222b") + .build() + ); + assertBusy(() -> { + synchronized (availableDiskSpaceUpdates) { + assertThat(availableDiskSpaceUpdates.size(), is(7)); + // 1_000_000 (available) - 1% (cluster.routing.allocation.disk.watermark.flood_stage) * 1_100_000 (total space) + assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(989_000L)); + } + }, 5, TimeUnit.SECONDS); + } + } + + public void testAbortingOrRunningMergeTaskHoldsUpBudget() throws Exception { + aFileStore.totalSpace = randomLongBetween(1_000L, 10_000L); + bFileStore.totalSpace = randomLongBetween(1_000L, 10_000L); + aFileStore.usableSpace = randomLongBetween(900L, aFileStore.totalSpace); + bFileStore.usableSpace = randomLongBetween(900L, bFileStore.totalSpace); + boolean aHasMoreSpace = aFileStore.usableSpace > bFileStore.usableSpace; + try ( + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService + .maybeCreateThreadPoolMergeExecutorService( + testThreadPool, + ClusterSettings.createBuiltInClusterSettings(settings), + nodeEnvironment + ) + ) { + assert threadPoolMergeExecutorService != null; + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), greaterThanOrEqualTo(1)); + // assumes the 5% default value for the remaining space watermark + final long availableInitialBudget = aHasMoreSpace + ? 
aFileStore.usableSpace - aFileStore.totalSpace / 20 + : bFileStore.usableSpace - bFileStore.totalSpace / 20; + final AtomicLong expectedAvailableBudget = new AtomicLong(availableInitialBudget); + // wait for the merge scheduler to learn about the available disk space + assertBusy( + () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())) + ); + ThreadPoolMergeScheduler.MergeTask stallingMergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + long taskBudget = randomLongBetween(1L, expectedAvailableBudget.get()); + when(stallingMergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget); + when(stallingMergeTask.schedule()).thenReturn(randomFrom(RUN, ABORT)); + CountDownLatch testDoneLatch = new CountDownLatch(1); + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + testDoneLatch.await(); + return null; + }).when(stallingMergeTask).run(); + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + testDoneLatch.await(); + return null; + }).when(stallingMergeTask).abort(); + threadPoolMergeExecutorService.submitMergeTask(stallingMergeTask); + // assert the merge task is holding up disk space budget + expectedAvailableBudget.set(expectedAvailableBudget.get() - taskBudget); + assertBusy( + () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())) + ); + // double check that submitting a runnable merge task under budget works correctly + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(0L, expectedAvailableBudget.get())); + when(mergeTask.schedule()).thenReturn(RUN); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + assertBusy(() -> { + verify(mergeTask).schedule(); + verify(mergeTask).run(); + verify(mergeTask, times(0)).abort(); + }); + // let the test finish + testDoneLatch.countDown(); + assertBusy(() -> { + // available budget is back to the initial value + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableInitialBudget)); + if (stallingMergeTask.schedule() == RUN) { + verify(stallingMergeTask).run(); + verify(stallingMergeTask, times(0)).abort(); + } else { + verify(stallingMergeTask).abort(); + verify(stallingMergeTask, times(0)).run(); + } + assertThat(threadPoolMergeExecutorService.allDone(), is(true)); + }); + } + } + + public void testBackloggedMergeTasksDoNotHoldUpBudget() throws Exception { + aFileStore.totalSpace = randomLongBetween(1_000L, 10_000L); + bFileStore.totalSpace = randomLongBetween(1_000L, 10_000L); + aFileStore.usableSpace = randomLongBetween(900L, aFileStore.totalSpace); + bFileStore.usableSpace = randomLongBetween(900L, bFileStore.totalSpace); + boolean aHasMoreSpace = aFileStore.usableSpace > bFileStore.usableSpace; + try ( + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService + .maybeCreateThreadPoolMergeExecutorService( + testThreadPool, + ClusterSettings.createBuiltInClusterSettings(settings), + nodeEnvironment + ) + ) { + assert threadPoolMergeExecutorService != null; + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), greaterThanOrEqualTo(1)); + // assumes the 5% default value for the remaining space watermark + final long availableInitialBudget = aHasMoreSpace + ? 
aFileStore.usableSpace - aFileStore.totalSpace / 20 + : bFileStore.usableSpace - bFileStore.totalSpace / 20; + final AtomicLong expectedAvailableBudget = new AtomicLong(availableInitialBudget); + assertBusy( + () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())) + ); + long backloggedMergeTaskDiskSpaceBudget = randomLongBetween(1L, expectedAvailableBudget.get()); + CountDownLatch testDoneLatch = new CountDownLatch(1); + // take care that there's still at least one thread available to run merges + int maxBlockingTasksToSubmit = mergeExecutorThreadCount - 1; + // first maybe submit some running or aborting merge tasks that hold up some budget while running or aborting + List runningMergeTasks = new ArrayList<>(); + List abortingMergeTasks = new ArrayList<>(); + while (expectedAvailableBudget.get() - backloggedMergeTaskDiskSpaceBudget > 0L + && maxBlockingTasksToSubmit-- > 0 + && randomBoolean()) { + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + long taskBudget = randomLongBetween(1L, expectedAvailableBudget.get() - backloggedMergeTaskDiskSpaceBudget); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget); + when(mergeTask.schedule()).thenReturn(randomFrom(RUN, ABORT)); + // this task runs/aborts, and it's going to hold up some budget for it + expectedAvailableBudget.set(expectedAvailableBudget.get() - taskBudget); + // this task will hold up budget because it blocks when it runs (to simulate it running for a long time) + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + testDoneLatch.await(); + return null; + }).when(mergeTask).run(); + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + testDoneLatch.await(); + return null; + }).when(mergeTask).abort(); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + if (mergeTask.schedule() == RUN) { + runningMergeTasks.add(mergeTask); + } else { + abortingMergeTasks.add(mergeTask); + } + } + assertBusy( + () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())) + ); + // submit some backlogging merge tasks which should NOT hold up any budget + IdentityHashMap backloggingMergeTasksScheduleCountMap = new IdentityHashMap<>(); + int backloggingTaskCount = randomIntBetween(1, 10); + while (backloggingTaskCount-- > 0) { + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + long taskBudget = randomLongBetween(1L, backloggedMergeTaskDiskSpaceBudget); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget); + doAnswer(mock -> { + // task always backlogs (as long as the test hasn't finished) + if (testDoneLatch.getCount() > 0) { + return BACKLOG; + } else { + return RUN; + } + }).when(mergeTask).schedule(); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + backloggingMergeTasksScheduleCountMap.put(mergeTask, 1); + } + int checkRounds = randomIntBetween(1, 10); + // assert all backlogging merge tasks have been scheduled while possibly re-enqueued, + // BUT none run and none aborted, AND the available budget is left unchanged + while (true) { + assertBusy(() -> { + for (ThreadPoolMergeScheduler.MergeTask mergeTask : backloggingMergeTasksScheduleCountMap.keySet()) { + verify(mergeTask, times(backloggingMergeTasksScheduleCountMap.get(mergeTask))).schedule(); + } + for (ThreadPoolMergeScheduler.MergeTask 
mergeTask : backloggingMergeTasksScheduleCountMap.keySet()) { + verify(mergeTask, times(0)).run(); + verify(mergeTask, times(0)).abort(); + } + // budget hasn't changed! + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())); + }); + if (checkRounds-- <= 0) { + break; + } + // maybe re-enqueue backlogged merge task + for (ThreadPoolMergeScheduler.MergeTask backlogged : backloggingMergeTasksScheduleCountMap.keySet()) { + if (randomBoolean()) { + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(backlogged); + backloggingMergeTasksScheduleCountMap.put(backlogged, backloggingMergeTasksScheduleCountMap.get(backlogged) + 1); + } + } + // double check that submitting a runnable merge task under budget works correctly + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + long taskBudget = randomLongBetween(1L, backloggedMergeTaskDiskSpaceBudget); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget); + when(mergeTask.schedule()).thenReturn(RUN); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + assertBusy(() -> { + verify(mergeTask).schedule(); + verify(mergeTask).run(); + verify(mergeTask, times(0)).abort(); + }); + } + // let the test finish + testDoneLatch.countDown(); + for (ThreadPoolMergeScheduler.MergeTask backlogged : backloggingMergeTasksScheduleCountMap.keySet()) { + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(backlogged); + } + assertBusy(() -> { + for (ThreadPoolMergeScheduler.MergeTask mergeTask : runningMergeTasks) { + verify(mergeTask).run(); + } + for (ThreadPoolMergeScheduler.MergeTask mergeTask : abortingMergeTasks) { + verify(mergeTask).abort(); + } + for (ThreadPoolMergeScheduler.MergeTask backlogged : backloggingMergeTasksScheduleCountMap.keySet()) { + verify(backlogged).run(); + } + // available budget is restored + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableInitialBudget)); + assertThat(threadPoolMergeExecutorService.allDone(), is(true)); + }); + } + } + + public void testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution() throws Exception { + aFileStore.totalSpace = 150_000L; + bFileStore.totalSpace = 140_000L; + boolean aHasMoreSpace = randomBoolean(); + if (aHasMoreSpace) { + // "a" has more available space + aFileStore.usableSpace = 120_000L; + bFileStore.usableSpace = 100_000L; + } else { + // "b" has more available space + aFileStore.usableSpace = 90_000L; + bFileStore.usableSpace = 110_000L; + } + try ( + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService + .maybeCreateThreadPoolMergeExecutorService( + testThreadPool, + ClusterSettings.createBuiltInClusterSettings(settings), + nodeEnvironment + ) + ) { + assert threadPoolMergeExecutorService != null; + // wait for the budget to be updated from the available disk space + AtomicLong expectedAvailableBudget = new AtomicLong(); + assertBusy(() -> { + if (aHasMoreSpace) { + // 120_000L (available) - 5% (default flood stage level) * 150_000L (total) + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(112_500L)); + expectedAvailableBudget.set(112_500L); + } else { + // 110_000L (available) - 5% (default flood stage level) * 140_000L (total) + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(103_000L)); + expectedAvailableBudget.set(103_000L); + } + }); + List 
runningOrAbortingMergeTasksList = new ArrayList<>(); + List latchesBlockingMergeTasksList = new ArrayList<>(); + int submittedMergesCount = randomIntBetween(1, mergeExecutorThreadCount - 1); + // submit merge tasks that don't finish, in order to deplete the available budget + while (submittedMergesCount > 0 && expectedAvailableBudget.get() > 0L) { + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + doAnswer(mock -> { + Schedule schedule = randomFrom(Schedule.values()); + if (schedule == BACKLOG) { + testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + // re-enqueue backlogged merge task + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask); + }); + } + return schedule; + }).when(mergeTask).schedule(); + // let some task complete, which will NOT hold up any budget + if (randomBoolean()) { + // this task will NOT hold up any budget because it runs quickly (it is not blocked) + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(1_000L, 10_000L)); + } else { + CountDownLatch blockMergeTaskLatch = new CountDownLatch(1); + long taskBudget = randomLongBetween(1L, expectedAvailableBudget.get()); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget); + expectedAvailableBudget.set(expectedAvailableBudget.get() - taskBudget); + submittedMergesCount--; + // this task will hold up budget because it blocks when it runs (to simulate it running for a long time) + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + blockMergeTaskLatch.await(); + return null; + }).when(mergeTask).run(); + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + blockMergeTaskLatch.await(); + return null; + }).when(mergeTask).abort(); + runningOrAbortingMergeTasksList.add(mergeTask); + latchesBlockingMergeTasksList.add(blockMergeTaskLatch); + } + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + } + // currently running (or aborting) merge tasks have consumed some of the available budget + while (runningOrAbortingMergeTasksList.isEmpty() == false) { + assertBusy( + () -> assertThat( + threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), + is(expectedAvailableBudget.get()) + ) + ); + ThreadPoolMergeScheduler.MergeTask mergeTask1 = mock(ThreadPoolMergeScheduler.MergeTask.class); + when(mergeTask1.supportsIOThrottling()).thenReturn(randomBoolean()); + when(mergeTask1.schedule()).thenReturn(RUN); + ThreadPoolMergeScheduler.MergeTask mergeTask2 = mock(ThreadPoolMergeScheduler.MergeTask.class); + when(mergeTask2.supportsIOThrottling()).thenReturn(randomBoolean()); + when(mergeTask2.schedule()).thenReturn(RUN); + boolean task1Runs = randomBoolean(); + long currentAvailableBudget = expectedAvailableBudget.get(); + long overBudget = randomLongBetween(currentAvailableBudget + 1L, currentAvailableBudget + 100L); + long underBudget = randomLongBetween(0L, currentAvailableBudget); + if (task1Runs) { + // merge task 1 can run because it is under budget + when(mergeTask1.estimatedRemainingMergeSize()).thenReturn(underBudget); + // merge task 2 cannot run because it is over budget + when(mergeTask2.estimatedRemainingMergeSize()).thenReturn(overBudget); + } else { + // merge task 1 cannot run because it is over budget + when(mergeTask1.estimatedRemainingMergeSize()).thenReturn(overBudget); + // merge task 2 can run because it is under budget + 
when(mergeTask2.estimatedRemainingMergeSize()).thenReturn(underBudget); + } + threadPoolMergeExecutorService.submitMergeTask(mergeTask1); + threadPoolMergeExecutorService.submitMergeTask(mergeTask2); + assertBusy(() -> { + if (task1Runs) { + verify(mergeTask1).schedule(); + verify(mergeTask1).run(); + verify(mergeTask2, times(0)).schedule(); + verify(mergeTask2, times(0)).run(); + } else { + verify(mergeTask2).schedule(); + verify(mergeTask2).run(); + verify(mergeTask1, times(0)).schedule(); + verify(mergeTask1, times(0)).run(); + } + }); + // let one task finish from the bunch that is holding up budget + int index = randomIntBetween(0, runningOrAbortingMergeTasksList.size() - 1); + latchesBlockingMergeTasksList.remove(index).countDown(); + ThreadPoolMergeScheduler.MergeTask completedMergeTask = runningOrAbortingMergeTasksList.remove(index); + // update the expected budget given that one task now finished + expectedAvailableBudget.set(expectedAvailableBudget.get() + completedMergeTask.estimatedRemainingMergeSize()); + } + // let the test finish cleanly + assertBusy(() -> { + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(aHasMoreSpace ? 112_500L : 103_000L)); + assertThat(threadPoolMergeExecutorService.allDone(), is(true)); + }); + } + } + + public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws Exception { + aFileStore.totalSpace = randomLongBetween(300L, 1_000L); + bFileStore.totalSpace = randomLongBetween(300L, 1_000L); + long grantedUsableSpaceBuffer = randomLongBetween(10L, 50L); + aFileStore.usableSpace = randomLongBetween(200L, aFileStore.totalSpace - grantedUsableSpaceBuffer); + bFileStore.usableSpace = randomLongBetween(200L, bFileStore.totalSpace - grantedUsableSpaceBuffer); + boolean aHasMoreSpace = aFileStore.usableSpace > bFileStore.usableSpace; + Settings.Builder settingsBuilder = Settings.builder().put(settings); + // change the watermark level, both for test coverage and because it makes the calculations easier + if (randomBoolean()) { + settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "90%"); + } else { + settingsBuilder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "90%"); + } + try ( + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService + .maybeCreateThreadPoolMergeExecutorService( + testThreadPool, + ClusterSettings.createBuiltInClusterSettings(settingsBuilder.build()), + nodeEnvironment + ) + ) { + assert threadPoolMergeExecutorService != null; + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), greaterThanOrEqualTo(1)); + // uses the 10% watermark limit + final long availableInitialBudget = aHasMoreSpace + ? 
aFileStore.usableSpace - aFileStore.totalSpace / 10 + : bFileStore.usableSpace - bFileStore.totalSpace / 10; + final AtomicLong expectedAvailableBudget = new AtomicLong(availableInitialBudget); + assertBusy( + () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())) + ); + // maybe let some merge tasks hold up some budget + // take care that there's still at least one thread available to run merges + int maxBlockingTasksToSubmit = mergeExecutorThreadCount - 1; + // first maybe submit some running or aborting merge tasks that hold up some budget while running or aborting + List runningMergeTasks = new ArrayList<>(); + List abortingMergeTasks = new ArrayList<>(); + CountDownLatch testDoneLatch = new CountDownLatch(1); + while (expectedAvailableBudget.get() > 0L && maxBlockingTasksToSubmit-- > 0 && randomBoolean()) { + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + long taskBudget = randomLongBetween(1L, expectedAvailableBudget.get()); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget); + when(mergeTask.schedule()).thenReturn(randomFrom(RUN, ABORT)); + // this task runs/aborts, and it's going to hold up some budget for it + expectedAvailableBudget.set(expectedAvailableBudget.get() - taskBudget); + // this task will hold up budget because it blocks when it runs (to simulate it running for a long time) + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + testDoneLatch.await(); + return null; + }).when(mergeTask).run(); + doAnswer(mock -> { + // wait to be signalled before completing (this holds up budget) + testDoneLatch.await(); + return null; + }).when(mergeTask).abort(); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + if (mergeTask.schedule() == RUN) { + runningMergeTasks.add(mergeTask); + } else { + abortingMergeTasks.add(mergeTask); + } + } + assertBusy(() -> { + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0)); + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())); + }); + // send some merge tasks that are runnable, but currently over budget + int overBudgetTaskCount = randomIntBetween(1, 5); + List overBudgetTasksToRunList = new ArrayList<>(); + List overBudgetTasksToAbortList = new ArrayList<>(); + while (overBudgetTaskCount-- > 0) { + ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class); + // currently over-budget + long taskBudget = randomLongBetween( + expectedAvailableBudget.get() + 1L, + expectedAvailableBudget.get() + grantedUsableSpaceBuffer + ); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(taskBudget); + Schedule schedule = randomFrom(RUN, ABORT); + when(mergeTask.schedule()).thenReturn(schedule); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + if (schedule == RUN) { + overBudgetTasksToRunList.add(mergeTask); + } else { + overBudgetTasksToAbortList.add(mergeTask); + } + } + // over-budget tasks did not run, are enqueued, and budget is unchanged + assertBusy(() -> { + for (ThreadPoolMergeScheduler.MergeTask mergeTask : overBudgetTasksToAbortList) { + verify(mergeTask, times(0)).schedule(); + verify(mergeTask, times(0)).run(); + verify(mergeTask, times(0)).abort(); + } + for (ThreadPoolMergeScheduler.MergeTask mergeTask : overBudgetTasksToRunList) { + verify(mergeTask, times(0)).schedule(); + verify(mergeTask, 
times(0)).run(); + verify(mergeTask, times(0)).abort(); + } + assertThat( + threadPoolMergeExecutorService.getMergeTasksQueueLength(), + is(overBudgetTasksToAbortList.size() + overBudgetTasksToRunList.size()) + ); + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())); + }); + // more disk space becomes available + if (aHasMoreSpace) { + aFileStore.usableSpace += grantedUsableSpaceBuffer; + } else { + bFileStore.usableSpace += grantedUsableSpaceBuffer; + } + expectedAvailableBudget.set(expectedAvailableBudget.get() + grantedUsableSpaceBuffer); + // all over-budget tasks can now run because more disk space became available + assertBusy(() -> { + for (ThreadPoolMergeScheduler.MergeTask mergeTask : overBudgetTasksToRunList) { + verify(mergeTask).schedule(); + verify(mergeTask).run(); + verify(mergeTask, times(0)).abort(); + } + for (ThreadPoolMergeScheduler.MergeTask mergeTask : overBudgetTasksToAbortList) { + verify(mergeTask).schedule(); + verify(mergeTask, times(0)).run(); + verify(mergeTask).abort(); + } + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0)); + assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get())); + }); + // let the test finish cleanly + testDoneLatch.countDown(); + assertBusy(() -> { + for (ThreadPoolMergeScheduler.MergeTask mergeTask : runningMergeTasks) { + verify(mergeTask).run(); + } + for (ThreadPoolMergeScheduler.MergeTask mergeTask : abortingMergeTasks) { + verify(mergeTask).abort(); + } + assertThat( + threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), + is(availableInitialBudget + grantedUsableSpaceBuffer) + ); + assertThat(threadPoolMergeExecutorService.allDone(), is(true)); + }); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index bcc5250ea098d..9b74d68326108 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -9,21 +9,28 @@ package org.elasticsearch.index.engine; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.MergeTaskPriorityBlockingQueue; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.PriorityBlockingQueueWithBudget; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; import org.mockito.ArgumentCaptor; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; 
+import java.util.IdentityHashMap; import java.util.List; import java.util.PriorityQueue; import java.util.Set; @@ -43,6 +50,7 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -56,9 +64,24 @@ public class ThreadPoolMergeExecutorServiceTests extends ESTestCase { - public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() { - TestThreadPool testThreadPool = new TestThreadPool("test"); - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + private NodeEnvironment nodeEnvironment; + + @After + public void closeNodeEnv() { + if (nodeEnvironment != null) { + nodeEnvironment.close(); + nodeEnvironment = null; + } + } + + public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() throws IOException { + TestThreadPool testThreadPool = new TestThreadPool("test", Settings.EMPTY); + nodeEnvironment = newNodeEnvironment(Settings.EMPTY); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + testThreadPool, + Settings.EMPTY, + nodeEnvironment + ); // shutdown the thread pool testThreadPool.shutdown(); MergeTask mergeTask = mock(MergeTask.class); @@ -78,9 +101,16 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd Settings settings = Settings.builder() .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); TestThreadPool testThreadPool = new TestThreadPool("test", settings); - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + nodeEnvironment = newNodeEnvironment(settings); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + testThreadPool, + settings, + nodeEnvironment + ); var countingListener = new CountingMergeEventListener(); threadPoolMergeExecutorService.registerMergeEventListener(countingListener); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); @@ -189,9 +219,16 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { Settings settings = Settings.builder() .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + testThreadPool, + settings, + nodeEnvironment + ); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); Semaphore 
runMergeSemaphore = new Semaphore(0); AtomicInteger submittedIOThrottledMergeTasks = new AtomicInteger(); @@ -269,9 +306,16 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { Settings settings = Settings.builder() .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + testThreadPool, + settings, + nodeEnvironment + ); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); Semaphore runMergeSemaphore = new Semaphore(0); @@ -333,7 +377,7 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { } } - public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() { + public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() throws IOException { // the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted int submittedVsExecutedRateOutOf1000 = randomIntBetween(0, 250); testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5)); @@ -341,7 +385,7 @@ public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() { testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50)); } - public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() { + public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() throws IOException { // the executor runs merge tasks at a slower rate than the rate that merge tasks are submitted int submittedVsExecutedRateOutOf1000 = randomIntBetween(750, 1000); testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5)); @@ -349,7 +393,7 @@ public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() { testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50)); } - public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() { + public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() throws IOException { // the executor runs merge tasks at about the same rate as merge tasks are submitted int submittedVsExecutedRateOutOf1000 = randomIntBetween(250, 750); testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5)); @@ -357,14 +401,24 @@ public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() { testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50)); } - private void testIORateAdjustedForSubmittedTasks( - int totalTasksToSubmit, - int submittedVsExecutedRateOutOf1000, - int initialTasksToSubmit - ) { + private void 
testIORateAdjustedForSubmittedTasks(int totalTasksToSubmit, int submittedVsExecutedRateOutOf1000, int initialTasksToSubmit) + throws IOException { DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue(); ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool(); - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool); + Settings settings = Settings.builder() + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") + .build(); + if (nodeEnvironment != null) { + nodeEnvironment.close(); + nodeEnvironment = null; + } + nodeEnvironment = newNodeEnvironment(settings); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + mergeExecutorThreadPool, + settings, + nodeEnvironment + ); final AtomicInteger currentlySubmittedMergeTaskCount = new AtomicInteger(); final AtomicLong targetIORateLimit = new AtomicLong(ThreadPoolMergeExecutorService.START_IO_RATE.getBytes()); final AtomicReference lastRunTask = new AtomicReference<>(); @@ -422,9 +476,16 @@ public void testMergeTasksRunConcurrently() throws Exception { Settings settings = Settings.builder() .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + testThreadPool, + settings, + nodeEnvironment + ); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); // more merge tasks than max concurrent merges allowed to run concurrently int totalMergeTasksCount = mergeExecutorThreadCount + randomIntBetween(1, 5); @@ -465,7 +526,7 @@ public void testMergeTasksRunConcurrently() throws Exception { assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeExecutorThreadCount)); // with the other merge tasks enqueued assertThat( - threadPoolMergeExecutorService.getQueuedMergeTasks().size(), + threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount) ); // also check thread-pool stats for the same @@ -485,7 +546,7 @@ public void testMergeTasksRunConcurrently() throws Exception { // there are fewer available merges than available threads assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergeTasksCount)); // no more merges enqueued - assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0)); // also check thread-pool stats for the same assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergeTasksCount)); assertThat(threadPoolExecutor.getQueue().size(), is(0)); @@ -502,9 +563,16 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception { Settings settings = Settings.builder() 
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + testThreadPool, + settings, + nodeEnvironment + ); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); int totalMergeTasksCount = randomIntBetween(1, 10); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); @@ -533,7 +601,7 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception { assertThat(threadPoolExecutor.getActiveCount(), is(backloggedMergeTasksList.size())); assertThat(threadPoolExecutor.getQueue().size(), is(0)); } - assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0)); }); // re-enqueue backlogged merge tasks for (MergeTask backloggedMergeTask : backloggedMergeTasksList) { @@ -555,9 +623,16 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception { .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) // few merge threads, in order to increase contention .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + testThreadPool, + settings, + nodeEnvironment + ); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); // many merge tasks concurrently int mergeTaskCount = randomIntBetween(10, 100); @@ -613,22 +688,31 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception { } } - public void testMergeTasksExecuteInSizeOrder() { + public void testMergeTasksExecuteInSizeOrder() throws IOException { DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue(); ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool(); - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool); + Settings settings = Settings.builder() + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") + .build(); + nodeEnvironment = newNodeEnvironment(settings); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService( + mergeExecutorThreadPool, + settings, + nodeEnvironment + ); DeterministicTaskQueue reEnqueueBackloggedTaskQueue = new 
DeterministicTaskQueue(); int mergeTaskCount = randomIntBetween(10, 100); // sort merge tasks available to run by size PriorityQueue mergeTasksAvailableToRun = new PriorityQueue<>( mergeTaskCount, - Comparator.comparingLong(MergeTask::estimatedMergeSize) + Comparator.comparingLong(MergeTask::estimatedRemainingMergeSize) ); for (int i = 0; i < mergeTaskCount; i++) { MergeTask mergeTask = mock(MergeTask.class); when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); // merge tasks of various sizes (0 might be a valid value) - when(mergeTask.estimatedMergeSize()).thenReturn(randomLongBetween(0, 10)); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(0, 10)); doAnswer(mock -> { // each individual merge task can either "run" or be "backlogged" at any point in time Schedule schedule = randomFrom(Schedule.values()); @@ -650,7 +734,10 @@ public void testMergeTasksExecuteInSizeOrder() { } if (schedule == RUN && mergeTasksAvailableToRun.isEmpty() == false) { // assert the merge task that's now going to run is the smallest of the ones currently available to run - assertThat(mergeTask.estimatedMergeSize(), lessThanOrEqualTo(mergeTasksAvailableToRun.peek().estimatedMergeSize())); + assertThat( + mergeTask.estimatedRemainingMergeSize(), + lessThanOrEqualTo(mergeTasksAvailableToRun.peek().estimatedRemainingMergeSize()) + ); } return schedule; }).when(mergeTask).schedule(); @@ -675,6 +762,123 @@ public void testMergeTasksExecuteInSizeOrder() { } } + public void testMergeTaskQueueAvailableBudgetTracking() throws Exception { + MergeTaskPriorityBlockingQueue mergeTaskPriorityBlockingQueue = new MergeTaskPriorityBlockingQueue(); + assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(0L)); + long availableBudget = randomLongBetween(1, 10); + mergeTaskPriorityBlockingQueue.updateBudget(availableBudget); + assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(availableBudget)); + + int taskCount = randomIntBetween(5, 15); + for (int i = 0; i < taskCount; i++) { + MergeTask mergeTask = mock(MergeTask.class); + when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(1, 10)); + mergeTaskPriorityBlockingQueue.enqueue(mergeTask); + } + assertThat(mergeTaskPriorityBlockingQueue.queueSize(), is(taskCount)); + assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(availableBudget)); + + List<PriorityBlockingQueueWithBudget<MergeTask>.ElementWithReleasableBudget> tookElements = new ArrayList<>(); + + while (mergeTaskPriorityBlockingQueue.isQueueEmpty() == false) { + if (mergeTaskPriorityBlockingQueue.peekQueue().estimatedRemainingMergeSize() <= mergeTaskPriorityBlockingQueue + .getAvailableBudget() && randomBoolean()) { + // take another element (merge task) from the queue + long prevBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + tookElements.add(mergeTaskPriorityBlockingQueue.take()); + long afterBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + assertThat(afterBudget, greaterThanOrEqualTo(0L)); + assertThat(prevBudget - afterBudget, is(tookElements.getLast().element().estimatedRemainingMergeSize())); + } else if (tookElements.stream().anyMatch(e -> e.isClosed() == false) && randomBoolean()) { + // "close" a previously taken element to simulate that it has gone out of scope + int index = randomValueOtherThanMany( + i -> tookElements.get(i).isClosed(), + () -> randomIntBetween(0, tookElements.size() - 1) + ); + var elementToClose = tookElements.remove(index); + long prevBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + 
elementToClose.close(); + long afterBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + // the budget hasn't changed yet; the updateBudget method needs to be invoked before it does + assertThat(afterBudget, is(prevBudget)); + } else if (randomBoolean()) { + // update (possibly increment) the available budget + long budgetIncrement = randomLongBetween(0, 3); + availableBudget += budgetIncrement; + mergeTaskPriorityBlockingQueue.updateBudget(availableBudget); + // "closed" taken elements should not impact the budget computation + tookElements.removeIf(PriorityBlockingQueueWithBudget.ElementWithReleasableBudget::isClosed); + long expectedBudget = availableBudget - tookElements.stream() + .mapToLong(e -> e.element().estimatedRemainingMergeSize()) + .sum(); + long afterBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + assertThat(afterBudget, is(expectedBudget)); + } + } + } + + public void testMergeTaskQueueBudgetTrackingWhenEstimatedRemainingMergeSizeChanges() throws Exception { + MergeTaskPriorityBlockingQueue mergeTaskPriorityBlockingQueue = new MergeTaskPriorityBlockingQueue(); + assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(0L)); + // plenty of available budget (this stays fixed for this test) + final long availableBudget = randomLongBetween(1000L, 2000L); + mergeTaskPriorityBlockingQueue.updateBudget(availableBudget); + assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(availableBudget)); + + IdentityHashMap budgetMap = new IdentityHashMap<>(); + int taskCount = randomIntBetween(5, 15); + for (int i = 0; i < taskCount; i++) { + MergeTask mergeTask = mock(MergeTask.class); + budgetMap.put(mergeTask, randomLongBetween(1L, 10L)); + doAnswer(invocation -> budgetMap.get((MergeTask) invocation.getMock())).when(mergeTask).estimatedRemainingMergeSize(); + mergeTaskPriorityBlockingQueue.enqueue(mergeTask); + } + assertThat(mergeTaskPriorityBlockingQueue.queueSize(), is(taskCount)); + assertThat(mergeTaskPriorityBlockingQueue.getAvailableBudget(), is(availableBudget)); + + List<PriorityBlockingQueueWithBudget<MergeTask>.ElementWithReleasableBudget> tookElements = new ArrayList<>(); + + while (mergeTaskPriorityBlockingQueue.isQueueEmpty() == false) { + if (tookElements.stream().allMatch(PriorityBlockingQueueWithBudget.ElementWithReleasableBudget::isClosed) || randomBoolean()) { + // take another element (merge task) from the queue + long prevBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + tookElements.add(mergeTaskPriorityBlockingQueue.take()); + long afterBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + assertThat(afterBudget, greaterThanOrEqualTo(0L)); + assertThat(prevBudget - afterBudget, is(tookElements.getLast().element().estimatedRemainingMergeSize())); + } else if (randomBoolean()) { + // "close" a previously taken element to simulate that it has gone out of scope + int index = randomValueOtherThanMany( + i -> tookElements.get(i).isClosed(), + () -> randomIntBetween(0, tookElements.size() - 1) + ); + var elementToClose = tookElements.remove(index); + long prevBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + elementToClose.close(); + long afterBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + // the budget hasn't changed yet; the updateBudget method needs to be invoked before it does + assertThat(afterBudget, is(prevBudget)); + } else { + // update the remaining merge size of a taken (but not yet "closed") merge task + int index = randomValueOtherThanMany( + i -> tookElements.get(i).isClosed(), + () -> randomIntBetween(0, 
tookElements.size() - 1) + ); + var elementToUpdate = tookElements.get(index); + long prevElementBudget = elementToUpdate.element().estimatedRemainingMergeSize(); + long afterElementBudget = randomValueOtherThan(prevElementBudget, () -> randomLongBetween(1L, 10L)); + budgetMap.put(elementToUpdate.element(), afterElementBudget); + assertThat(elementToUpdate.element().estimatedRemainingMergeSize(), is(afterElementBudget)); + // "closed" taken elements should not impact the budget computation + tookElements.removeIf(PriorityBlockingQueueWithBudget.ElementWithReleasableBudget::isClosed); + long expectedBudget = availableBudget - tookElements.stream().mapToLong(e -> budgetMap.get(e.element())).sum(); + mergeTaskPriorityBlockingQueue.updateBudget(availableBudget); + long afterBudget = mergeTaskPriorityBlockingQueue.getAvailableBudget(); + assertThat(afterBudget, is(expectedBudget)); + } + } + } + private static class CountingMergeEventListener implements MergeEventListener { AtomicInteger queued = new AtomicInteger(); AtomicInteger aborted = new AtomicInteger(); @@ -696,14 +900,13 @@ public void onMergeAborted(OnGoingMerge merge) { } } - static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) { + static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService( + ThreadPool threadPool, + Settings settings, + NodeEnvironment nodeEnvironment + ) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService - .maybeCreateThreadPoolMergeExecutorService( - threadPool, - randomBoolean() - ? Settings.EMPTY - : Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build() - ); + .maybeCreateThreadPoolMergeExecutorService(threadPool, ClusterSettings.createBuiltInClusterSettings(settings), nodeEnvironment); assertNotNull(threadPoolMergeExecutorService); assertTrue(threadPoolMergeExecutorService.allDone()); return threadPoolMergeExecutorService; diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 01a6150fd140c..156dcf581ec9c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; @@ -26,6 +27,7 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; import org.mockito.ArgumentCaptor; import java.io.IOException; @@ -53,10 +55,25 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase { + private NodeEnvironment nodeEnvironment; + + @After + public void closeNodeEnv() { + if (nodeEnvironment != null) { + nodeEnvironment.close(); + nodeEnvironment = null; + } + } + public void testMergesExecuteInSizeOrder() throws IOException { DeterministicTaskQueue threadPoolTaskQueue = new DeterministicTaskQueue(); + Settings settings = Settings.builder() + // disable fs available 
disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") + .build(); + nodeEnvironment = newNodeEnvironment(settings); ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests - .getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool()); + .getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment); try ( ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), @@ -142,7 +159,10 @@ public void testSimpleMergeTaskReEnqueueingBySize() { merge -> 0 ); // sort backlogged merges by size - PriorityQueue backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize)); + PriorityQueue backloggedMergeTasks = new PriorityQueue<>( + 16, + Comparator.comparingLong(MergeTask::estimatedRemainingMergeSize) + ); // more merge tasks than merge threads int mergeCount = mergeExecutorThreadCount + randomIntBetween(2, 10); for (int i = 0; i < mergeCount; i++) { @@ -341,10 +361,13 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception Settings settings = Settings.builder() .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests - .getThreadPoolMergeExecutorService(testThreadPool); + .getThreadPoolMergeExecutorService(testThreadPool, settings, nodeEnvironment); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); try ( ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( @@ -414,10 +437,13 @@ public void testMergesRunConcurrently() throws Exception { Settings settings = Settings.builder() .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests - .getThreadPoolMergeExecutorService(testThreadPool); + .getThreadPoolMergeExecutorService(testThreadPool, settings, nodeEnvironment); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); try ( @@ -460,7 +486,7 @@ public void testMergesRunConcurrently() throws Exception { // also check the same for the thread-pool executor assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeSchedulerMaxThreadCount)); // queued merge tasks do not include backlogged merges - assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + 
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0)); // also check thread-pool stats for the same // there are active thread-pool threads waiting for the backlogged merge tasks to be re-enqueued int activeMergeThreads = Math.min(mergeCount - finalCompletedMergesCount, mergeExecutorThreadCount); @@ -481,7 +507,7 @@ public void testMergesRunConcurrently() throws Exception { // also check thread-pool executor for the same assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergesCount)); // no more backlogged merges - assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0)); // also check thread-pool stats for the same assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergesCount)); assertThat(threadPoolExecutor.getQueue().size(), is(0)); @@ -500,10 +526,13 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception { Settings settings = Settings.builder() .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount) + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests - .getThreadPoolMergeExecutorService(testThreadPool); + .getThreadPoolMergeExecutorService(testThreadPool, settings, nodeEnvironment); assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); try ( ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 699b5e93d79f9..e21b046d7a9f8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -33,6 +34,7 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; @@ -91,6 +93,7 @@ public class RefreshListenersTests extends ESTestCase { private Engine engine; private volatile int maxListeners; private ThreadPool threadPool; + private NodeEnvironment nodeEnvironment; private ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private Store store; @@ -104,7 +107,12 @@ public void setupListeners() throws Exception { .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()) .build(); IndexSettings indexSettings = 
IndexSettingsModule.newIndexSettings("index", settings); - threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); + nodeEnvironment = newNodeEnvironment(settings); + threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( + threadPool, + ClusterSettings.createBuiltInClusterSettings(settings), + nodeEnvironment + ); listeners = new RefreshListeners( () -> maxListeners, () -> engine.refresh("too-many-listeners"), @@ -178,8 +186,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { @After public void tearDownListeners() throws Exception { - IOUtils.close(engine, store); - terminate(threadPool); + IOUtils.close(engine, store, nodeEnvironment, () -> terminate(threadPool)); } public void testBeforeRefresh() throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 9bb6696b1ee6d..3a134a3d0d9e5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -61,6 +61,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; @@ -68,6 +69,7 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; @@ -156,6 +158,7 @@ public abstract class EngineTestCase extends ESTestCase { protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; + protected NodeEnvironment nodeEnvironment; protected ThreadPoolMergeExecutorService threadPoolMergeExecutorService; protected TranslogHandler translogHandler; @@ -246,9 +249,11 @@ public void setUp() throws Exception { } defaultSettings = IndexSettingsModule.newIndexSettings("index", indexSettings()); threadPool = new TestThreadPool(getClass().getName()); + nodeEnvironment = newNodeEnvironment(defaultSettings.getNodeSettings()); threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( threadPool, - defaultSettings.getNodeSettings() + ClusterSettings.createBuiltInClusterSettings(defaultSettings.getNodeSettings()), + nodeEnvironment ); store = createStore(); @@ -400,7 +405,7 @@ public void tearDown() throws Exception { assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine); } } finally { - IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); + IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool), nodeEnvironment); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 89ce1f4eb06cd..47d9520c5aabb 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java 
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -154,6 +154,7 @@ public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailur }; protected ThreadPool threadPool; + protected NodeEnvironment nodeEnvironment; protected ThreadPoolMergeExecutorService threadPoolMergeExecutorService; protected Executor writeExecutor; protected long primaryTerm; @@ -171,7 +172,12 @@ public void setUp() throws Exception { super.setUp(); Settings settings = threadPoolSettings(); threadPool = setUpThreadPool(settings); - threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); + nodeEnvironment = newNodeEnvironment(settings); + threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( + threadPool, + ClusterSettings.createBuiltInClusterSettings(settings), + nodeEnvironment + ); writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards failOnShardFailures(); @@ -184,7 +190,7 @@ protected ThreadPool setUpThreadPool(Settings settings) { @Override public void tearDown() throws Exception { try { - tearDownThreadPool(); + IOUtils.close(nodeEnvironment, this::tearDownThreadPool); } finally { super.tearDown(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 64d94f611f7b2..5e7481602a77a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -18,10 +18,13 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexModule; @@ -85,6 +88,7 @@ public class FollowingEngineTests extends ESTestCase { private ThreadPool threadPool; + private NodeEnvironment nodeEnvironment; private ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private Index index; private ShardId shardId; @@ -99,7 +103,12 @@ public void setUp() throws Exception { .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()) .build(); threadPool = new TestThreadPool("following-engine-tests", settings); - threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); + nodeEnvironment = newNodeEnvironment(settings); + threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( + threadPool, + ClusterSettings.createBuiltInClusterSettings(settings), + nodeEnvironment + ); index = new Index("index", "uuid"); shardId = new ShardId(index, 0); primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE)); @@ -108,7 +117,7 @@ public void setUp() throws Exception { @Override 
public void tearDown() throws Exception { - terminate(threadPool); + IOUtils.close(nodeEnvironment, () -> terminate(threadPool)); super.tearDown(); }
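
Note: every test class touched above converges on the same lifecycle pattern for the new `maybeCreateThreadPoolMergeExecutorService` signature: create a `NodeEnvironment` in `setUp` so the executor can probe disk usage on the node's data paths, pass built-in `ClusterSettings` so the dynamic `indices.merge.disk.*` settings are registered, and close the `NodeEnvironment` in `tearDown`. Below is a minimal sketch of that pattern, assuming an `ESTestCase` subclass; the class name `DiskAwareMergeExecutorTests` is hypothetical and only the calls already shown in this diff are relied on:

```java
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

// Hypothetical test class illustrating the setup/teardown pattern used throughout this PR;
// it is not part of the change set itself.
public class DiskAwareMergeExecutorTests extends ESTestCase {

    private ThreadPool threadPool;
    private NodeEnvironment nodeEnvironment;
    private ThreadPoolMergeExecutorService threadPoolMergeExecutorService;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        Settings settings = Settings.builder()
            // the executor service is only created when the thread pool merge scheduler is enabled
            .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
            .build();
        threadPool = new TestThreadPool(getClass().getName(), settings);
        // NodeEnvironment gives the executor access to the node's data paths for disk-usage checks
        nodeEnvironment = newNodeEnvironment(settings);
        // built-in ClusterSettings registers the dynamic indices.merge.disk.* settings
        threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
            threadPool,
            ClusterSettings.createBuiltInClusterSettings(settings),
            nodeEnvironment
        );
    }

    @Override
    public void tearDown() throws Exception {
        // NodeEnvironment holds the node lock and must be released; wrapping terminate(threadPool)
        // in a Closeable lambda lets IOUtils.close run both steps even if one of them throws
        IOUtils.close(nodeEnvironment, () -> terminate(threadPool));
        super.tearDown();
    }
}
```

The `IOUtils.close(nodeEnvironment, () -> terminate(threadPool))` idiom mirrors the `tearDown` changes above: `IOUtils.close` accepts any mix of `Closeable` instances and lambdas, and guarantees each is attempted, so the node lock is released even when thread pool termination fails.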