diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 13d955e946ef0..a15d4f3049528 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -526,6 +526,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING, + IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE, ResourceWatcherService.ENABLED, ResourceWatcherService.RELOAD_INTERVAL_HIGH, ResourceWatcherService.RELOAD_INTERVAL_MEDIUM, diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index afd0ceb6d066d..de8bfd0e3e61c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -9,6 +9,7 @@ package org.elasticsearch.index.engine; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat; import org.apache.lucene.index.ByteVectorValues; @@ -87,6 +88,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; @@ -108,6 +110,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; @@ -145,6 +148,7 @@ public abstract class Engine implements Closeable { protected final ReentrantLock failEngineLock = new ReentrantLock(); protected final SetOnce failedEngine = new SetOnce<>(); protected final boolean enableRecoverySource; + protected final boolean pauseIndexingOnThrottle; private final AtomicBoolean isClosing = new AtomicBoolean(); private final SubscribableListener drainOnCloseListener = new SubscribableListener<>(); @@ -176,6 +180,9 @@ protected Engine(EngineConfig engineConfig) { this.enableRecoverySource = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get( engineConfig.getIndexSettings().getSettings() ); + this.pauseIndexingOnThrottle = IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.get( + engineConfig.getIndexSettings().getSettings() + ); } /** @@ -444,12 +451,19 @@ public interface IndexCommitListener { * is enabled */ protected static final class IndexThrottle { + private static final Logger logger = LogManager.getLogger(IndexThrottle.class); private final CounterMetric throttleTimeMillisMetric = new CounterMetric(); private volatile long startOfThrottleNS; private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock()); - private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock()); + private final PauseLock throttlingLock; + private final ReleasableLock lockReference; private volatile ReleasableLock lock = NOOP_LOCK; + public IndexThrottle(boolean pause) { + throttlingLock = new PauseLock(pause ? 0 : 1); + lockReference = new ReleasableLock(throttlingLock); + } + public Releasable acquireThrottle() { return lock.acquire(); } @@ -458,12 +472,15 @@ public Releasable acquireThrottle() { public void activate() { assert lock == NOOP_LOCK : "throttling activated while already active"; startOfThrottleNS = System.nanoTime(); + throttlingLock.throttle(); lock = lockReference; } /** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */ public void deactivate() { assert lock != NOOP_LOCK : "throttling deactivated but not active"; + + throttlingLock.unthrottle(); lock = NOOP_LOCK; assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS"; @@ -553,6 +570,58 @@ public Condition newCondition() { } } + /* A lock implementation that allows us to control how many threads can take the lock + * In particular, this is used to set the number of allowed threads to 1 or 0 + * when index throttling is activated. + */ + protected static final class PauseLock implements Lock { + private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + private final int allowThreads; + + public PauseLock(int allowThreads) { + this.allowThreads = allowThreads; + } + + public void lock() { + semaphore.acquireUninterruptibly(); + } + + @Override + public void lockInterruptibly() throws InterruptedException { + semaphore.acquire(); + } + + @Override + public void unlock() { + semaphore.release(); + } + + @Override + public boolean tryLock() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + + public void throttle() { + assert semaphore.availablePermits() == Integer.MAX_VALUE; + semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads); + } + + public void unthrottle() { + assert semaphore.availablePermits() <= allowThreads; + semaphore.release(Integer.MAX_VALUE - allowThreads); + } + } + /** * Perform document index operation on the engine * @param index operation to perform diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 86fb88c40252d..bd1c5c26bb450 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -171,7 +171,7 @@ public class InternalEngine extends Engine { private final CombinedDeletionPolicy combinedDeletionPolicy; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges - // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling + // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); @@ -260,7 +260,7 @@ public InternalEngine(EngineConfig engineConfig) { engineConfig.getThreadPoolMergeExecutorService() ); scheduler = mergeScheduler.getMergeScheduler(); - throttle = new IndexThrottle(); + throttle = new IndexThrottle(pauseIndexingOnThrottle); try { store.trimUnsafeCommits(config().getTranslogConfig().getTranslogPath()); translog = openTranslog( diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 76ef5428f624b..4b38a5d378ecf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2733,6 +2733,9 @@ public IndexEventListener getIndexEventListener() { return indexEventListener; } + /** Activate throttling for this shard. If {@link IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE} + * setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread. + */ public void activateThrottling() { try { getEngine().activateThrottling(); diff --git a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index 5fdee57ad7973..b884b2c850cc5 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -87,6 +87,17 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos Property.NodeScope ); + /* Currently, indexing is throttled due to memory pressure in stateful/stateless or disk pressure in stateless. + * This limits the number of indexing threads to 1 per shard. However, this might not be enough when the number of + * shards that need indexing is larger than the number of threads. So we might opt to pause indexing completely. + * The default value for this setting is false, but it will be set to true in stateless. + */ + public static final Setting PAUSE_INDEXING_ON_THROTTLE = Setting.boolSetting( + "indices.pause.on.throttle", + false, + Property.NodeScope + ); + private final ThreadPool threadPool; private final Iterable indexShards; @@ -236,7 +247,9 @@ void forceCheck() { statusChecker.run(); } - /** Asks this shard to throttle indexing to one thread */ + /** Asks this shard to throttle indexing to one thread. If the PAUSE_INDEXING_ON_THROTTLE seeting is set to true, + * throttling will pause indexing completely for the throttled shard. + */ protected void activateThrottling(IndexShard shard) { shard.activateThrottling(); }