Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128405.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128405
summary: Modify the mechanism to pause indexing
area: Distributed
type: bug
issues: []
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test025SyncPluginsUsingProxy
issue: https://github.com/elastic/elasticsearch/issues/127138
- class: org.elasticsearch.indices.stats.IndexStatsIT
method: testThrottleStats
issue: https://github.com/elastic/elasticsearch/issues/126359
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityRestIT
method: testTaskCancellation
issue: https://github.com/elastic/elasticsearch/issues/128009
Expand Down
67 changes: 59 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,33 +455,64 @@ protected static final class IndexThrottle {
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
private volatile long startOfThrottleNS;
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
private final PauseLock throttlingLock;
private final ReleasableLock lockReference;
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
private final Lock pauseIndexingLock = new ReentrantLock();
private final Condition pauseCondition = pauseIndexingLock.newCondition();
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
private volatile AtomicBoolean pauseThrottling = new AtomicBoolean();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name this and the method differently, like suspendThrottling? pauseThrottling sounds like a variable indicating that the "pause throttling" mechanism is enabled when instead it is that it is disabled ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

private final boolean pauseWhenThrottled;
private volatile ReleasableLock lock = NOOP_LOCK;

public IndexThrottle(boolean pause) {
throttlingLock = new PauseLock(pause ? 0 : 1);
lockReference = new ReleasableLock(throttlingLock);
pauseWhenThrottled = pause;
}

public Releasable acquireThrottle() {
return lock.acquire();
if (lock == pauseLockReference) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We read lock twice when doing regular throttling, once for the condition here and once below. Since it is volatile, it would be good to only read it once, i.e., copy it to a local variable:

var lock = this.lock;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But don't we need to read the latest value of lock inside the try block ?

pauseLockReference.acquire();
try {
// If throttling is activate and not temporarily paused
while ((lock == pauseLockReference) && (pauseThrottling.getAcquire() == false)) {
logger.trace("Waiting on pause indexing lock");
pauseCondition.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
logger.trace("Acquired pause indexing lock");
}
return pauseLockReference;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might as well release the lock immediately and return a noop releasable?

That would allow us to use try-with-resource when acquiring the pauseLockReference too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still like to see this carried through.

As we wake up after pause throttling, with the current mechanism, we'll only wake up one of the paused threads at a time until each of their indexing requests complete. Notice that await only allows one thread at a time to come out of await until the lock is released, since returning from await reacquires the lock.

This means that if the situation is cured, but the first few indexing requests take "a long time", we are still essentially throttling to fewer threads for a while.

I'll LGTM it for now, since it is not that harmful, but would prefer to see this sorted in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had also thought about this problem that even after throttling gets deactivated, we still only let one indexing job pass at a time. But I reasoned that we were already doing that before I moved to the pause indexing option.
But I think I understand now your solution.

} else {
return lock.acquire();
}
}

/** Activate throttling, which switches the lock to be a real lock */
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";

startOfThrottleNS = System.nanoTime();
throttlingLock.throttle();
lock = lockReference;
if (pauseWhenThrottled) {
lock = pauseLockReference;
logger.trace("Activated index throttling pause");
} else {
lock = lockReference;
}
}

/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
public void deactivate() {
assert lock != NOOP_LOCK : "throttling deactivated but not active";

throttlingLock.unthrottle();
lock = NOOP_LOCK;
if (pauseWhenThrottled) {
// Signal the threads that are waiting on pauseCondition
try (Releasable releasableLock = pauseLockReference.acquire()) {
pauseCondition.signalAll();
}
logger.trace("Deactivated index throttling pause");
}

assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
long throttleTimeNS = System.nanoTime() - startOfThrottleNS;
Expand All @@ -508,6 +539,26 @@ boolean isThrottled() {
return lock != NOOP_LOCK;
}

// Pause throttling to allow another task such as relocation to acquire all indexing permits
public void pauseThrottle() {
if (pauseWhenThrottled) {
try (Releasable releasableLock = pauseLockReference.acquire()) {
pauseThrottling.setRelease(true);
pauseCondition.signalAll();
}
}
}

// Reverse what was done in pauseThrottle()
public void unpauseThrottle() {
if (pauseWhenThrottled) {
try (Releasable releasableLock = pauseLockReference.acquire()) {
pauseThrottling.setRelease(false);
pauseCondition.signalAll();
}
}
}

boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
if (isThrottled()) {
return lock.isHeldByCurrentThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2828,19 +2828,27 @@ public void accept(ElasticsearchDirectoryReader reader, ElasticsearchDirectoryRe

@Override
public void activateThrottling() {
int count = throttleRequestCount.incrementAndGet();
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
if (count == 1) {
throttle.activate();
// Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling
// atomic w.r.t each other
synchronized (throttleRequestCount) {
int count = throttleRequestCount.incrementAndGet();
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
if (count == 1) {
throttle.activate();
}
}
}

@Override
public void deactivateThrottling() {
int count = throttleRequestCount.decrementAndGet();
assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
if (count == 0) {
throttle.deactivate();
// Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling
// atomic w.r.t each other
synchronized (throttleRequestCount) {
int count = throttleRequestCount.decrementAndGet();
assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
if (count == 0) {
throttle.deactivate();
}
}
}

Expand Down Expand Up @@ -2972,7 +2980,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
@Override
public synchronized void afterMerge(OnGoingMerge merge) {
int maxNumMerges = getMaxMergeCount();
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (numMergesInFlight.decrementAndGet() <= maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
deactivateThrottling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogOperationsUtils;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
Expand Down Expand Up @@ -170,6 +171,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntSupplier;
Expand Down Expand Up @@ -7035,6 +7037,50 @@ public void testIndexThrottling() throws Exception {
verify(indexWithoutThrottlingCheck, atLeastOnce()).startTime();
}

/* Test that indexing is paused during throttling using the PAUSE_INDEXING_ON_THROTTLE setting is on.
* The test tries to index a document into a shard for which indexing is throttled. It is unable to
* do so until throttling is disabled.
* Indexing proceeds as usual once the shard throttle is deactivated.
*/
public void testIndexThrottlingWithPause() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
try (
Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))
) {
final List<DocIdSeqNoAndSource> prevDocs;
final Engine.Index indexWithThrottlingCheck = indexForDoc(createParsedDoc("1", null));
final Engine.Index indexWithoutThrottlingCheck = indexForDoc(createParsedDoc("2", null));
prevDocs = getDocIds(engine, true);
assertThat(prevDocs.size(), equalTo(0));
Thread indexWithThrottle = new Thread(() -> {
try {
engine.index(indexWithThrottlingCheck);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

// Activate throttling (this will pause indexing)
engine.activateThrottling();
assertTrue(engine.isThrottled());
indexWithThrottle.start();
// Wait for the thread to complete, it will not complete because of the pause
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
assertThat(getDocIds(engine, true).size(), equalTo(0));
// Deactivate to allow the indexing thread to proceed
engine.deactivateThrottling();
indexWithThrottle.join();
assertThat(getDocIds(engine, true).size(), equalTo(1));
engine.index(indexWithoutThrottlingCheck);
assertThat(getDocIds(engine, true).size(), equalTo(2));
}
}

public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
MapperService mapperService = createMapperService();
final AtomicInteger refreshCount = new AtomicInteger();
Expand Down
Loading