Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128405.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128405
summary: Modify the mechanism to pause indexing
area: Distributed
type: bug
issues: []
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test025SyncPluginsUsingProxy
issue: https://github.com/elastic/elasticsearch/issues/127138
- class: org.elasticsearch.indices.stats.IndexStatsIT
method: testThrottleStats
issue: https://github.com/elastic/elasticsearch/issues/126359
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityRestIT
method: testTaskCancellation
issue: https://github.com/elastic/elasticsearch/issues/128009
Expand Down
55 changes: 47 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -455,32 +455,71 @@ protected static final class IndexThrottle {
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
private volatile long startOfThrottleNS;
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
private final PauseLock throttlingLock;
private final ReleasableLock lockReference;
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
private final Lock pauseIndexingLock = new ReentrantLock();
private final Condition pauseCondition = pauseIndexingLock.newCondition();
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
private volatile AtomicBoolean pauseIndexing = new AtomicBoolean();
private final boolean pauseWhenThrottled;
private volatile ReleasableLock lock = NOOP_LOCK;

public IndexThrottle(boolean pause) {
throttlingLock = new PauseLock(pause ? 0 : 1);
lockReference = new ReleasableLock(throttlingLock);
pauseWhenThrottled = pause;
}

public Releasable acquireThrottle() {
return lock.acquire();
if (lock == pauseLockReference) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We read lock twice when doing regular throttling, once for the condition here and once below. Since it is volatile, it would be good to only read it once, i.e., copy it to a local variable:

var lock = this.lock;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But don't we need to read the latest value of lock inside the try block ?

pauseLockReference.acquire();
try {
while (pauseIndexing.getAcquire()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be:

Suggested change
while (pauseIndexing.getAcquire()) {
while (lock == pauseLockReference) {

and thus we can avoid the extra boolean?

// System.out.println("Waiting on pause indexing lock");
logger.trace("Waiting on pause indexing lock");
pauseCondition.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
// System.out.println("Acquired pause indexing lock");
logger.trace("Acquired pause indexing lock");
}
return pauseLockReference;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might as well release the lock immediately and return a noop releasable?

That would allow us to use try-with-resource when acquiring the pauseLockReference too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still like to see this carried through.

As we wake up after pause throttling, with the current mechanism, we'll only wake up one of the paused threads at a time until each of their indexing requests complete. Notice that await only allows one thread at a time to come out of await until the lock is released, since returning from await reacquires the lock.

This means that if the situation is cured, but the first few indexing requests take "a long time", we are still essentially throttling to fewer threads for a while.

I'll LGTM it for now, since it is not that harmful, but would prefer to see this sorted in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had also thought about this problem that even after throttling gets deactivated, we still only let one indexing job pass at a time. But I reasoned that we were already doing that before I moved to the pause indexing option.
But I think I understand now your solution.

} else {
return lock.acquire();
}
}

/** Activate throttling, which switches the lock to be a real lock */
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";
startOfThrottleNS = System.nanoTime();
throttlingLock.throttle();
lock = lockReference;
if (pauseWhenThrottled) {
pauseIndexing.setRelease(true);
lock = pauseLockReference;
} else {
lock = lockReference;
}
}

/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
public void deactivate() {
assert lock != NOOP_LOCK : "throttling deactivated but not active";

throttlingLock.unthrottle();
// throttlingLock.unthrottle();

if (lock == pauseLockReference) {
logger.trace("Deactivate index throttling pause");

// Signal the threads that are waiting on pauseCondition
pauseLockReference.acquire();
try {
// System.out.println("Deactivate pause");
pauseIndexing.setRelease(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid the pauseIndexing boolean as suggested above, we need to set the lock = noop no later than here.

I think we could simply this to:

lock = NOOP_LOCK;
pauseLockReference.acquire();
                try {
                    // System.out.println("Deactivate pause");
                    pauseIndexing.setRelease(false);
Comment
To avoid the `pauseIndexing` boolean as suggested above, we need to set the `lock = noop` no later than here.

I think we could simply this to:

lock = NOOP_LOCK;
try (pauseLockReference.acquire()) {
pauseCondition.signalAll();
}

pauseCondition.signalAll();
} finally {
pauseLockReference.close();
}
}
lock = NOOP_LOCK;

assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2967,7 +2967,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
@Override
public synchronized void afterMerge(OnGoingMerge merge) {
int maxNumMerges = getMaxMergeCount();
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (numMergesInFlight.decrementAndGet() <= maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
deactivateThrottling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogOperationsUtils;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
Expand Down Expand Up @@ -169,6 +170,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntSupplier;
Expand Down Expand Up @@ -7015,6 +7017,50 @@ public void testIndexThrottling() throws Exception {
verify(indexWithoutThrottlingCheck, atLeastOnce()).startTime();
}

/* Test that indexing is paused during throttling using the PAUSE_INDEXING_ON_THROTTLE setting is on.
* The test tries to index a document into a shard for which indexing is throttled. It is unable to
* do so until throttling is disabled.
* Indexing proceeds as usual once the shard throttle is deactivated.
*/
public void testIndexThrottlingWithPause() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
try (
Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))
) {
final List<DocIdSeqNoAndSource> prevDocs;
final Engine.Index indexWithThrottlingCheck = indexForDoc(createParsedDoc("1", null));
final Engine.Index indexWithoutThrottlingCheck = indexForDoc(createParsedDoc("2", null));
prevDocs = getDocIds(engine, true);
assertThat(prevDocs.size(), equalTo(0));
Thread indexWithThrottle = new Thread(() -> {
try {
engine.index(indexWithThrottlingCheck);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

// Activate throttling (this will pause indexing)
engine.activateThrottling();
assertTrue(engine.isThrottled());
indexWithThrottle.start();
// Wait for the thread to complete, it will not complete because of the pause
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
assertThat(getDocIds(engine, true).size(), equalTo(0));
// Deactivate to allow the indexing thread to proceed
engine.deactivateThrottling();
indexWithThrottle.join();
assertThat(getDocIds(engine, true).size(), equalTo(1));
engine.index(indexWithoutThrottlingCheck);
assertThat(getDocIds(engine, true).size(), equalTo(2));
}
}

public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
MapperService mapperService = createMapperService();
final AtomicInteger refreshCount = new AtomicInteger();
Expand Down
Loading