From 60ed208133ccfda08076c3486a54613a5fc65b06 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 23 May 2025 15:23:51 -0400 Subject: [PATCH 01/10] Change indexing pause throttling implementation --- muted-tests.yml | 6 +- .../elasticsearch/index/engine/Engine.java | 64 ++++++++++++++++--- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index d91fca8023c99..7e179a6c61c8d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -444,9 +444,9 @@ tests: - class: org.elasticsearch.xpack.esql.qa.mixed.EsqlClientYamlIT method: test {p0=esql/120_profile/avg 8.14 or after} issue: https://github.com/elastic/elasticsearch/issues/127879 -- class: org.elasticsearch.indices.stats.IndexStatsIT - method: testThrottleStats - issue: https://github.com/elastic/elasticsearch/issues/126359 +#- class: org.elasticsearch.indices.stats.IndexStatsIT +# method: testThrottleStats +# issue: https://github.com/elastic/elasticsearch/issues/126359 - class: org.elasticsearch.search.vectors.IVFKnnFloatVectorQueryTests method: testRandomWithFilter issue: https://github.com/elastic/elasticsearch/issues/127963 diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index de8bfd0e3e61c..e7c4a7abc551b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -455,32 +455,80 @@ protected static final class IndexThrottle { private final CounterMetric throttleTimeMillisMetric = new CounterMetric(); private volatile long startOfThrottleNS; private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock()); - private final PauseLock throttlingLock; - private final ReleasableLock lockReference; + private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock()); + private final Lock pauseIndexingLock = new ReentrantLock(); + private final Condition pauseCondition = pauseIndexingLock.newCondition(); + private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); + private volatile AtomicBoolean pauseIndexing = new AtomicBoolean(); + private final boolean pauseWhenThrottled; + // private final PauseLock throttlingLock; + // private final ReleasableLock lockReference; private volatile ReleasableLock lock = NOOP_LOCK; public IndexThrottle(boolean pause) { - throttlingLock = new PauseLock(pause ? 0 : 1); - lockReference = new ReleasableLock(throttlingLock); + pauseWhenThrottled = pause; + // throttlingLock = new PauseLock(pause ? 0 : 1); + // lockReference = new ReleasableLock(throttlingLock); } public Releasable acquireThrottle() { - return lock.acquire(); + if (lock == pauseLockReference) { + pauseLockReference.acquire(); + try { + while (pauseIndexing.getAcquire()) { + // System.out.println("Waiting on pause indexing lock"); + logger.trace("Waiting on pause indexing lock"); + pauseCondition.await(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + // System.out.println("Acquired pause indexing lock"); + logger.trace("Acquired pause indexing lock"); + } + return pauseLockReference; + } else { + return lock.acquire(); + } + + // return lock.acquire(); + } /** Activate throttling, which switches the lock to be a real lock */ public void activate() { assert lock == NOOP_LOCK : "throttling activated while already active"; startOfThrottleNS = System.nanoTime(); - throttlingLock.throttle(); - lock = lockReference; + // throttlingLock.throttle(); + // lock = lockReference; + if (pauseWhenThrottled) { + pauseIndexing.setRelease(true); + lock = pauseLockReference; + } else { + lock = lockReference; + } } /** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */ public void deactivate() { assert lock != NOOP_LOCK : "throttling deactivated but not active"; - throttlingLock.unthrottle(); + // throttlingLock.unthrottle(); + + if (lock == pauseLockReference) { + logger.trace("Deactivate index throttling pause"); + + // Signal the threads that are waiting on pauseCondition + pauseLockReference.acquire(); + try { + // System.out.println("Deactivate pause"); + pauseIndexing.setRelease(false); + pauseCondition.signalAll(); + } finally { + pauseLockReference.close(); + } + } lock = NOOP_LOCK; assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS"; From 6d4202eed1fd2d6d35c03551eddb97ef04d62130 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 23 May 2025 15:41:41 -0400 Subject: [PATCH 02/10] Fix check in afterMerge() --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bd1c5c26bb450..8f2ee9b957c0c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2967,7 +2967,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) { @Override public synchronized void afterMerge(OnGoingMerge merge) { int maxNumMerges = getMaxMergeCount(); - if (numMergesInFlight.decrementAndGet() < maxNumMerges) { + if (numMergesInFlight.decrementAndGet() <= maxNumMerges) { if (isThrottling.getAndSet(false)) { logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); deactivateThrottling(); From aa4d58a5afd05cc4956799b799e64b7461a68e0e Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Tue, 27 May 2025 18:34:49 -0400 Subject: [PATCH 03/10] Update docs/changelog/128405.yaml --- docs/changelog/128405.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/128405.yaml diff --git a/docs/changelog/128405.yaml b/docs/changelog/128405.yaml new file mode 100644 index 0000000000000..aefa068814747 --- /dev/null +++ b/docs/changelog/128405.yaml @@ -0,0 +1,5 @@ +pr: 128405 +summary: Modify the mechanism to pause indexing +area: Distributed +type: bug +issues: [] From ff014b84ef33fc47280241fe800c57117c0c1c3f Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Tue, 27 May 2025 18:37:27 -0400 Subject: [PATCH 04/10] Add test --- .../elasticsearch/index/engine/Engine.java | 9 ---- .../index/engine/InternalEngineTests.java | 46 +++++++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e7c4a7abc551b..b5b14e2c39861 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -461,14 +461,10 @@ protected static final class IndexThrottle { private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); private volatile AtomicBoolean pauseIndexing = new AtomicBoolean(); private final boolean pauseWhenThrottled; - // private final PauseLock throttlingLock; - // private final ReleasableLock lockReference; private volatile ReleasableLock lock = NOOP_LOCK; public IndexThrottle(boolean pause) { pauseWhenThrottled = pause; - // throttlingLock = new PauseLock(pause ? 0 : 1); - // lockReference = new ReleasableLock(throttlingLock); } public Releasable acquireThrottle() { @@ -491,17 +487,12 @@ public Releasable acquireThrottle() { } else { return lock.acquire(); } - - // return lock.acquire(); - } /** Activate throttling, which switches the lock to be a real lock */ public void activate() { assert lock == NOOP_LOCK : "throttling activated while already active"; startOfThrottleNS = System.nanoTime(); - // throttlingLock.throttle(); - // lock = lockReference; if (pauseWhenThrottled) { pauseIndexing.setRelease(true); lock = pauseLockReference; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index bc63ef763ec57..d84c29e946826 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -130,6 +130,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.index.translog.TranslogOperationsUtils; +import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator; @@ -169,6 +170,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.IntSupplier; @@ -7015,6 +7017,50 @@ public void testIndexThrottling() throws Exception { verify(indexWithoutThrottlingCheck, atLeastOnce()).startTime(); } + /* Test that indexing is paused during throttling using the PAUSE_INDEXING_ON_THROTTLE setting is on. + * The test tries to index a document into a shard for which indexing is throttled. It is unable to + * do so until throttling is disabled. + * Indexing proceeds as usual once the shard throttle is deactivated. + */ + public void testIndexThrottlingWithPause() throws Exception { + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + try ( + Store store = createStore(); + InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null)) + ) { + final List prevDocs; + final Engine.Index indexWithThrottlingCheck = indexForDoc(createParsedDoc("1", null)); + final Engine.Index indexWithoutThrottlingCheck = indexForDoc(createParsedDoc("2", null)); + prevDocs = getDocIds(engine, true); + assertThat(prevDocs.size(), equalTo(0)); + Thread indexWithThrottle = new Thread(() -> { + try { + engine.index(indexWithThrottlingCheck); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + // Activate throttling (this will pause indexing) + engine.activateThrottling(); + assertTrue(engine.isThrottled()); + indexWithThrottle.start(); + // Wait for the thread to complete, it will not complete because of the pause + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500)); + assertThat(getDocIds(engine, true).size(), equalTo(0)); + // Deactivate to allow the indexing thread to proceed + engine.deactivateThrottling(); + indexWithThrottle.join(); + assertThat(getDocIds(engine, true).size(), equalTo(1)); + engine.index(indexWithoutThrottlingCheck); + assertThat(getDocIds(engine, true).size(), equalTo(2)); + } + } + public void testRealtimeGetOnlyRefreshIfNeeded() throws Exception { MapperService mapperService = createMapperService(); final AtomicInteger refreshCount = new AtomicInteger(); From 50129f3b962378a9b062e6371c2d81c7cbecb0ef Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Tue, 27 May 2025 18:41:36 -0400 Subject: [PATCH 05/10] remove comments --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index b5b14e2c39861..2ffbdc08391ca 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -472,7 +472,6 @@ public Releasable acquireThrottle() { pauseLockReference.acquire(); try { while (pauseIndexing.getAcquire()) { - // System.out.println("Waiting on pause indexing lock"); logger.trace("Waiting on pause indexing lock"); pauseCondition.await(); } @@ -480,7 +479,6 @@ public Releasable acquireThrottle() { Thread.currentThread().interrupt(); throw new RuntimeException(e); } finally { - // System.out.println("Acquired pause indexing lock"); logger.trace("Acquired pause indexing lock"); } return pauseLockReference; @@ -505,15 +503,12 @@ public void activate() { public void deactivate() { assert lock != NOOP_LOCK : "throttling deactivated but not active"; - // throttlingLock.unthrottle(); - if (lock == pauseLockReference) { logger.trace("Deactivate index throttling pause"); // Signal the threads that are waiting on pauseCondition pauseLockReference.acquire(); try { - // System.out.println("Deactivate pause"); pauseIndexing.setRelease(false); pauseCondition.signalAll(); } finally { From 0fabe841a10de343b7dfd7d90da955cb0c386aee Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 28 May 2025 20:58:55 -0400 Subject: [PATCH 06/10] review comments --- .../elasticsearch/index/engine/Engine.java | 46 ++++++++++++++----- .../index/engine/InternalEngine.java | 24 ++++++---- 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 2ffbdc08391ca..0e09581fa8738 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -459,7 +459,8 @@ protected static final class IndexThrottle { private final Lock pauseIndexingLock = new ReentrantLock(); private final Condition pauseCondition = pauseIndexingLock.newCondition(); private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); - private volatile AtomicBoolean pauseIndexing = new AtomicBoolean(); + // private volatile AtomicBoolean pauseIndexing = new AtomicBoolean(); + private volatile AtomicBoolean pauseThrottling = new AtomicBoolean(); private final boolean pauseWhenThrottled; private volatile ReleasableLock lock = NOOP_LOCK; @@ -471,7 +472,9 @@ public Releasable acquireThrottle() { if (lock == pauseLockReference) { pauseLockReference.acquire(); try { - while (pauseIndexing.getAcquire()) { + // while (pauseIndexing.getAcquire()) { + // If throttling is activate and not temporarily paused + while ((lock == pauseLockReference) && (pauseThrottling.getAcquire() == false)) { logger.trace("Waiting on pause indexing lock"); pauseCondition.await(); } @@ -490,10 +493,11 @@ public Releasable acquireThrottle() { /** Activate throttling, which switches the lock to be a real lock */ public void activate() { assert lock == NOOP_LOCK : "throttling activated while already active"; + startOfThrottleNS = System.nanoTime(); if (pauseWhenThrottled) { - pauseIndexing.setRelease(true); lock = pauseLockReference; + logger.trace("Activated index throttling pause"); } else { lock = lockReference; } @@ -503,19 +507,15 @@ public void activate() { public void deactivate() { assert lock != NOOP_LOCK : "throttling deactivated but not active"; - if (lock == pauseLockReference) { - logger.trace("Deactivate index throttling pause"); - + lock = NOOP_LOCK; + if (pauseWhenThrottled) { // Signal the threads that are waiting on pauseCondition - pauseLockReference.acquire(); - try { - pauseIndexing.setRelease(false); + try (Releasable releasableLock = pauseLockReference.acquire()) { + // pauseIndexing.setRelease(false); pauseCondition.signalAll(); - } finally { - pauseLockReference.close(); } + logger.trace("Deactivated index throttling pause"); } - lock = NOOP_LOCK; assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS"; long throttleTimeNS = System.nanoTime() - startOfThrottleNS; @@ -542,6 +542,28 @@ boolean isThrottled() { return lock != NOOP_LOCK; } + // Pause throttling to allow another task such as relocation to acquire all indexing permits + public void pauseThrottle() { + if (pauseWhenThrottled) { + try (Releasable releasableLock = pauseLockReference.acquire()) { + // pauseIndexing.setRelease(false); + pauseThrottling.setRelease(true); + pauseCondition.signalAll(); + } + } + } + + // Reverse what was done in pauseThrottle() + public void unpauseThrottle() { + if (pauseWhenThrottled) { + try (Releasable releasableLock = pauseLockReference.acquire()) { + // pauseIndexing.setRelease(true); + pauseThrottling.setRelease(false); + pauseCondition.signalAll(); + } + } + } + boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only if (isThrottled()) { return lock.isHeldByCurrentThread(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8f2ee9b957c0c..88552664f44d9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2823,19 +2823,27 @@ public void accept(ElasticsearchDirectoryReader reader, ElasticsearchDirectoryRe @Override public void activateThrottling() { - int count = throttleRequestCount.incrementAndGet(); - assert count >= 1 : "invalid post-increment throttleRequestCount=" + count; - if (count == 1) { - throttle.activate(); + // Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling + // atomic w.r.t each other + synchronized (throttleRequestCount) { + int count = throttleRequestCount.incrementAndGet(); + assert count >= 1 : "invalid post-increment throttleRequestCount=" + count; + if (count == 1) { + throttle.activate(); + } } } @Override public void deactivateThrottling() { - int count = throttleRequestCount.decrementAndGet(); - assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count; - if (count == 0) { - throttle.deactivate(); + // Synchronize on throttleRequestCount to make activateThrottling and deactivateThrottling + // atomic w.r.t each other + synchronized (throttleRequestCount) { + int count = throttleRequestCount.decrementAndGet(); + assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count; + if (count == 0) { + throttle.deactivate(); + } } } From 72b3b753ed4ea17f6ac816bb4b6744a2dc64a149 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 28 May 2025 21:35:12 -0400 Subject: [PATCH 07/10] remove unused code --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0e09581fa8738..1caa21b7e0948 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -459,7 +459,6 @@ protected static final class IndexThrottle { private final Lock pauseIndexingLock = new ReentrantLock(); private final Condition pauseCondition = pauseIndexingLock.newCondition(); private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); - // private volatile AtomicBoolean pauseIndexing = new AtomicBoolean(); private volatile AtomicBoolean pauseThrottling = new AtomicBoolean(); private final boolean pauseWhenThrottled; private volatile ReleasableLock lock = NOOP_LOCK; @@ -472,7 +471,6 @@ public Releasable acquireThrottle() { if (lock == pauseLockReference) { pauseLockReference.acquire(); try { - // while (pauseIndexing.getAcquire()) { // If throttling is activate and not temporarily paused while ((lock == pauseLockReference) && (pauseThrottling.getAcquire() == false)) { logger.trace("Waiting on pause indexing lock"); @@ -511,7 +509,6 @@ public void deactivate() { if (pauseWhenThrottled) { // Signal the threads that are waiting on pauseCondition try (Releasable releasableLock = pauseLockReference.acquire()) { - // pauseIndexing.setRelease(false); pauseCondition.signalAll(); } logger.trace("Deactivated index throttling pause"); @@ -546,7 +543,6 @@ boolean isThrottled() { public void pauseThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { - // pauseIndexing.setRelease(false); pauseThrottling.setRelease(true); pauseCondition.signalAll(); } @@ -557,7 +553,6 @@ public void pauseThrottle() { public void unpauseThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { - // pauseIndexing.setRelease(true); pauseThrottling.setRelease(false); pauseCondition.signalAll(); } From a88680bd9059ae258fdc4d85c33a5d6727e85edc Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 30 May 2025 09:24:19 -0400 Subject: [PATCH 08/10] review comments --- .../elasticsearch/index/engine/Engine.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 1caa21b7e0948..3d84078b7cbaf 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -459,7 +459,7 @@ protected static final class IndexThrottle { private final Lock pauseIndexingLock = new ReentrantLock(); private final Condition pauseCondition = pauseIndexingLock.newCondition(); private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); - private volatile AtomicBoolean pauseThrottling = new AtomicBoolean(); + private volatile AtomicBoolean suspendThrottling = new AtomicBoolean(); private final boolean pauseWhenThrottled; private volatile ReleasableLock lock = NOOP_LOCK; @@ -468,11 +468,11 @@ public IndexThrottle(boolean pause) { } public Releasable acquireThrottle() { - if (lock == pauseLockReference) { - pauseLockReference.acquire(); - try { - // If throttling is activate and not temporarily paused - while ((lock == pauseLockReference) && (pauseThrottling.getAcquire() == false)) { + var lockCopy = this.lock; + if (lockCopy == pauseLockReference) { + try (var ignored = pauseLockReference.acquire()) { + // If pause throttling is activated and not temporarily suspended + while ((lock == pauseLockReference) && (suspendThrottling.getAcquire() == false)) { logger.trace("Waiting on pause indexing lock"); pauseCondition.await(); } @@ -482,9 +482,9 @@ public Releasable acquireThrottle() { } finally { logger.trace("Acquired pause indexing lock"); } - return pauseLockReference; + return (() -> {}); } else { - return lock.acquire(); + return lockCopy.acquire(); } } @@ -540,20 +540,20 @@ boolean isThrottled() { } // Pause throttling to allow another task such as relocation to acquire all indexing permits - public void pauseThrottle() { + public void suspendThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { - pauseThrottling.setRelease(true); + suspendThrottling.setRelease(true); pauseCondition.signalAll(); } } } // Reverse what was done in pauseThrottle() - public void unpauseThrottle() { + public void resumeThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { - pauseThrottling.setRelease(false); + suspendThrottling.setRelease(false); pauseCondition.signalAll(); } } From 4755af4c156e6f5ea0002de6f6c8f23e210251e5 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 30 May 2025 11:34:05 -0400 Subject: [PATCH 09/10] code cleanup --- .../elasticsearch/index/engine/Engine.java | 60 ++----------------- 1 file changed, 5 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 3d84078b7cbaf..4b2aad507984b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -455,12 +455,14 @@ protected static final class IndexThrottle { private final CounterMetric throttleTimeMillisMetric = new CounterMetric(); private volatile long startOfThrottleNS; private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock()); + // This lock throttles indexing to 1 thread (per shard) private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock()); + // This lock pauses indexing completely (on a per shard basis) private final Lock pauseIndexingLock = new ReentrantLock(); private final Condition pauseCondition = pauseIndexingLock.newCondition(); private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); private volatile AtomicBoolean suspendThrottling = new AtomicBoolean(); - private final boolean pauseWhenThrottled; + private final boolean pauseWhenThrottled; // Should throttling pause indexing ? private volatile ReleasableLock lock = NOOP_LOCK; public IndexThrottle(boolean pause) { @@ -539,7 +541,7 @@ boolean isThrottled() { return lock != NOOP_LOCK; } - // Pause throttling to allow another task such as relocation to acquire all indexing permits + /** Suspend throttling to allow another task such as relocation to acquire all indexing permits */ public void suspendThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { @@ -549,7 +551,7 @@ public void suspendThrottle() { } } - // Reverse what was done in pauseThrottle() + /** Reverse what was done in {@link #suspendThrottle()} */ public void resumeThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { @@ -621,58 +623,6 @@ public Condition newCondition() { } } - /* A lock implementation that allows us to control how many threads can take the lock - * In particular, this is used to set the number of allowed threads to 1 or 0 - * when index throttling is activated. - */ - protected static final class PauseLock implements Lock { - private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); - private final int allowThreads; - - public PauseLock(int allowThreads) { - this.allowThreads = allowThreads; - } - - public void lock() { - semaphore.acquireUninterruptibly(); - } - - @Override - public void lockInterruptibly() throws InterruptedException { - semaphore.acquire(); - } - - @Override - public void unlock() { - semaphore.release(); - } - - @Override - public boolean tryLock() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public Condition newCondition() { - throw new UnsupportedOperationException(); - } - - public void throttle() { - assert semaphore.availablePermits() == Integer.MAX_VALUE; - semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads); - } - - public void unthrottle() { - assert semaphore.availablePermits() <= allowThreads; - semaphore.release(Integer.MAX_VALUE - allowThreads); - } - } - /** * Perform document index operation on the engine * @param index operation to perform From f3d91a9e727c9a216a248b1318ed3d3c44bc0dae Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 30 May 2025 15:46:54 +0000 Subject: [PATCH 10/10] [CI] Auto commit changes from spotless --- server/src/main/java/org/elasticsearch/index/engine/Engine.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 4b2aad507984b..2730c5523dbac 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -110,7 +110,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition;