Skip to content

Commit 0d6dd21

Browse files
ankikuma authored and joshua-adams-1 committed
Modify the mechanism to pause indexing (elastic#128405)
This PR changes the mechanism to pause indexing which was introduced in elastic#127173. The original PR caused IndexStatsIT#testThrottleStats to fail. See elastic#126359.
1 parent a34b165 commit 0d6dd21

File tree

5 files changed

+129
-73
lines changed

5 files changed

+129
-73
lines changed

docs/changelog/128405.yaml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
pr: 128405
2+
summary: Modify the mechanism to pause indexing
3+
area: Distributed
4+
type: bug
5+
issues: []

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -417,9 +417,6 @@ tests:
417417
- class: org.elasticsearch.packaging.test.DockerTests
418418
method: test025SyncPluginsUsingProxy
419419
issue: https://github.com/elastic/elasticsearch/issues/127138
420-
- class: org.elasticsearch.indices.stats.IndexStatsIT
421-
method: testThrottleStats
422-
issue: https://github.com/elastic/elasticsearch/issues/126359
423420
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityRestIT
424421
method: testTaskCancellation
425422
issue: https://github.com/elastic/elasticsearch/issues/128009

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 61 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,6 @@
110110
import java.util.Set;
111111
import java.util.concurrent.CountDownLatch;
112112
import java.util.concurrent.ExecutionException;
113-
import java.util.concurrent.Semaphore;
114113
import java.util.concurrent.TimeUnit;
115114
import java.util.concurrent.atomic.AtomicBoolean;
116115
import java.util.concurrent.locks.Condition;
@@ -455,33 +454,66 @@ protected static final class IndexThrottle {
455454
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
456455
private volatile long startOfThrottleNS;
457456
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
458-
private final PauseLock throttlingLock;
459-
private final ReleasableLock lockReference;
457+
// This lock throttles indexing to 1 thread (per shard)
458+
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
459+
// This lock pauses indexing completely (on a per shard basis)
460+
private final Lock pauseIndexingLock = new ReentrantLock();
461+
private final Condition pauseCondition = pauseIndexingLock.newCondition();
462+
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
463+
private volatile AtomicBoolean suspendThrottling = new AtomicBoolean();
464+
private final boolean pauseWhenThrottled; // Should throttling pause indexing ?
460465
private volatile ReleasableLock lock = NOOP_LOCK;
461466

462467
public IndexThrottle(boolean pause) {
463-
throttlingLock = new PauseLock(pause ? 0 : 1);
464-
lockReference = new ReleasableLock(throttlingLock);
468+
pauseWhenThrottled = pause;
465469
}
466470

467471
public Releasable acquireThrottle() {
468-
return lock.acquire();
472+
var lockCopy = this.lock;
473+
if (lockCopy == pauseLockReference) {
474+
try (var ignored = pauseLockReference.acquire()) {
475+
// If pause throttling is activated and not temporarily suspended
476+
while ((lock == pauseLockReference) && (suspendThrottling.getAcquire() == false)) {
477+
logger.trace("Waiting on pause indexing lock");
478+
pauseCondition.await();
479+
}
480+
} catch (InterruptedException e) {
481+
Thread.currentThread().interrupt();
482+
throw new RuntimeException(e);
483+
} finally {
484+
logger.trace("Acquired pause indexing lock");
485+
}
486+
return (() -> {});
487+
} else {
488+
return lockCopy.acquire();
489+
}
469490
}
470491

471492
/** Activate throttling, which switches the lock to be a real lock */
472493
public void activate() {
473494
assert lock == NOOP_LOCK : "throttling activated while already active";
495+
474496
startOfThrottleNS = System.nanoTime();
475-
throttlingLock.throttle();
476-
lock = lockReference;
497+
if (pauseWhenThrottled) {
498+
lock = pauseLockReference;
499+
logger.trace("Activated index throttling pause");
500+
} else {
501+
lock = lockReference;
502+
}
477503
}
478504

479505
/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
480506
public void deactivate() {
481507
assert lock != NOOP_LOCK : "throttling deactivated but not active";
482508

483-
throttlingLock.unthrottle();
484509
lock = NOOP_LOCK;
510+
if (pauseWhenThrottled) {
511+
// Signal the threads that are waiting on pauseCondition
512+
try (Releasable releasableLock = pauseLockReference.acquire()) {
513+
pauseCondition.signalAll();
514+
}
515+
logger.trace("Deactivated index throttling pause");
516+
}
485517

486518
assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
487519
long throttleTimeNS = System.nanoTime() - startOfThrottleNS;
@@ -508,6 +540,26 @@ boolean isThrottled() {
508540
return lock != NOOP_LOCK;
509541
}
510542

543+
/** Suspend throttling to allow another task such as relocation to acquire all indexing permits */
544+
public void suspendThrottle() {
545+
if (pauseWhenThrottled) {
546+
try (Releasable releasableLock = pauseLockReference.acquire()) {
547+
suspendThrottling.setRelease(true);
548+
pauseCondition.signalAll();
549+
}
550+
}
551+
}
552+
553+
/** Reverse what was done in {@link #suspendThrottle()} */
554+
public void resumeThrottle() {
555+
if (pauseWhenThrottled) {
556+
try (Releasable releasableLock = pauseLockReference.acquire()) {
557+
suspendThrottling.setRelease(false);
558+
pauseCondition.signalAll();
559+
}
560+
}
561+
}
562+
511563
boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
512564
if (isThrottled()) {
513565
return lock.isHeldByCurrentThread();
@@ -570,58 +622,6 @@ public Condition newCondition() {
570622
}
571623
}
572624

573-
/* A lock implementation that allows us to control how many threads can take the lock
574-
* In particular, this is used to set the number of allowed threads to 1 or 0
575-
* when index throttling is activated.
576-
*/
577-
protected static final class PauseLock implements Lock {
578-
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
579-
private final int allowThreads;
580-
581-
public PauseLock(int allowThreads) {
582-
this.allowThreads = allowThreads;
583-
}
584-
585-
public void lock() {
586-
semaphore.acquireUninterruptibly();
587-
}
588-
589-
@Override
590-
public void lockInterruptibly() throws InterruptedException {
591-
semaphore.acquire();
592-
}
593-
594-
@Override
595-
public void unlock() {
596-
semaphore.release();
597-
}
598-
599-
@Override
600-
public boolean tryLock() {
601-
throw new UnsupportedOperationException();
602-
}
603-
604-
@Override
605-
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
606-
throw new UnsupportedOperationException();
607-
}
608-
609-
@Override
610-
public Condition newCondition() {
611-
throw new UnsupportedOperationException();
612-
}
613-
614-
public void throttle() {
615-
assert semaphore.availablePermits() == Integer.MAX_VALUE;
616-
semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads);
617-
}
618-
619-
public void unthrottle() {
620-
assert semaphore.availablePermits() <= allowThreads;
621-
semaphore.release(Integer.MAX_VALUE - allowThreads);
622-
}
623-
}
624-
625625
/**
626626
* Perform document index operation on the engine
627627
* @param index operation to perform

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 17 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -2828,19 +2828,27 @@ public void accept(ElasticsearchDirectoryReader reader, ElasticsearchDirectoryRe
28282828

28292829
@Override
28302830
public void activateThrottling() {
2831-
int count = throttleRequestCount.incrementAndGet();
2832-
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
2833-
if (count == 1) {
2834-
throttle.activate();
2831+
// Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling
2832+
// atomic w.r.t each other
2833+
synchronized (throttleRequestCount) {
2834+
int count = throttleRequestCount.incrementAndGet();
2835+
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
2836+
if (count == 1) {
2837+
throttle.activate();
2838+
}
28352839
}
28362840
}
28372841

28382842
@Override
28392843
public void deactivateThrottling() {
2840-
int count = throttleRequestCount.decrementAndGet();
2841-
assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
2842-
if (count == 0) {
2843-
throttle.deactivate();
2844+
// Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling
2845+
// atomic w.r.t each other
2846+
synchronized (throttleRequestCount) {
2847+
int count = throttleRequestCount.decrementAndGet();
2848+
assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
2849+
if (count == 0) {
2850+
throttle.deactivate();
2851+
}
28442852
}
28452853
}
28462854

@@ -2972,7 +2980,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
29722980
@Override
29732981
public synchronized void afterMerge(OnGoingMerge merge) {
29742982
int maxNumMerges = getMaxMergeCount();
2975-
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
2983+
if (numMergesInFlight.decrementAndGet() <= maxNumMerges) {
29762984
if (isThrottling.getAndSet(false)) {
29772985
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
29782986
deactivateThrottling();

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 46 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,7 @@
131131
import org.elasticsearch.index.translog.TranslogConfig;
132132
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
133133
import org.elasticsearch.index.translog.TranslogOperationsUtils;
134+
import org.elasticsearch.indices.IndexingMemoryController;
134135
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
135136
import org.elasticsearch.indices.recovery.RecoverySettings;
136137
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
@@ -170,6 +171,7 @@
170171
import java.util.concurrent.atomic.AtomicInteger;
171172
import java.util.concurrent.atomic.AtomicLong;
172173
import java.util.concurrent.atomic.AtomicReference;
174+
import java.util.concurrent.locks.LockSupport;
173175
import java.util.function.BiFunction;
174176
import java.util.function.Function;
175177
import java.util.function.IntSupplier;
@@ -7035,6 +7037,50 @@ public void testIndexThrottling() throws Exception {
70357037
verify(indexWithoutThrottlingCheck, atLeastOnce()).startTime();
70367038
}
70377039

7040+
/* Test that indexing is paused during throttling when the PAUSE_INDEXING_ON_THROTTLE setting is on.
7041+
* The test tries to index a document into a shard for which indexing is throttled. It is unable to
7042+
* do so until throttling is disabled.
7043+
* Indexing proceeds as usual once the shard throttle is deactivated.
7044+
*/
7045+
public void testIndexThrottlingWithPause() throws Exception {
7046+
Settings.Builder settings = Settings.builder()
7047+
.put(defaultSettings.getSettings())
7048+
.put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true);
7049+
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
7050+
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
7051+
try (
7052+
Store store = createStore();
7053+
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))
7054+
) {
7055+
final List<DocIdSeqNoAndSource> prevDocs;
7056+
final Engine.Index indexWithThrottlingCheck = indexForDoc(createParsedDoc("1", null));
7057+
final Engine.Index indexWithoutThrottlingCheck = indexForDoc(createParsedDoc("2", null));
7058+
prevDocs = getDocIds(engine, true);
7059+
assertThat(prevDocs.size(), equalTo(0));
7060+
Thread indexWithThrottle = new Thread(() -> {
7061+
try {
7062+
engine.index(indexWithThrottlingCheck);
7063+
} catch (IOException e) {
7064+
throw new RuntimeException(e);
7065+
}
7066+
});
7067+
7068+
// Activate throttling (this will pause indexing)
7069+
engine.activateThrottling();
7070+
assertTrue(engine.isThrottled());
7071+
indexWithThrottle.start();
7072+
// Give the indexing thread time to run; it will not complete because of the pause
7073+
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
7074+
assertThat(getDocIds(engine, true).size(), equalTo(0));
7075+
// Deactivate to allow the indexing thread to proceed
7076+
engine.deactivateThrottling();
7077+
indexWithThrottle.join();
7078+
assertThat(getDocIds(engine, true).size(), equalTo(1));
7079+
engine.index(indexWithoutThrottlingCheck);
7080+
assertThat(getDocIds(engine, true).size(), equalTo(2));
7081+
}
7082+
}
7083+
70387084
public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception {
70397085
MapperService mapperService = createMapperService();
70407086
final AtomicInteger refreshCount = new AtomicInteger();

0 commit comments

Comments
 (0)