diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index a130a5b869adc..0965ef646a5eb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -625,6 +625,7 @@ public static final IndexShard newIndexShard( indexService.getIndexEventListener(), wrapper, indexService.getThreadPool(), + indexService.getThreadPoolMergeExecutor(), indexService.getBigArrays(), null, Collections.emptyList(), diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 74ccdce19d3ad..b53d8749248e5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -59,6 +59,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutor(), indexSettings, config.getWarmer(), config.getStore(), diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 5512dffdda53e..b435b3ed64bf8 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutor; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldDataCache; @@ -154,6 +155,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final AsyncTrimTranslogTask trimTranslogTask; private final ThreadPool threadPool; + private final ThreadPoolMergeExecutor threadPoolMergeExecutor; private final BigArrays bigArrays; private final ScriptService scriptService; private final ClusterService clusterService; @@ -260,6 +262,7 @@ public IndexService( this.indexFoldersDeletionListener = indexFoldersDeletionListener; this.bigArrays = bigArrays; this.threadPool = threadPool; + this.threadPoolMergeExecutor = new ThreadPoolMergeExecutor(threadPool); this.scriptService = scriptService; this.clusterService = clusterService; this.client = client; @@ -555,6 +558,7 @@ public synchronized IndexShard createShard( eventListener, readerWrapper, threadPool, + threadPoolMergeExecutor, bigArrays, engineWarmer, searchOperationListeners, @@ -819,6 +823,10 @@ public ThreadPool getThreadPool() { return threadPool; } + public ThreadPoolMergeExecutor getThreadPoolMergeExecutor() { + return threadPoolMergeExecutor; + } + /** * The {@link BigArrays} to use for this index. 
     */
diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index af3c2cd5172f6..dc80430408ffe 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -58,6 +58,7 @@ public final class EngineConfig {
     private final MapperService mapperService;
     private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
     private final ThreadPool threadPool;
+    private final ThreadPoolMergeExecutor threadPoolMergeExecutor;
     private final Engine.Warmer warmer;
     private final Store store;
     private final MergePolicy mergePolicy;
@@ -150,6 +151,7 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
     public EngineConfig(
         ShardId shardId,
         ThreadPool threadPool,
+        ThreadPoolMergeExecutor threadPoolMergeExecutor,
         IndexSettings indexSettings,
         Engine.Warmer warmer,
         Store store,
@@ -179,6 +181,7 @@ public EngineConfig(
         this.shardId = shardId;
         this.indexSettings = indexSettings;
         this.threadPool = threadPool;
+        this.threadPoolMergeExecutor = threadPoolMergeExecutor;
         this.warmer = warmer == null ? (a) -> {} : warmer;
         this.store = store;
         this.mergePolicy = mergePolicy;
@@ -287,6 +290,10 @@ public ThreadPool getThreadPool() {
         return threadPool;
     }
 
+    public ThreadPoolMergeExecutor getThreadPoolMergeExecutor() {
+        return threadPoolMergeExecutor;
+    }
+
     /**
      * Returns an {@link org.elasticsearch.index.engine.Engine.Warmer} used to warm new searchers before they are used for searching.
      */
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index d3d7dcd8e930f..b8235ce9e4abc 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -253,7 +253,11 @@ public InternalEngine(EngineConfig engineConfig) {
         boolean success = false;
         try {
             this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
-            mergeScheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
+            mergeScheduler = createMergeScheduler(
+                engineConfig.getShardId(),
+                engineConfig.getIndexSettings(),
+                engineConfig.getThreadPoolMergeExecutor()
+            );
             scheduler = mergeScheduler.getMergeScheduler();
             throttle = new IndexThrottle();
             try {
@@ -2823,8 +2827,71 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
         return indexWriter.getConfig();
     }
 
-    protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
-        return new EngineMergeScheduler(shardId, indexSettings);
+    protected ElasticsearchMergeScheduler createMergeScheduler(
+        ShardId shardId,
+        IndexSettings indexSettings,
+        ThreadPoolMergeExecutor threadPoolMergeExecutor
+    ) {
+        return new ThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutor) {
+
+            @Override
+            protected synchronized void activateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
+                logger.info(
+                    "now throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}",
+                    numRunningMerges,
+                    numQueuedMerges,
+                    configuredMaxMergeCount
+                );
+                InternalEngine.this.activateThrottling();
+            }
+
+            @Override
+            protected synchronized void deactivateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
+                logger.info(
+                    "stop throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}",
+                    numRunningMerges,
+                    numQueuedMerges,
+                    configuredMaxMergeCount
+                );
+                InternalEngine.this.deactivateThrottling();
+            }
+
+            @Override
+            public synchronized void afterMerge(OnGoingMerge merge) {
+                if (indexWriter.hasPendingMerges() == false
+                    && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
+                    // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback
+                    // the writer we deadlock on engine#close for instance.
+                    engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
+                        @Override
+                        public void onFailure(Exception e) {
+                            if (isClosed.get() == false) {
+                                logger.warn("failed to flush after merge has finished", e);
+                            } else {
+                                logger.info("failed to flush after merge has finished during shard close");
+                            }
+                        }
+
+                        @Override
+                        protected void doRun() {
+                            // if we have no pending merges and we are supposed to flush once merges have finished, flush to
+                            // free up transient disk usage of the (presumably biggish) segments that were just merged
+                            flush();
+                        }
+                    });
+                } else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
+                    // we hit a significant merge which would allow us to free up memory if we committed it, so we should execute
+                    // a flush on the next operation, be that a flush-after-inactive or indexing a document. we could fork a
+                    // thread and do it right away, but we try to minimize forking and piggyback on outside events.
+                    shouldPeriodicallyFlushAfterBigMerge.set(true);
+                }
+            }
+
+            @Override
+            protected void handleMergeException(final Throwable exc) {
+                mergeException(exc);
+            }
+        };
     }
 
     private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {
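The anonymous `ThreadPoolMergeScheduler` above overrides only the throttling hooks, `afterMerge`, and `handleMergeException`; the important pattern is the edge-triggered hand-off into the engine, where the scheduler notifies `InternalEngine` only on the transition into or out of an over-subscribed merge backlog. The sketch below is illustrative only (class and method names are made up, not part of this change) and shows the `AtomicBoolean`-based edge detection that `maybeActivateThrottling`/`maybeDeactivateThrottling` rely on:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal, self-contained sketch (not this PR's code) of edge-triggered throttling:
// flip a single flag and notify the engine only on the rising/falling edge.
class ThrottleEdgeDetector {
    private final AtomicBoolean throttling = new AtomicBoolean();
    private final Runnable activate;   // stands in for InternalEngine#activateThrottling
    private final Runnable deactivate; // stands in for InternalEngine#deactivateThrottling

    ThrottleEdgeDetector(Runnable activate, Runnable deactivate) {
        this.activate = activate;
        this.deactivate = deactivate;
    }

    // Called whenever the merge backlog changes; maxActive is the configured cap.
    void onBacklogChange(int running, int queued, int maxActive) {
        if (running + queued > maxActive && throttling.getAndSet(true) == false) {
            activate.run();   // rising edge: start throttling indexing
        } else if (running + queued <= maxActive && throttling.getAndSet(false)) {
            deactivate.run(); // falling edge: stop throttling indexing
        }
    }

    public static void main(String[] args) {
        ThrottleEdgeDetector detector = new ThrottleEdgeDetector(
            () -> System.out.println("throttle ON"),
            () -> System.out.println("throttle OFF")
        );
        detector.onBacklogChange(2, 4, 5); // 6 > 5 -> "throttle ON"
        detector.onBacklogChange(2, 4, 5); // still over the cap, no duplicate notification
        detector.onBacklogChange(1, 2, 5); // 3 <= 5 -> "throttle OFF"
    }
}
```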
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutor.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutor.java
new file mode 100644
index 0000000000000..9114e9fbe92a7
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutor.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+
+public class ThreadPoolMergeExecutor {
+    /**
+     * Floor for IO write rate limit (we will never go any lower than this)
+     */
+    private static final double MIN_MERGE_MB_PER_SEC = 5.0;
+    /**
+     * Ceiling for IO write rate limit (we will never go any higher than this)
+     */
+    private static final double MAX_MERGE_MB_PER_SEC = 10240.0;
+    /**
+     * Initial value for IO write rate limit when doAutoIOThrottle is true
+     */
+    private static final double START_MB_PER_SEC = 20.0;
+    /**
+     * Current IO write throttle rate, for all merges, across all merge schedulers (shards) on the node
+     */
+    private double targetMBPerSec = START_MB_PER_SEC;
+    private final SortedSet<ThreadPoolMergeScheduler> registeredMergeSchedulers = new TreeSet<>(new Comparator<>() {
+        @Override
+        public int compare(ThreadPoolMergeScheduler tpms1, ThreadPoolMergeScheduler tpms2) {
+            MergeTask mergeTask1 = tpms1.peekMergeTaskToExecute();
+            MergeTask mergeTask2 = tpms2.peekMergeTaskToExecute();
+            if (mergeTask1 == null && mergeTask2 == null) {
+                // arbitrary order between schedulers that cannot run any merge right now
+                return System.identityHashCode(tpms1) - System.identityHashCode(tpms2);
+            } else if (mergeTask1 == null) {
+                // "merge task 2" can run because "merge scheduler 1" cannot run any merges
+                return 1;
+            } else if (mergeTask2 == null) {
+                // "merge task 1" can run because "merge scheduler 2" cannot run any merges
+                return -1;
+            } else {
+                // run smaller merge task first
+                return mergeTask1.compareTo(mergeTask2);
+            }
+        }
+    });
+    private final ExecutorService executorService;
+    private final int maxConcurrentMerges;
+    private int currentlyExecutingMergesCount;
+    private int currentlyActiveIOThrottledMergesCount;
+
+    public ThreadPoolMergeExecutor(ThreadPool threadPool) {
+        this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
+        this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
+    }
+
+    public double getTargetMBPerSec() {
+        return targetMBPerSec;
+    }
+
+    public synchronized void registerMergeScheduler(ThreadPoolMergeScheduler threadPoolMergeScheduler) {
+        if (registeredMergeSchedulers.add(threadPoolMergeScheduler) == false) {
+            throw new IllegalStateException("cannot register the same scheduler multiple times");
+        }
+    }
+
+    public synchronized void unregisterMergeScheduler(ThreadPoolMergeScheduler threadPoolMergeScheduler) {
+        if (registeredMergeSchedulers.remove(threadPoolMergeScheduler) == false) {
+            throw new IllegalStateException("cannot unregister a scheduler that has not been registered");
+        }
+    }
+
+    public synchronized void updateMergeScheduler(
+        ThreadPoolMergeScheduler threadPoolMergeScheduler,
+        Consumer<ThreadPoolMergeScheduler> updater
+    ) {
+        boolean removed = registeredMergeSchedulers.remove(threadPoolMergeScheduler);
+        if (false == removed) {
+            throw new IllegalStateException("Cannot update a merge scheduler that is not registered");
+        }
+        currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
+        currentlyActiveIOThrottledMergesCount -= getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
+        updater.accept(threadPoolMergeScheduler);
+        boolean added = registeredMergeSchedulers.add(threadPoolMergeScheduler);
+        if (false == added) {
+            throw new IllegalStateException("Found duplicate registered merge scheduler");
+        }
+        currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
+        currentlyActiveIOThrottledMergesCount += getIOThrottledMergeTasksCount(threadPoolMergeScheduler);
+        double newTargetMBPerSec = maybeUpdateTargetMBPerSec();
+        if (newTargetMBPerSec != targetMBPerSec) {
+            targetMBPerSec = newTargetMBPerSec;
+            threadPoolMergeScheduler.setIORateLimitForAllMergeTasks(newTargetMBPerSec);
+        }
+        maybeExecuteNextMerges();
+    }
+
+    public synchronized void maybeExecuteNextMerges() {
+        while (true) {
+            if (currentlyExecutingMergesCount >= maxConcurrentMerges) {
+                // all merge threads are busy
+                return;
+            }
+            if (registeredMergeSchedulers.first().peekMergeTaskToExecute() == null) {
+                // no merges available to run
+                return;
+            }
+            ThreadPoolMergeScheduler threadPoolMergeScheduler = registeredMergeSchedulers.removeFirst();
+            currentlyExecutingMergesCount -= threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
+            MergeTask mergeTask = threadPoolMergeScheduler.executeNextMergeTask();
+            assert mergeTask != null;
+            executorService.execute(mergeTask);
+            registeredMergeSchedulers.add(threadPoolMergeScheduler);
+            currentlyExecutingMergesCount += threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size();
+        }
+    }
+
+    private int getIOThrottledMergeTasksCount(ThreadPoolMergeScheduler mergeScheduler) {
+        if (mergeScheduler.shouldIOThrottleMergeTasks() == false) {
+            return 0;
+        } else {
+            int count = 0;
+            for (MergeTask runningMergeTask : mergeScheduler.getCurrentlyRunningMergeTasks()) {
+                if (runningMergeTask.supportsIOThrottling()) {
+                    count++;
+                }
+            }
+            for (MergeTask queuedMergeTask : mergeScheduler.getQueuedMergeTasks()) {
+                if (queuedMergeTask.supportsIOThrottling()) {
+                    count++;
+                }
+            }
+            return count;
+        }
+    }
+
+    private double maybeUpdateTargetMBPerSec() {
+        if (currentlyActiveIOThrottledMergesCount < maxConcurrentMerges * 2 && targetMBPerSec > MIN_MERGE_MB_PER_SEC) {
+            return Math.max(MIN_MERGE_MB_PER_SEC, targetMBPerSec / 1.1);
+        } else if (currentlyActiveIOThrottledMergesCount > maxConcurrentMerges * 4 && targetMBPerSec < MAX_MERGE_MB_PER_SEC) {
+            return Math.min(MAX_MERGE_MB_PER_SEC, targetMBPerSec * 1.1);
+        }
+        return targetMBPerSec;
+    }
+}
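The executor's `maybeUpdateTargetMBPerSec` maintains one shared IO write budget for all throttled merges on the node: it shrinks the budget while the throttled backlog stays comfortably below what the merge thread pool can drain, and grows it once the backlog clearly exceeds it, using a gentle multiplicative step similar in spirit to Lucene's own merge auto-throttle. A self-contained sketch of just that step policy (constants copied from the class above; the class name and demo `main` are illustrative, not part of this change):

```java
// Hedged sketch of the adaptive IO throttle in maybeUpdateTargetMBPerSec.
class AdaptiveIORate {
    static final double MIN_MB_PER_SEC = 5.0;
    static final double MAX_MB_PER_SEC = 10240.0;

    static double next(double current, int activeThrottledMerges, int maxConcurrentMerges) {
        if (activeThrottledMerges < maxConcurrentMerges * 2 && current > MIN_MB_PER_SEC) {
            return Math.max(MIN_MB_PER_SEC, current / 1.1); // small backlog: tighten the budget
        } else if (activeThrottledMerges > maxConcurrentMerges * 4 && current < MAX_MB_PER_SEC) {
            return Math.min(MAX_MB_PER_SEC, current * 1.1); // large backlog: let merges write faster
        }
        return current;
    }

    public static void main(String[] args) {
        double rate = 20.0; // START_MB_PER_SEC
        // a growing backlog on a node with 4 merge threads ratchets the budget up
        for (int backlog : new int[] { 3, 20, 20, 20 }) {
            rate = next(rate, backlog, 4);
            System.out.printf("backlog=%d -> %.2f MB/s%n", backlog, rate);
        }
    }
}
```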
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
new file mode 100644
index 0000000000000..034ca27c93cf8
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java
@@ -0,0 +1,482 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeRateLimiter;
+import org.apache.lucene.index.MergeScheduler;
+import org.apache.lucene.index.MergeTrigger;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RateLimitedIndexOutput;
+import org.apache.lucene.store.RateLimiter;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MergeSchedulerConfig;
+import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.merge.OnGoingMerge;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ThreadPoolMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler {
+    private final ShardId shardId;
+    private final MergeSchedulerConfig config;
+    private final Logger logger;
+    // per-scheduler merge stats
+    private final MergeTracking mergeTracking;
+    private final ThreadPoolMergeExecutor threadPoolMergeExecutor;
+    private final ThreadLocal<MergeRateLimiter> onGoingMergeRateLimiter = new ThreadLocal<>();
+    private final PriorityQueue<MergeTask> queuedMergeTasks = new PriorityQueue<>();
+    private final List<MergeTask> currentlyRunningMergeTasks = new ArrayList<>();
+    // set when incoming merges should be throttled
+    private final AtomicBoolean shouldThrottleIncomingMerges = new AtomicBoolean();
+    // how many {@link MergeTask}s have kicked off (this is used to name them)
+    private final AtomicLong mergeTaskCount = new AtomicLong();
+    private int maxThreadCount;
+    private int maxMergeCount;
+    private boolean shouldIOThrottleMergeTasks;
+
+    @SuppressWarnings("this-escape")
+    public ThreadPoolMergeScheduler(ShardId shardId, IndexSettings indexSettings, ThreadPoolMergeExecutor threadPoolMergeExecutor) {
+        this.shardId = shardId;
+        this.config = indexSettings.getMergeSchedulerConfig();
+        this.logger = Loggers.getLogger(getClass(), shardId);
+        this.mergeTracking = new MergeTracking(
+            logger,
+            () -> this.config.isAutoThrottle() ? threadPoolMergeExecutor.getTargetMBPerSec() : Double.POSITIVE_INFINITY
+        );
+        this.threadPoolMergeExecutor = threadPoolMergeExecutor;
+        threadPoolMergeExecutor.registerMergeScheduler(this);
+        refreshConfig();
+    }
+
+    @Override
+    public Set<OnGoingMerge> onGoingMerges() {
+        return mergeTracking.onGoingMerges();
+    }
+
+    @Override
+    public MergeStats stats() {
+        return mergeTracking.stats();
+    }
+
+    @Override
+    public MergeScheduler getMergeScheduler() {
+        return this;
+    }
+
+    @Override
+    public void refreshConfig() {
+        update(() -> {
+            maxThreadCount = config.getMaxThreadCount();
+            maxMergeCount = config.getMaxMergeCount();
+            shouldIOThrottleMergeTasks = config.isAutoThrottle();
+        });
+        maybeActivateThrottling();
+        maybeDeactivateThrottling();
+    }
+
+    @Override
+    public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+        MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+        if (merge != null) {
+            submitNewMergeTask(mergeSource, merge, trigger);
+        }
+    }
+
+    /**
+     * A callback allowing for custom logic before an actual merge starts.
+     */
+    protected void beforeMerge(OnGoingMerge merge) {}
+
+    /**
+     * A callback allowing for custom logic after an actual merge finishes.
+     */
+    protected void afterMerge(OnGoingMerge merge) {}
+
+    protected void activateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {}
+
+    protected void deactivateThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {}
+
+    @Override
+    public MergeScheduler clone() {
+        // Lucene IW makes a clone internally but since we hold on to this instance
+        // the clone will just be the identity.
+        return this;
+    }
+
+    protected void handleMergeException(Throwable t) {
+        throw new MergePolicy.MergeException(t);
+    }
+
+    boolean shouldIOThrottleMergeTasks() {
+        return shouldIOThrottleMergeTasks;
+    }
+
+    void setIORateLimitForAllMergeTasks(double mbPerSec) {
+        if (shouldIOThrottleMergeTasks == false) {
+            throw new IllegalArgumentException("scheduler cannot IO throttle merge tasks");
+        }
+        for (MergeTask runningMergeTask : getCurrentlyRunningMergeTasks()) {
+            if (runningMergeTask.supportsIOThrottling()) {
+                runningMergeTask.setIORateLimit(mbPerSec);
+            }
+        }
+        for (MergeTask queuedMergeTask : getQueuedMergeTasks()) {
+            if (queuedMergeTask.supportsIOThrottling()) {
+                queuedMergeTask.setIORateLimit(mbPerSec);
+            }
+        }
+    }
+
+    List<MergeTask> getCurrentlyRunningMergeTasks() {
+        return currentlyRunningMergeTasks;
+    }
+
+    PriorityQueue<MergeTask> getQueuedMergeTasks() {
+        return queuedMergeTasks;
+    }
+
+    MergeTask peekMergeTaskToExecute() {
+        if (currentlyRunningMergeTasks.size() >= config.getMaxThreadCount()) {
+            // there are already enough concurrent merges per scheduler (per shard) currently running
+            return null;
+        }
+        MergeTask mergeTask = queuedMergeTasks.peek();
+        if (mergeTask == null) {
+            // no more merges to execute
+            return null;
+        }
+        assert mergeTask.isRunning() == false;
+        return mergeTask;
+    }
+
+    synchronized MergeTask executeNextMergeTask() {
+        if (currentlyRunningMergeTasks.size() >= config.getMaxThreadCount()) {
+            // there are already enough concurrent merges per scheduler (per shard) currently running
+            return null;
+        }
+        MergeTask mergeTask = queuedMergeTasks.poll();
+        if (mergeTask == null) {
+            // no more merges to execute
+            return null;
+        }
+        assert mergeTask.isRunning() == false;
+        currentlyRunningMergeTasks.add(mergeTask);
+        return mergeTask;
+    }
+
+    private void submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
+        MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
+        enqueueMergeTask(mergeTask);
+        maybeActivateThrottling();
+    }
+
+    private void mergeDone(MergeTask mergeTask) {
+        assert mergeTask.isRunning();
+        update(() -> this.currentlyRunningMergeTasks.remove(mergeTask));
+        maybeDeactivateThrottling();
+    }
+
+    private void maybeActivateThrottling() {
+        int numRunningMerges = currentlyRunningMergeTasks.size();
+        int numQueuedMerges = queuedMergeTasks.size();
+        int configuredMaxMergeCount = maxMergeCount;
+        // both currently running and enqueued count as "active" for throttling purposes
+        if (numRunningMerges + numQueuedMerges > configuredMaxMergeCount && shouldThrottleIncomingMerges.getAndSet(true) == false) {
+            activateThrottling(numRunningMerges, numQueuedMerges, configuredMaxMergeCount);
+        }
+    }
+
+    private void maybeDeactivateThrottling() {
+        int numRunningMerges = currentlyRunningMergeTasks.size();
+        int numQueuedMerges = queuedMergeTasks.size();
+        int configuredMaxMergeCount = maxMergeCount;
+        // both currently running and enqueued count as "active" for throttling purposes
+        if (numRunningMerges + numQueuedMerges <= configuredMaxMergeCount && shouldThrottleIncomingMerges.getAndSet(false)) {
+            deactivateThrottling(numRunningMerges, numQueuedMerges, configuredMaxMergeCount);
+        }
+    }
+
+    private MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
+        // forced merges, and merges triggered when closing a shard, always run un-throttled
+        boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
+        return new MergeTask(
+            mergeSource,
+            merge,
+            isAutoThrottle,
+            "Lucene Merge Task #" + mergeTaskCount.incrementAndGet() + " for shard " + shardId
+        );
+    }
+
+    private void enqueueMergeTask(MergeTask mergeTask) {
+        assert mergeTask.isRunning() == false;
+        update(() -> this.queuedMergeTasks.add(mergeTask));
+    }
+
+    private void update(Runnable updater) {
+        threadPoolMergeExecutor.updateMergeScheduler(this, (ignored) -> {
+            synchronized (this) {
+                updater.run();
+            }
+        });
+    }
+
+    /**
+     * Does the actual merge, by calling {@link org.apache.lucene.index.MergeScheduler.MergeSource#merge}
+     */
+    protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
+        mergeSource.merge(merge);
+    }
+
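Queued merges are ordered by `MergeTask#compareTo` (defined further below), i.e. by `estimatedMergeBytes`, so each scheduler always offers its cheapest pending merge to the node-wide executor. A standalone demonstration of that smallest-first ordering, with a stand-in record (`SimpleMerge` and the demo class are illustrative, not part of this change):

```java
import java.util.PriorityQueue;

// Stand-in for MergeTask: natural order is by estimated merge size.
record SimpleMerge(String name, long estimatedBytes) implements Comparable<SimpleMerge> {
    @Override
    public int compareTo(SimpleMerge other) {
        return Long.compare(estimatedBytes, other.estimatedBytes);
    }
}

class SmallestFirstQueueDemo {
    public static void main(String[] args) {
        PriorityQueue<SimpleMerge> queued = new PriorityQueue<>();
        queued.add(new SimpleMerge("big", 512L << 20));
        queued.add(new SimpleMerge("tiny", 4L << 20));
        queued.add(new SimpleMerge("medium", 64L << 20));
        // polls in size order: tiny, medium, big -- small merges finish fast and
        // keep the segment count down before the node commits a thread to a large one
        while (queued.isEmpty() == false) {
            System.out.println(queued.poll().name());
        }
    }
}
```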
+    @Override
+    public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) {
+        // Return a wrapped Directory which has rate-limited output.
+        // Note: the rate limiter is only per thread. So, if there are multiple merge threads running
+        // and throttling is required, each thread will be throttled independently. The implication
+        // is that the total IO rate could be higher than the target rate.
+        RateLimiter rateLimiter = onGoingMergeRateLimiter.get();
+        return new FilterDirectory(in) {
+            @Override
+            public IndexOutput createOutput(String name, IOContext context) throws IOException {
+                ensureOpen();
+
+                // This Directory is only supposed to be used during merging,
+                // so all writes should have MERGE context, else there is a bug
+                // somewhere that is failing to pass down the right IOContext:
+                assert context.context() == IOContext.Context.MERGE : "got context=" + context.context();
+
+                return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
+            }
+        };
+    }
+
+    final class MergeTask extends AbstractRunnable implements Comparable<MergeTask> {
+        private final String name;
+        private final SetOnce<Long> mergeStartTimeNS;
+        private final MergeSource mergeSource;
+        private final OnGoingMerge onGoingMerge;
+        private final MergeRateLimiter rateLimiter;
+        private final boolean supportsIOThrottling;
+
+        MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) {
+            this.name = name;
+            this.mergeStartTimeNS = new SetOnce<>();
+            this.mergeSource = mergeSource;
+            this.onGoingMerge = new OnGoingMerge(merge);
+            this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
+            this.supportsIOThrottling = supportsIOThrottling;
+            // probably redundant, but better be explicit
+            if (this.supportsIOThrottling == false) {
+                this.rateLimiter.setMBPerSec(Double.POSITIVE_INFINITY);
+            }
+        }
+
+        @Override
+        public int compareTo(MergeTask other) {
+            // sort smaller merges first, so they are executed before larger ones
+            return Long.compare(onGoingMerge.getMerge().estimatedMergeBytes, other.onGoingMerge.getMerge().estimatedMergeBytes);
+        }
+
+        public boolean supportsIOThrottling() {
+            return supportsIOThrottling;
+        }
+
+        public void setIORateLimit(double mbPerSec) {
+            if (supportsIOThrottling == false) {
+                throw new IllegalArgumentException("merge task cannot be IO throttled");
+            }
+            this.rateLimiter.setMBPerSec(mbPerSec);
+        }
+
+        public boolean isRunning() {
+            return mergeStartTimeNS.get() != null;
+        }
+
+        @Override
+        public void doRun() throws Exception {
+            mergeStartTimeNS.set(System.nanoTime());
+            try {
+                onGoingMergeRateLimiter.set(this.rateLimiter);
+                beforeMerge(onGoingMerge);
+                mergeTracking.mergeStarted(onGoingMerge);
+                if (verbose()) {
+                    message(String.format(Locale.ROOT, "merge task %s start", getName()));
+                }
+                doMerge(mergeSource, onGoingMerge.getMerge());
+                if (verbose()) {
+                    message(
+                        String.format(
+                            Locale.ROOT,
+                            "merge task %s merge segment [%s] done estSize=%.1f MB (written=%.1f MB) "
+                                + "runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s",
+                            getName(),
+                            getSegmentName(onGoingMerge.getMerge()),
+                            bytesToMB(onGoingMerge.getMerge().estimatedMergeBytes),
+                            bytesToMB(rateLimiter.getTotalBytesWritten()),
+                            nsToSec(System.nanoTime() - mergeStartTimeNS.get()),
+                            nsToSec(rateLimiter.getTotalStoppedNS()),
+                            nsToSec(rateLimiter.getTotalPausedNS()),
+                            rateToString(rateLimiter.getMBPerSec())
+                        )
+                    );
+                }
+            } catch (Throwable t) {
+                if (t instanceof MergePolicy.MergeAbortedException) {
+                    // OK to ignore. This is what Lucene's ConcurrentMergeScheduler does.
+                } else if (t instanceof Exception == false) {
+                    // onFailure and onAfter should better be called for Errors too
+                    throw new ExceptionWrappingError(t);
+                } else {
+                    throw t;
+                }
+            }
+        }
+
+        @Override
+        public void onAfter() {
+            assert this.mergeStartTimeNS.get() != null : "onAfter should always be invoked after doRun";
+            try {
+                if (verbose()) {
+                    message(String.format(Locale.ROOT, "merge task %s end", getName()));
+                }
+                afterMerge(onGoingMerge);
+            } finally {
+                onGoingMergeRateLimiter.remove();
+                long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get());
+                try {
+                    mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
+                } finally {
+                    mergeDone(this);
+                    // kick off the next merge, if any
+                    MergePolicy.OneMerge nextMerge = null;
+                    try {
+                        nextMerge = mergeSource.getNextMerge();
+                    } catch (IllegalStateException e) {
+                        if (verbose()) {
+                            message("merge task poll failed, likely because the index writer has failed");
+                        }
+                        // ignore exception, we expect the IW failure to be logged elsewhere
+                    }
+                    if (nextMerge != null) {
+                        submitNewMergeTask(mergeSource, nextMerge, MergeTrigger.MERGE_FINISHED);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            // most commonly the merge should've already been aborted by now,
+            // plus the engine is probably going to be failed when any merge fails,
+            // but keep this in case something believes calling `MergeTask#onFailure` is a sane way to abort a merge
+            abortOnGoingMerge();
+            mergeDone(this);
+            handleMergeException(ExceptionWrappingError.maybeUnwrapCause(e));
+        }
+
+        @Override
+        public void onRejection(Exception e) {
+            if (verbose()) {
+                message(String.format(Locale.ROOT, "merge task [%s] rejected by thread pool, aborting", onGoingMerge.getId()));
+            }
+            abortOnGoingMerge();
+            mergeDone(this);
+        }
+
+        private void abortOnGoingMerge() {
+            // This would interrupt an IndexWriter if it were actually performing the merge. We just set this here because it seems
+            // appropriate as we are not going to move forward with the merge.
+            onGoingMerge.getMerge().setAborted();
+            // It is fine to mark this merge as finished. Lucene will eventually produce a new merge including this segment even if
+            // this merge did not actually execute.
+            mergeSource.onMergeFinished(onGoingMerge.getMerge());
+        }
+
+        private String getName() {
+            return name;
+        }
+    }
+
+    @Override
+    /* Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
+    protected boolean verbose() {
+        if (logger.isTraceEnabled()) {
+            return true;
+        }
+        return super.verbose();
+    }
+
+    @Override
+    /* Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */
+    protected void message(String message) {
+        if (logger.isTraceEnabled()) {
+            logger.trace("{}", message);
+        }
+        super.message(message);
+    }
+
+    /** Close this MergeScheduler. */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        threadPoolMergeExecutor.unregisterMergeScheduler(this);
+    }
+
+    private static double nsToSec(long ns) {
+        return ns / (double) TimeUnit.SECONDS.toNanos(1);
+    }
+
+    private static double bytesToMB(long bytes) {
+        return bytes / 1024. / 1024.;
+    }
+
+    private static String getSegmentName(MergePolicy.OneMerge merge) {
+        return merge.getMergeInfo() != null ?
merge.getMergeInfo().info.name : "_na_"; + } + + private static String rateToString(double mbPerSec) { + if (mbPerSec == 0.0) { + return "stopped"; + } else if (mbPerSec == Double.POSITIVE_INFINITY) { + return "unlimited"; + } else { + return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec); + } + } + + private static class ExceptionWrappingError extends RuntimeException { + private static Throwable maybeUnwrapCause(Exception e) { + if (e instanceof ExceptionWrappingError exceptionWrappingError) { + return exceptionWrappingError.getCause(); + } + return e; + } + + private ExceptionWrappingError(Throwable errorCause) { + super(errorCause); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ab1c936d1c469..66ec4f03bc459 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -97,6 +97,7 @@ import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutor; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.ShardFieldData; import org.elasticsearch.index.flush.FlushStats; @@ -194,6 +195,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { private final ThreadPool threadPool; + private final ThreadPoolMergeExecutor threadPoolMergeExecutor; private final MapperService mapperService; private final IndexCache indexCache; private final Store store; @@ -317,6 +319,7 @@ public IndexShard( final IndexEventListener indexEventListener, final CheckedFunction indexReaderWrapper, final ThreadPool threadPool, + final ThreadPoolMergeExecutor threadPoolMergeExecutor, final BigArrays bigArrays, final Engine.Warmer warmer, final List searchOperationListener, @@ -343,6 +346,7 @@ public IndexShard( this.indexSortSupplier = indexSortSupplier; this.indexEventListener = indexEventListener; this.threadPool = threadPool; + this.threadPoolMergeExecutor = threadPoolMergeExecutor; this.mapperService = mapperService; this.indexCache = indexCache; this.internalIndexingStats = new InternalIndexingStats(); @@ -3541,6 +3545,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { return new EngineConfig( shardId, threadPool, + threadPoolMergeExecutor, indexSettings, warmer, store, diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 32634043cfc98..58cef6c5da0ae 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -145,6 +145,10 @@ public Map getBuilders(Settings settings, int allocated false ) ); + result.put( + ThreadPool.Names.MERGE, + new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5), false) + ); result.put( ThreadPool.Names.FORCE_MERGE, new FixedExecutorBuilder( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index cf549f7f4b0b5..a4aaf89925586 100644 --- 
a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -101,6 +101,7 @@ public static class Names { public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; public static final String SNAPSHOT_META = "snapshot_meta"; + public static final String MERGE = "merge"; public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 26de6a7897786..8c8083a0cc2cd 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2578,10 +2578,10 @@ public boolean mergeCompleted() { public void append(LogEvent event) { final String formattedMessage = event.getMessage().getFormattedMessage(); if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0]")) { - if (formattedMessage.startsWith("merge thread")) { + if (formattedMessage.startsWith("merge task")) { messages.add(formattedMessage); } else if (event.getLoggerName().endsWith(".MS") - && formattedMessage.contains("MS: merge thread") + && formattedMessage.contains("MS: merge task") && formattedMessage.endsWith("end")) { luceneMergeSchedulerEnded.set(true); } @@ -2616,14 +2616,14 @@ public void testMergeThreadLogging() throws Exception { }); assertBusy(() -> { - List threadMsgs = mockAppender.messages().stream().filter(line -> line.startsWith("merge thread")).toList(); + List threadMsgs = mockAppender.messages().stream().filter(line -> line.startsWith("merge task")).toList(); assertThat("messages:" + threadMsgs, threadMsgs.size(), greaterThanOrEqualTo(3)); assertThat( threadMsgs, containsInRelativeOrder( - matchesRegex("^merge thread .* start$"), - matchesRegex("^merge thread .* merge segment.*$"), - matchesRegex("^merge thread .* end$") + matchesRegex("^merge task .* start$"), + matchesRegex("^merge task .* merge segment.*$"), + matchesRegex("^merge task .* end$") ) ); assertThat(mockAppender.mergeCompleted(), is(true)); @@ -3587,6 +3587,7 @@ public void testRecoverFromForeignTranslog() throws IOException { EngineConfig brokenConfig = new EngineConfig( shardId, threadPool, + threadPoolMergeExecutor, config.getIndexSettings(), null, store, @@ -7149,6 +7150,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { EngineConfig configWithWarmer = new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutor(), config.getIndexSettings(), warmer, store, diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 7d436ab5d8d22..39845edeea01e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -4992,6 +4992,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { EngineConfig configWithWarmer = new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutor(), config.getIndexSettings(), warmer, config.getStore(), diff --git 
a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index ca616dc619ec9..ed7f14ddbba9f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutor; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.LuceneDocument; import org.elasticsearch.index.mapper.MapperService; @@ -89,6 +90,7 @@ public class RefreshListenersTests extends ESTestCase { private Engine engine; private volatile int maxListeners; private ThreadPool threadPool; + private ThreadPoolMergeExecutor threadPoolMergeExecutor; private Store store; @Before @@ -97,6 +99,7 @@ public void setupListeners() throws Exception { maxListeners = randomIntBetween(2, 1000); // Now setup the InternalEngine which is much more complicated because we aren't mocking anything threadPool = new TestThreadPool(getTestName()); + threadPoolMergeExecutor = new ThreadPoolMergeExecutor(threadPool); listeners = new RefreshListeners( () -> maxListeners, () -> engine.refresh("too-many-listeners"), @@ -134,6 +137,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { EngineConfig config = new EngineConfig( shardId, threadPool, + threadPoolMergeExecutor, indexSettings, null, store, diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 7a2f375001874..36a0db0bad8fa 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -155,6 +155,7 @@ public abstract class EngineTestCase extends ESTestCase { protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; + protected ThreadPoolMergeExecutor threadPoolMergeExecutor; protected TranslogHandler translogHandler; protected Store store; @@ -241,6 +242,8 @@ public void setUp() throws Exception { } defaultSettings = IndexSettingsModule.newIndexSettings("index", indexSettings()); threadPool = new TestThreadPool(getClass().getName()); + threadPoolMergeExecutor = new ThreadPoolMergeExecutor(threadPool); + store = createStore(); storeReplica = createStore(); Lucene.cleanLuceneIndex(store.directory()); @@ -272,6 +275,7 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutor(), config.getIndexSettings(), config.getWarmer(), config.getStore(), @@ -304,6 +308,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutor(), config.getIndexSettings(), config.getWarmer(), config.getStore(), @@ -336,6 +341,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutor(), config.getIndexSettings(), config.getWarmer(), 
config.getStore(), @@ -840,6 +846,7 @@ public EngineConfig config( return new EngineConfig( shardId, threadPool, + threadPoolMergeExecutor, indexSettings, null, store, @@ -880,6 +887,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutor(), indexSettings, config.getWarmer(), store, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 2ae4bb0343101..aa5d8b95ed228 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutor; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; @@ -152,6 +153,7 @@ public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailur }; protected ThreadPool threadPool; + protected ThreadPoolMergeExecutor threadPoolMergeExecutor; protected Executor writeExecutor; protected long primaryTerm; @@ -167,6 +169,7 @@ public static void addMockCloseImplementation(IndexShard shard) throws IOExcepti public void setUp() throws Exception { super.setUp(); threadPool = setUpThreadPool(); + threadPoolMergeExecutor = new ThreadPoolMergeExecutor(threadPool); writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards failOnShardFailures(); @@ -537,6 +540,7 @@ protected IndexShard newShard( indexEventListener, indexReaderWrapper, threadPool, + threadPoolMergeExecutor, BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 62dc3313a1172..fcb98450a19c7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutor; import org.elasticsearch.index.engine.TranslogHandler; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -82,6 +83,7 @@ public class FollowingEngineTests extends ESTestCase { private ThreadPool threadPool; + private ThreadPoolMergeExecutor threadPoolMergeExecutor; private Index index; private ShardId shardId; private AtomicLong primaryTerm = new AtomicLong(); @@ -92,6 +94,7 @@ public class FollowingEngineTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool("following-engine-tests"); + threadPoolMergeExecutor = new ThreadPoolMergeExecutor(threadPool); index = new Index("index", "uuid"); shardId = new 
ShardId(index, 0); primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE)); @@ -113,7 +116,7 @@ public void testFollowingEngineRejectsNonFollowingIndex() throws IOException { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutor, store); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new FollowingEngine(engineConfig)); assertThat(e, hasToString(containsString("a following engine can not be constructed for a non-following index"))); } @@ -137,7 +140,7 @@ public void testOutOfOrderDocuments() throws IOException { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutor, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { final VersionType versionType = randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE); final List ops = EngineTestCase.generateSingleDocHistory(true, versionType, 2, 2, 20, "id"); @@ -156,7 +159,7 @@ public void runIndexTest( final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutor, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { final Engine.Index indexToTest = indexForFollowing("id", seqNo, origin); consumer.accept(followingEngine, indexToTest); @@ -182,7 +185,7 @@ public void runDeleteTest( final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutor, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { final String id = "id"; final Engine.Delete delete = new Engine.Delete( @@ -208,7 +211,7 @@ public void testDoNotFillSeqNoGaps() throws Exception { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = 
engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutor, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { followingEngine.index(indexForFollowing("id", 128, Engine.Operation.Origin.PRIMARY)); int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get()); @@ -221,6 +224,7 @@ private EngineConfig engineConfig( final ShardId shardIdValue, final IndexSettings indexSettings, final ThreadPool threadPool, + final ThreadPoolMergeExecutor threadPoolMergeExecutor, final Store store ) throws IOException { final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); @@ -235,6 +239,7 @@ private EngineConfig engineConfig( return new EngineConfig( shardIdValue, threadPool, + threadPoolMergeExecutor, indexSettings, null, store, @@ -506,7 +511,7 @@ public void testConcurrentIndexOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates( IndexMetadata followerIndexMetadata = IndexMetadata.builder(index.getName()).settings(followerSettings).build(); IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, Settings.EMPTY); try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) { - EngineConfig followerConfig = engineConfig(shardId, followerIndexSettings, threadPool, followerStore); + EngineConfig followerConfig = engineConfig(shardId, followerIndexSettings, threadPool, threadPoolMergeExecutor, followerStore); followerStore.createEmpty(); String translogUuid = Translog.createEmptyTranslog( followerConfig.getTranslogConfig().getTranslogPath(), @@ -613,7 +618,7 @@ private void runFollowTest(CheckedBiConsumer operationWithTerms = new HashMap<>(); @@ -882,7 +893,7 @@ public void testMaxSeqNoInCommitUserData() throws Exception { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutor, store); try (FollowingEngine engine = createEngine(store, engineConfig)) { AtomicBoolean running = new AtomicBoolean(true); Thread rollTranslog = new Thread(() -> {
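For reference, a hedged wiring sketch built only from the signatures this diff introduces: the node-level `ThreadPoolMergeExecutor` sits on the shared `merge` scaling thread pool, and each engine's `ThreadPoolMergeScheduler` registers with it on construction. `TestThreadPool` and `ThreadPool.terminate` are the same test-framework constructs the diffs above use; whether this compiles as-is depends on this PR branch being on the classpath:

```java
import org.elasticsearch.index.engine.ThreadPoolMergeExecutor;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

class MergeExecutorWiring {
    public static void main(String[] args) {
        // the default builders now register a "merge" scaling executor (1..halfProcMaxAt5 threads)
        ThreadPool threadPool = new TestThreadPool("merge-wiring-demo");
        try {
            // grabs ThreadPool.Names.MERGE's executor and its max pool size,
            // exactly as the ThreadPoolMergeExecutor constructor in this diff does
            ThreadPoolMergeExecutor mergeExecutor = new ThreadPoolMergeExecutor(threadPool);
            System.out.println("initial IO budget: " + mergeExecutor.getTargetMBPerSec() + " MB/s");
            // an IndexService would now pass mergeExecutor into each EngineConfig it creates
        } finally {
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }
}
```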