Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8fb0811
commit es changes
ankikuma Apr 22, 2025
23ebfcd
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma Apr 22, 2025
e65edc3
[CI] Auto commit changes from spotless
Apr 22, 2025
9a17752
es commit
ankikuma Apr 23, 2025
0ca45bd
commit es
ankikuma Apr 23, 2025
b6174c3
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma Apr 23, 2025
c192804
merge
ankikuma Apr 23, 2025
9397d07
[CI] Auto commit changes from spotless
Apr 23, 2025
72d7b93
spotless
ankikuma Apr 23, 2025
4b9bf6e
merge
ankikuma Apr 23, 2025
1aa3b03
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma Apr 29, 2025
284aadb
address comments
ankikuma Apr 30, 2025
1121e5a
review comments and test
ankikuma May 2, 2025
2493b54
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma May 2, 2025
a1324e6
refresh
ankikuma May 2, 2025
d480056
comment
ankikuma May 2, 2025
d33901d
comment
ankikuma May 2, 2025
10a60a5
[CI] Auto commit changes from spotless
May 2, 2025
28c1f27
remove debug comments
ankikuma May 5, 2025
adc1bf6
commit
ankikuma May 5, 2025
8ce3c9f
Merge branch '04212025/PauseIndexingES11516' of github.com:ankikuma/e…
ankikuma May 5, 2025
8a52140
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma May 5, 2025
8e2165c
commit
ankikuma May 5, 2025
6dc3e8f
[CI] Auto commit changes from spotless
May 5, 2025
e8e48f9
change lock implementation
ankikuma May 5, 2025
a06f7e6
merge
ankikuma May 5, 2025
0969087
minor comment
ankikuma May 5, 2025
a8f83fb
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma May 6, 2025
38a0153
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma May 6, 2025
5edcc04
test with setting true
ankikuma May 7, 2025
9147f3b
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma May 7, 2025
e1f5de2
default setting false
ankikuma May 8, 2025
73f93c7
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma May 8, 2025
1e77c09
Merge remote-tracking branch 'upstream/main' into 04212025/PauseIndex…
ankikuma May 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING,
IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING,
IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE,
ResourceWatcherService.ENABLED,
ResourceWatcherService.RELOAD_INTERVAL_HIGH,
ResourceWatcherService.RELOAD_INTERVAL_MEDIUM,
Expand Down
71 changes: 70 additions & 1 deletion server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.index.engine;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat;
import org.apache.lucene.index.ByteVectorValues;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -108,6 +110,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -145,6 +148,7 @@ public abstract class Engine implements Closeable {
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final SetOnce<Exception> failedEngine = new SetOnce<>();
protected final boolean enableRecoverySource;
protected final boolean pauseIndexingOnThrottle;

private final AtomicBoolean isClosing = new AtomicBoolean();
private final SubscribableListener<Void> drainOnCloseListener = new SubscribableListener<>();
Expand Down Expand Up @@ -176,6 +180,9 @@ protected Engine(EngineConfig engineConfig) {
this.enableRecoverySource = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(
engineConfig.getIndexSettings().getSettings()
);
this.pauseIndexingOnThrottle = IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.get(
engineConfig.getIndexSettings().getSettings()
);
}

/**
Expand Down Expand Up @@ -444,12 +451,19 @@ public interface IndexCommitListener {
* is enabled
*/
protected static final class IndexThrottle {
private static final Logger logger = LogManager.getLogger(IndexThrottle.class);
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
private volatile long startOfThrottleNS;
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
private final PauseLock throttlingLock;
private final ReleasableLock lockReference;
private volatile ReleasableLock lock = NOOP_LOCK;

public IndexThrottle(boolean pause) {
throttlingLock = new PauseLock(pause ? 0 : 1);
lockReference = new ReleasableLock(throttlingLock);
}

public Releasable acquireThrottle() {
return lock.acquire();
}
Expand All @@ -458,12 +472,15 @@ public Releasable acquireThrottle() {
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";
startOfThrottleNS = System.nanoTime();
throttlingLock.throttle();
lock = lockReference;
}

/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
public void deactivate() {
assert lock != NOOP_LOCK : "throttling deactivated but not active";

throttlingLock.unthrottle();
lock = NOOP_LOCK;

assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
Expand Down Expand Up @@ -553,6 +570,58 @@ public Condition newCondition() {
}
}

/* A lock implementation that allows us to control how many threads can take the lock
* In particular, this is used to set the number of allowed threads to 1 or 0
* when index throttling is activated.
*/
protected static final class PauseLock implements Lock {
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
private final int allowThreads;

public PauseLock(int allowThreads) {
this.allowThreads = allowThreads;
}

public void lock() {
semaphore.acquireUninterruptibly();
}

@Override
public void lockInterruptibly() throws InterruptedException {
semaphore.acquire();
}

@Override
public void unlock() {
semaphore.release();
}

@Override
public boolean tryLock() {
throw new UnsupportedOperationException();
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}

public void throttle() {
assert semaphore.availablePermits() == Integer.MAX_VALUE;
semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads);
}

public void unthrottle() {
assert semaphore.availablePermits() <= allowThreads;
semaphore.release(Integer.MAX_VALUE - allowThreads);
}
}

/**
* Perform document index operation on the engine
* @param index operation to perform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public class InternalEngine extends Engine {
private final CombinedDeletionPolicy combinedDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
Expand Down Expand Up @@ -260,7 +260,7 @@ public InternalEngine(EngineConfig engineConfig) {
engineConfig.getThreadPoolMergeExecutorService()
);
scheduler = mergeScheduler.getMergeScheduler();
throttle = new IndexThrottle();
throttle = new IndexThrottle(pauseIndexingOnThrottle);
try {
store.trimUnsafeCommits(config().getTranslogConfig().getTranslogPath());
translog = openTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2733,6 +2733,9 @@ public IndexEventListener getIndexEventListener() {
return indexEventListener;
}

/** Activate throttling for this shard. If {@link IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE}
* setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread.
*/
public void activateThrottling() {
try {
getEngine().activateThrottling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos
Property.NodeScope
);

/* Currently, indexing is throttled due to memory pressure in stateful/stateless or disk pressure in stateless.
* This limits the number of indexing threads to 1 per shard. However, this might not be enough when the number of
* shards that need indexing is larger than the number of threads. So we might opt to pause indexing completely.
* The default value for this setting is false, but it will be set to true in stateless.
*/
public static final Setting<Boolean> PAUSE_INDEXING_ON_THROTTLE = Setting.boolSetting(
"indices.pause.on.throttle",
false,
Property.NodeScope
);

private final ThreadPool threadPool;

private final Iterable<IndexShard> indexShards;
Expand Down Expand Up @@ -236,7 +247,9 @@ void forceCheck() {
statusChecker.run();
}

/** Asks this shard to throttle indexing to one thread */
/** Asks this shard to throttle indexing to one thread. If the PAUSE_INDEXING_ON_THROTTLE seeting is set to true,
* throttling will pause indexing completely for the throttled shard.
*/
protected void activateThrottling(IndexShard shard) {
shard.activateThrottling();
}
Expand Down