From dfe639f45e5e8e7ce1e9da6530ab73c7b41330d8 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 23 May 2025 13:59:27 -0400 Subject: [PATCH 01/26] pause indexing and race condition diags --- muted-tests.yml | 6 +-- .../elasticsearch/index/engine/Engine.java | 51 ++++++++++++++++++- .../index/engine/InternalEngine.java | 41 +++++++++++---- .../index/engine/ReadOnlyEngine.java | 6 +++ .../engine/ThreadPoolMergeScheduler.java | 3 +- .../elasticsearch/index/shard/IndexShard.java | 13 +++++ .../shard/IndexShardOperationPermits.java | 5 ++ 7 files changed, 110 insertions(+), 15 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index d91fca8023c99..7e179a6c61c8d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -444,9 +444,9 @@ tests: - class: org.elasticsearch.xpack.esql.qa.mixed.EsqlClientYamlIT method: test {p0=esql/120_profile/avg 8.14 or after} issue: https://github.com/elastic/elasticsearch/issues/127879 -- class: org.elasticsearch.indices.stats.IndexStatsIT - method: testThrottleStats - issue: https://github.com/elastic/elasticsearch/issues/126359 +#- class: org.elasticsearch.indices.stats.IndexStatsIT +# method: testThrottleStats +# issue: https://github.com/elastic/elasticsearch/issues/126359 - class: org.elasticsearch.search.vectors.IVFKnnFloatVectorQueryTests method: testRandomWithFilter issue: https://github.com/elastic/elasticsearch/issues/127963 diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index de8bfd0e3e61c..8f06fd1b4cbd6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -504,6 +504,17 @@ long getThrottleTimeInMillis() { return throttleTimeMillisMetric.count() + TimeValue.nsecToMSec(currentThrottleNS); } + void pauseThrottle() { + if (isThrottled()) { + throttlingLock.pauseThrottle(); + } + // TODO: calculate pause time + } + + void 
unpauseThrottle() { + throttlingLock.unPauseThrottle(); + } + boolean isThrottled() { return lock != NOOP_LOCK; } @@ -577,9 +588,13 @@ public Condition newCondition() { protected static final class PauseLock implements Lock { private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); private final int allowThreads; + // If the lock is currently throttled, we might need to pause throttling + // to let some threads pass when another task is trying to acquire indexing permits + private volatile AtomicBoolean throttlePaused; public PauseLock(int allowThreads) { this.allowThreads = allowThreads; + throttlePaused = new AtomicBoolean(false); } public void lock() { @@ -612,14 +627,34 @@ public Condition newCondition() { } public void throttle() { - assert semaphore.availablePermits() == Integer.MAX_VALUE; + if(throttlePaused.get()) { + return; + } + assert semaphore.availablePermits() == Integer.MAX_VALUE : "Available permits should be " + Integer.MAX_VALUE + + " but is " + semaphore.availablePermits(); semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads); } public void unthrottle() { - assert semaphore.availablePermits() <= allowThreads; + unPauseThrottle(); + assert semaphore.availablePermits() <= allowThreads : "Available permits should be no greater than " + allowThreads + + " but is " + semaphore.availablePermits(); semaphore.release(Integer.MAX_VALUE - allowThreads); } + + // Pause throttling by allowing 1 thread to pass + public void pauseThrottle() { + // we only need to pause throttling if throttling allows no threads to pass i.e. 
allowThreads is 0 + if((allowThreads == 0) && throttlePaused.getAndSet(true) == false) { + semaphore.release(1); + } + } + + public void unPauseThrottle() { + if(throttlePaused.getAndSet(false)) { + semaphore.acquireUninterruptibly(1); + } + } } /** @@ -2259,6 +2294,18 @@ public interface Warmer { */ public abstract void deactivateThrottling(); + /** + * If indexing is throttled to the point where it is paused completely, + * another task trying to get indexing permits might want to pause throttling + * by letting one thread pass at a time so that it does not get starved. + */ + public abstract void pauseThrottling(); + + /** + * Reverses a previous {@link #pauseThrottling} call. + */ + public abstract void unPauseThrottling(); + /** * This method replays translog to restore the Lucene index which might be reverted previously. * This ensures that all acknowledged writes are restored correctly when this engine is promoted. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index bd1c5c26bb450..2426214a1f3f9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2827,6 +2827,7 @@ public void activateThrottling() { assert count >= 1 : "invalid post-increment throttleRequestCount=" + count; if (count == 1) { throttle.activate(); + logger.info("Activated throttling"); } } @@ -2836,9 +2837,20 @@ public void deactivateThrottling() { assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count; if (count == 0) { throttle.deactivate(); + logger.info("Deactivated throttling"); } } + @Override + public void pauseThrottling() { + throttle.pauseThrottle(); + } + + @Override + public void unPauseThrottling() { + throttle.unpauseThrottle(); + } + @Override public boolean isThrottled() { return throttle.isThrottled(); @@ -2920,7 +2932,9 @@ protected 
synchronized void enableIndexingThrottling(int numRunningMerges, int n numQueuedMerges, configuredMaxMergeCount ); - InternalEngine.this.activateThrottling(); + // System.out.println("Activate throttling enableIndexingThrottling"); + InternalEngine.this.activateThrottling(); + } @Override @@ -2931,7 +2945,9 @@ protected synchronized void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, configuredMaxMergeCount ); + // System.out.println("DeActivate throttling disableIndexingThrottling"); InternalEngine.this.deactivateThrottling(); + } @Override @@ -2956,10 +2972,13 @@ private final class EngineConcurrentMergeScheduler extends ElasticsearchConcurre @Override public synchronized void beforeMerge(OnGoingMerge merge) { int maxNumMerges = getMaxMergeCount(); - if (numMergesInFlight.incrementAndGet() > maxNumMerges) { - if (isThrottling.getAndSet(true) == false) { - logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - activateThrottling(); + synchronized (isThrottling) { + if (numMergesInFlight.incrementAndGet() > maxNumMerges) { + if (isThrottling.getAndSet(true) == false) { + logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); + // System.out.println("Activate throttling beforeMerge"); + activateThrottling(); + } } } } @@ -2967,10 +2986,14 @@ public synchronized void beforeMerge(OnGoingMerge merge) { @Override public synchronized void afterMerge(OnGoingMerge merge) { int maxNumMerges = getMaxMergeCount(); - if (numMergesInFlight.decrementAndGet() < maxNumMerges) { - if (isThrottling.getAndSet(false)) { - logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges); - deactivateThrottling(); + synchronized (isThrottling) { + if (numMergesInFlight.decrementAndGet() <= maxNumMerges) { + if (isThrottling.getAndSet(false)) { + logger.info("stop throttling indexing: numMergesInFlight={}, 
maxNumMerges={}", numMergesInFlight, maxNumMerges); + // System.out.println("DeActivate throttling afterMerge"); + deactivateThrottling(); + } + } } maybeFlushAfterMerge(merge); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 63a4696ddb08e..f3c43ff2ae009 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -507,6 +507,12 @@ public void activateThrottling() {} @Override public void deactivateThrottling() {} + @Override + public void pauseThrottling() {} + + @Override + public void unPauseThrottling() {} + @Override public void trimUnreferencedTranslogFiles() {} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index e54c8164c6ab5..40586b70bfb40 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -189,13 +189,14 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg ); } - private void checkMergeTaskThrottling() { + private synchronized void checkMergeTaskThrottling() { long submittedMergesCount = submittedMergeTaskCount.get(); long doneMergesCount = doneMergeTaskCount.get(); int runningMergesCount = runningMergeTasks.size(); int configuredMaxMergeCount = config.getMaxMergeCount(); // both currently running and enqueued merge tasks are considered "active" for throttling purposes int activeMerges = (int) (submittedMergesCount - doneMergesCount); + if (activeMerges > configuredMaxMergeCount // only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() 
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4b38a5d378ecf..7c2e8c7619122 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2752,6 +2752,19 @@ public void deactivateThrottling() { } } + public void pauseThrottling() { + Engine engine = getEngineOrNull(); + final boolean throttled; + if (engine == null) { + throttled = false; + } else { + throttled = engine.isThrottled(); + } + if (throttled) { + engine.pauseThrottling(); + } + } + private void handleRefreshException(Exception e) { if (e instanceof AlreadyClosedException) { // ignore diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 0427e9c99ea35..9107ee6bd2139 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -85,8 +85,13 @@ public void blockOperations( final long timeout, final TimeUnit timeUnit, final Executor executor + //@Nullable IndexShard indexShard ) { delayOperations(); + // If indexing is paused on the shard, unpause it so that any currently waiting task can + // go ahead and release the indexing permit it holds. 
+ //if(indexShard && indexShard) + waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); } From 2601960e27a59a645808cbc9457502d981fedb7b Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 30 May 2025 08:58:12 -0400 Subject: [PATCH 02/26] commit --- .../elasticsearch/index/engine/Engine.java | 90 ++++++++++++++----- .../index/engine/InternalEngine.java | 7 +- .../index/engine/ReadOnlyEngine.java | 5 ++ .../elasticsearch/index/shard/IndexShard.java | 32 +++++-- .../shard/IndexShardOperationPermits.java | 10 ++- .../IndexShardOperationPermitsTests.java | 23 ++--- 6 files changed, 122 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8f06fd1b4cbd6..3fab9c64b4cca 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -455,33 +455,66 @@ protected static final class IndexThrottle { private final CounterMetric throttleTimeMillisMetric = new CounterMetric(); private volatile long startOfThrottleNS; private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock()); - private final PauseLock throttlingLock; - private final ReleasableLock lockReference; + // private final PauseLock throttlingLock; + // private final ReleasableLock lockReference; + private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock()); + private final Lock pauseIndexingLock = new ReentrantLock(); + private final Condition pauseCondition = pauseIndexingLock.newCondition(); + private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); + private volatile AtomicBoolean pauseThrottling = new AtomicBoolean(); + private final boolean pauseWhenThrottled; private volatile ReleasableLock lock = NOOP_LOCK; public IndexThrottle(boolean pause) { - throttlingLock = new PauseLock(pause ? 
0 : 1); - lockReference = new ReleasableLock(throttlingLock); + pauseWhenThrottled = pause; } public Releasable acquireThrottle() { - return lock.acquire(); + var lockCopy = this.lock; + if (lockCopy == pauseLockReference) { + //try (pauseLockReference.acquire();) + try (var ignored = pauseLockReference.acquire()) { + // If throttling is activated and not temporarily paused + while ((lock == pauseLockReference) && (pauseThrottling.getAcquire() == false)) { + logger.trace("Waiting on pause indexing lock"); + pauseCondition.await(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + logger.trace("Acquired pause indexing lock"); + } + return (() -> {}); + } else { + return lockCopy.acquire(); + } } /** Activate throttling, which switches the lock to be a real lock */ public void activate() { assert lock == NOOP_LOCK : "throttling activated while already active"; startOfThrottleNS = System.nanoTime(); - throttlingLock.throttle(); - lock = lockReference; + if (pauseWhenThrottled) { + lock = pauseLockReference; + logger.trace("Activated index throttling pause"); + } else { + lock = lockReference; + } } /** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */ public void deactivate() { assert lock != NOOP_LOCK : "throttling deactivated but not active"; - throttlingLock.unthrottle(); lock = NOOP_LOCK; + if (pauseWhenThrottled) { + // Signal the threads that are waiting on pauseCondition + try (Releasable releasableLock = pauseLockReference.acquire()) { + pauseCondition.signalAll(); + } + logger.trace("Deactivated index throttling pause"); + } assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS"; long throttleTimeNS = System.nanoTime() - startOfThrottleNS; @@ -504,21 +537,34 @@ long getThrottleTimeInMillis() { return throttleTimeMillisMetric.count() + TimeValue.nsecToMSec(currentThrottleNS); } - void pauseThrottle() { - if (isThrottled()) { - 
throttlingLock.pauseThrottle(); + // Pause throttling to allow another task such as relocation to acquire all indexing permits + public void suspendThrottle() { + if (pauseWhenThrottled) { + try (Releasable releasableLock = pauseLockReference.acquire()) { + pauseThrottling.setRelease(true); + pauseCondition.signalAll(); + } } - // TODO: calculate pause time } - void unpauseThrottle() { - throttlingLock.unPauseThrottle(); + // Reverse what was done in suspendThrottle() + public void resumeThrottle() { + if (pauseWhenThrottled) { + try (Releasable releasableLock = pauseLockReference.acquire()) { + pauseThrottling.setRelease(false); + pauseCondition.signalAll(); + } + } } boolean isThrottled() { return lock != NOOP_LOCK; } + boolean isIndexingPaused() { + return (pauseWhenThrottled && isThrottled()); + } + boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only if (isThrottled()) { return lock.isHeldByCurrentThread(); @@ -627,31 +673,31 @@ public Condition newCondition() { } public void throttle() { - if(throttlePaused.get()) { + if (throttlePaused.get()) { return; } - assert semaphore.availablePermits() == Integer.MAX_VALUE : "Available permits should be " + Integer.MAX_VALUE - + " but is " + semaphore.availablePermits(); + assert semaphore.availablePermits() == Integer.MAX_VALUE + : "Available permits should be " + Integer.MAX_VALUE + " but is " + semaphore.availablePermits(); semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads); } public void unthrottle() { unPauseThrottle(); - assert semaphore.availablePermits() <= allowThreads : "Available permits should be no greater than " + allowThreads - + " but is " + semaphore.availablePermits(); + assert semaphore.availablePermits() <= allowThreads + : "Available permits should be no greater than " + allowThreads + " but is " + semaphore.availablePermits(); semaphore.release(Integer.MAX_VALUE - allowThreads); } // Pause throttling by allowing 1 thread to pass public void 
pauseThrottle() { // we only need to pause throttling if throttling allows no threads to pass i.e. allowThreads is 0 - if((allowThreads == 0) && throttlePaused.getAndSet(true) == false) { + if ((allowThreads == 0) && throttlePaused.getAndSet(true) == false) { semaphore.release(1); } } public void unPauseThrottle() { - if(throttlePaused.getAndSet(false)) { + if (throttlePaused.getAndSet(false)) { semaphore.acquireUninterruptibly(1); } } @@ -2306,6 +2352,8 @@ public interface Warmer { */ public abstract void unPauseThrottling(); + public abstract boolean isIndexingPaused(); + /** * This method replays translog to restore the Lucene index which might be reverted previously. * This ensures that all acknowledged writes are restored correctly when this engine is promoted. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2426214a1f3f9..45f3f5af9523a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2856,6 +2856,11 @@ public boolean isThrottled() { return throttle.isThrottled(); } + @Override + public boolean isIndexingPaused() { + return throttle.isIndexingPaused(); + } + boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only return throttle.throttleLockIsHeldByCurrentThread(); } @@ -2933,7 +2938,7 @@ protected synchronized void enableIndexingThrottling(int numRunningMerges, int n configuredMaxMergeCount ); // System.out.println("Activate throttling enableIndexingThrottling"); - InternalEngine.this.activateThrottling(); + InternalEngine.this.activateThrottling(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index f3c43ff2ae009..a308040cc8ed6 100644 --- 
a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -513,6 +513,11 @@ public void pauseThrottling() {} @Override public void unPauseThrottling() {} + @Override + public boolean isIndexingPaused() { + return (false); + } + @Override public void trimUnreferencedTranslogFiles() {} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7c2e8c7619122..52acf312b5a94 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -879,8 +879,14 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by - // CancellableThreads and we want to be able to interrupt it + }, + 30L, + TimeUnit.MINUTES, + // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it + EsExecutors.DIRECT_EXECUTOR_SERVICE, + this + ); + } } @@ -2752,16 +2758,26 @@ public void deactivateThrottling() { } } - public void pauseThrottling() { + public boolean pauseThrottling() { Engine engine = getEngineOrNull(); - final boolean throttled; + final boolean indexingPaused; if (engine == null) { - throttled = false; + indexingPaused = false; } else { - throttled = engine.isThrottled(); + indexingPaused = engine.isIndexingPaused(); } - if (throttled) { + if (indexingPaused) { engine.pauseThrottling(); + return (true); + } + return (false); + } + + public void unpauseThrottling() { + try { + getEngine().unPauseThrottling(); + } catch (AlreadyClosedException ex) { + // ignore } } @@ -3824,7 +3840,7 @@ private void asyncBlockOperations(ActionListener onPermitAcquired, l onPermitAcquired.onFailure(e); }); try { - 
indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic()); + indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic(), this); } catch (Exception e) { forceRefreshes.close(); throw e; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 9107ee6bd2139..0b09a52ef0bb2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -84,15 +84,17 @@ public void blockOperations( final ActionListener onAcquired, final long timeout, final TimeUnit timeUnit, - final Executor executor - //@Nullable IndexShard indexShard + final Executor executor, + @Nullable IndexShard indexShard ) { delayOperations(); // If indexing is paused on the shard, unpause it so that any currently waiting task can // go ahead and release the indexing permit it holds. 
- //if(indexShard && indexShard) - + boolean throttlingPaused = indexShard.pauseThrottling(); waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); + if (throttlingPaused) { + indexShard.unpauseThrottling(); + } } private void waitUntilBlocked(ActionListener onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index cb9927be732f6..3034f392630ab 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -193,7 +193,8 @@ public void testBlockIfClosed() { wrap(() -> { throw new IllegalArgumentException("fake error"); }), randomInt(10), TimeUnit.MINUTES, - threadPool.generic() + threadPool.generic(), + null ) ); } @@ -219,7 +220,7 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce blocked.set(true); blockAcquired.countDown(); releaseBlock.await(); - }), 30, TimeUnit.MINUTES, threadPool.generic()); + }), 30, TimeUnit.MINUTES, threadPool.generic(), null); assertFalse(blocked.get()); assertFalse(future.isDone()); } @@ -313,7 +314,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } } - }, blockReleased::countDown), 1, TimeUnit.MINUTES, threadPool.generic()); + }, blockReleased::countDown), 1, TimeUnit.MINUTES, threadPool.generic(), null); blockAcquired.await(); return () -> { releaseBlock.countDown(); @@ -333,7 +334,7 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx blocked.set(true); blockAcquired.countDown(); releaseBlock.await(); - }), 30, TimeUnit.MINUTES, threadPool.generic()); + }), 30, TimeUnit.MINUTES, threadPool.generic(), null); blockAcquired.await(); assertTrue(blocked.get()); @@ -381,7 +382,7 @@ public 
void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE permits.blockOperations(wrap(() -> { onBlocked.set(true); blockedLatch.countDown(); - }), 30, TimeUnit.MINUTES, threadPool.generic()); + }), 30, TimeUnit.MINUTES, threadPool.generic(), null); assertFalse(onBlocked.get()); // if we submit another operation, it should be delayed @@ -463,7 +464,7 @@ public void onFailure(Exception e) { permits.blockOperations(wrap(() -> { values.add(operations); operationLatch.countDown(); - }), 30, TimeUnit.MINUTES, threadPool.generic()); + }), 30, TimeUnit.MINUTES, threadPool.generic(), null); }); blockingThread.start(); @@ -540,7 +541,7 @@ public void testAsyncBlockOperationsOnRejection() { final var rejectingExecutor = threadPool.executor(REJECTING_EXECUTOR); rejectingExecutor.execute(threadBlock::actionGet); assertThat( - safeAwaitFailure(Releasable.class, l -> permits.blockOperations(l, 1, TimeUnit.HOURS, rejectingExecutor)), + safeAwaitFailure(Releasable.class, l -> permits.blockOperations(l, 1, TimeUnit.HOURS, rejectingExecutor, null)), instanceOf(EsRejectedExecutionException.class) ); @@ -553,7 +554,7 @@ public void testAsyncBlockOperationsOnRejection() { } // ensure that another block can still be acquired - try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic()))) { + try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic(), null))) { assertNotNull(block); } } @@ -568,7 +569,7 @@ public void testAsyncBlockOperationsOnTimeout() { safeAwaitFailure( ElasticsearchTimeoutException.class, Releasable.class, - f -> permits.blockOperations(f, 0, TimeUnit.SECONDS, threadPool.generic()) + f -> permits.blockOperations(f, 0, TimeUnit.SECONDS, threadPool.generic(), null) ).getMessage() ); @@ -582,7 +583,7 @@ public void testAsyncBlockOperationsOnTimeout() { } // ensure that another block can still be acquired - try (Releasable block = safeAwait(l -> 
permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic()))) { + try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic(), null))) { assertNotNull(block); } } @@ -613,7 +614,7 @@ public void onFailure(final Exception e) { reference.set(e); onFailureLatch.countDown(); } - }, 1, TimeUnit.MILLISECONDS, threadPool.generic()); + }, 1, TimeUnit.MILLISECONDS, threadPool.generic(), null); onFailureLatch.await(); assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); From ec91a191067c7a19de5e5d01081abe55bf366247 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 30 May 2025 11:13:32 -0400 Subject: [PATCH 03/26] commit --- .../elasticsearch/index/engine/Engine.java | 18 +++++------ .../index/engine/InternalEngine.java | 32 +++++++++++-------- .../index/engine/ReadOnlyEngine.java | 4 +-- .../elasticsearch/index/shard/IndexShard.java | 8 ++--- .../shard/IndexShardOperationPermits.java | 6 ++-- 5 files changed, 36 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 3fab9c64b4cca..8edf3c323cce8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -461,7 +461,7 @@ protected static final class IndexThrottle { private final Lock pauseIndexingLock = new ReentrantLock(); private final Condition pauseCondition = pauseIndexingLock.newCondition(); private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); - private volatile AtomicBoolean pauseThrottling = new AtomicBoolean(); + private volatile AtomicBoolean suspendThrottling = new AtomicBoolean(); private final boolean pauseWhenThrottled; private volatile ReleasableLock lock = NOOP_LOCK; @@ -472,10 +472,10 @@ public IndexThrottle(boolean pause) { public Releasable acquireThrottle() { 
var lockCopy = this.lock; if (lockCopy == pauseLockReference) { - //try (pauseLockReference.acquire();) - try (var ignored = pauseLockReference.acquire()) { + // try (pauseLockReference.acquire();) + try (var ignored = pauseLockReference.acquire()) { // If throttling is activated and not temporarily paused - while ((lock == pauseLockReference) && (pauseThrottling.getAcquire() == false)) { + while ((lock == pauseLockReference) && (suspendThrottling.getAcquire() == false)) { logger.trace("Waiting on pause indexing lock"); pauseCondition.await(); } @@ -541,7 +541,7 @@ long getThrottleTimeInMillis() { public void suspendThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { - pauseThrottling.setRelease(true); + suspendThrottling.setRelease(true); pauseCondition.signalAll(); } } @@ -551,7 +551,7 @@ public void suspendThrottle() { public void resumeThrottle() { if (pauseWhenThrottled) { try (Releasable releasableLock = pauseLockReference.acquire()) { - pauseThrottling.setRelease(false); + suspendThrottling.setRelease(false); pauseCondition.signalAll(); } } @@ -2345,12 +2345,12 @@ public interface Warmer { * another task trying to get indexing permits might want to pause throttling * by letting one thread pass at a time so that it does not get starved. */ - public abstract void pauseThrottling(); + public abstract void suspendThrottling(); /** - * Reverses a previous {@link #pauseThrottling} call. + * Reverses a previous {@link #suspendThrottling} call.
*/ - public abstract void unPauseThrottling(); + public abstract void resumeThrottling(); public abstract boolean isIndexingPaused(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 45f3f5af9523a..d0accad9d5a23 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2823,32 +2823,36 @@ public void accept(ElasticsearchDirectoryReader reader, ElasticsearchDirectoryRe @Override public void activateThrottling() { - int count = throttleRequestCount.incrementAndGet(); - assert count >= 1 : "invalid post-increment throttleRequestCount=" + count; - if (count == 1) { - throttle.activate(); - logger.info("Activated throttling"); + synchronized (throttleRequestCount) { + int count = throttleRequestCount.incrementAndGet(); + assert count >= 1 : "invalid post-increment throttleRequestCount=" + count; + if (count == 1) { + throttle.activate(); + logger.info("Activated throttling"); + } } } @Override public void deactivateThrottling() { - int count = throttleRequestCount.decrementAndGet(); - assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count; - if (count == 0) { - throttle.deactivate(); - logger.info("Deactivated throttling"); + synchronized (throttleRequestCount) { + int count = throttleRequestCount.decrementAndGet(); + assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count; + if (count == 0) { + throttle.deactivate(); + logger.info("Deactivated throttling"); + } } } @Override - public void pauseThrottling() { - throttle.pauseThrottle(); + public void suspendThrottling() { + throttle.suspendThrottle(); } @Override - public void unPauseThrottling() { - throttle.unpauseThrottle(); + public void resumeThrottling() { + throttle.resumeThrottle(); } @Override diff --git 
a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index a308040cc8ed6..cc877750e23d3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -508,10 +508,10 @@ public void activateThrottling() {} public void deactivateThrottling() {} @Override - public void pauseThrottling() {} + public void suspendThrottling() {} @Override - public void unPauseThrottling() {} + public void resumeThrottling() {} @Override public boolean isIndexingPaused() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 52acf312b5a94..100d6ceaa325a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2758,7 +2758,7 @@ public void deactivateThrottling() { } } - public boolean pauseThrottling() { + public boolean suspendThrottling() { Engine engine = getEngineOrNull(); final boolean indexingPaused; if (engine == null) { @@ -2767,15 +2767,15 @@ public boolean pauseThrottling() { indexingPaused = engine.isIndexingPaused(); } if (indexingPaused) { - engine.pauseThrottling(); + engine.suspendThrottling(); return (true); } return (false); } - public void unpauseThrottling() { + public void resumeThrottling() { try { - getEngine().unPauseThrottling(); + getEngine().resumeThrottling(); } catch (AlreadyClosedException ex) { // ignore } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 0b09a52ef0bb2..25092aa6a05c0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ 
b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -88,12 +88,12 @@ public void blockOperations( @Nullable IndexShard indexShard ) { delayOperations(); - // If indexing is paused on the shard, unpause it so that any currently waiting task can + // If indexing is paused on the shard, suspend it so that any currently paused task can // go ahead and release the indexing permit it holds. - boolean throttlingPaused = indexShard.pauseThrottling(); + boolean throttlingPaused = indexShard.suspendThrottling(); waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); if (throttlingPaused) { - indexShard.unpauseThrottling(); + indexShard.resumeThrottling(); } } From f12949e6ec45bc5752eb2d0d63c4629fff64a26c Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Mon, 2 Jun 2025 09:58:04 -0400 Subject: [PATCH 04/26] commit --- .../elasticsearch/recovery/RelocationIT.java | 61 +++++++++++++++++++ .../elasticsearch/index/engine/Engine.java | 1 + .../shard/IndexShardOperationPermits.java | 4 +- 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index c6ad7326fe569..35e733d8161f0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -43,6 +43,8 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndexingMemoryController; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.plugins.Plugin; @@ -74,6 +76,7 @@ import 
java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -117,6 +120,64 @@ public Settings indexSettings() { .build(); } + public void testSimpleRelocationIndexingPaused() { + logger.info("--> starting [node1] ..."); + final String node_1 = internalCluster().startNode( + Settings.builder() + .put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); + + logger.info("--> creating test index ..."); + prepareCreate("test", indexSettings(1, 0)).get(); + + logger.info("--> index 10 docs"); + for (int i = 0; i < 10; i++) { + prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get(); + } + logger.info("--> flush so we have an actual index"); + indicesAdmin().prepareFlush().get(); + logger.info("--> index more docs so we have something in the translog"); + for (int i = 10; i < 20; i++) { + prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get(); + } + + logger.info("--> verifying count"); + indicesAdmin().prepareRefresh().get(); + assertHitCount(prepareSearch("test").setSize(0), 20L); + + logger.info("--> start another node"); + final String node_2 = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .get(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1); + IndexService indexService = indicesService.indexService(resolveIndex("test")); + IndexShard shard = indexService.getShard(0); + shard.activateThrottling(); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); + logger.info("--> index 1 more doc"); + IndexRequestBuilder indexRequestBuilder = 
prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); + var future = indexRequestBuilder.execute(); + //future.actionGet(); + + logger.info("--> relocate the shard from node1 to node2"); + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); + + clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .get(); + //assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(true)); + + logger.info("--> verifying count again..."); + indicesAdmin().prepareRefresh().get(); + assertHitCount(prepareSearch("test").setSize(0), 20); + } + public void testSimpleRelocationNoIndexing() { logger.info("--> starting [node1] ..."); final String node_1 = internalCluster().startNode(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index a3cbd55230bd8..f5dcbcc642985 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -495,6 +495,7 @@ public void activate() { startOfThrottleNS = System.nanoTime(); if (pauseWhenThrottled) { lock = pauseLockReference; + System.out.println("Activated index throttling pause"); logger.trace("Activated index throttling pause"); } else { lock = lockReference; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 25092aa6a05c0..5e8a57f56bf08 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -90,11 +90,13 @@ public void blockOperations( 
delayOperations(); // If indexing is paused on the shard, suspend it so that any currently paused task can // go ahead and release the indexing permit it holds. - boolean throttlingPaused = indexShard.suspendThrottling(); + //boolean throttlingPaused = indexShard.suspendThrottling(); waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); + /* if (throttlingPaused) { indexShard.resumeThrottling(); } + */ } private void waitUntilBlocked(ActionListener onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { From 90670f3b885c74f6cc3492c3c83f5daf132c4f03 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Mon, 2 Jun 2025 17:38:27 -0400 Subject: [PATCH 05/26] commit --- .../elasticsearch/recovery/RelocationIT.java | 53 ++++++++++++------- .../elasticsearch/index/engine/Engine.java | 9 ++-- .../elasticsearch/index/shard/IndexShard.java | 10 ++++ .../shard/IndexShardOperationPermits.java | 10 ++-- 4 files changed, 52 insertions(+), 30 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 35e733d8161f0..7defe8c9effb1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -120,11 +120,9 @@ public Settings indexSettings() { .build(); } - public void testSimpleRelocationIndexingPaused() { + public void testSimpleRelocationNoIndexing() { logger.info("--> starting [node1] ..."); - final String node_1 = internalCluster().startNode( - Settings.builder() - .put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); + final String node_1 = internalCluster().startNode(); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, 0)).get(); @@ -152,16 +150,6 @@ public void testSimpleRelocationIndexingPaused() { .get(); 
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShard(0); - shard.activateThrottling(); - LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); - logger.info("--> index 1 more doc"); - IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); - var future = indexRequestBuilder.execute(); - //future.actionGet(); - logger.info("--> relocate the shard from node1 to node2"); ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); @@ -170,17 +158,23 @@ public void testSimpleRelocationIndexingPaused() { .setWaitForNoRelocatingShards(true) .setTimeout(ACCEPTABLE_RELOCATION_TIME) .get(); - //assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(true)); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> verifying count again..."); indicesAdmin().prepareRefresh().get(); assertHitCount(prepareSearch("test").setSize(0), 20); } - public void testSimpleRelocationNoIndexing() { + // This tests that relocation can successfully suspend index throttling to grab + // indexing permits required for relocation to succeed. + public void testSimpleRelocationWithIndexingPaused() throws Exception { logger.info("--> starting [node1] ..."); - final String node_1 = internalCluster().startNode(); + // Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate + // index throttling for a shard on this node, it will pause indexing for that shard until throttling + // is deactivated. 
+ final String node_1 = internalCluster().startNode( + Settings.builder() + .put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, 0)).get(); @@ -208,9 +202,27 @@ public void testSimpleRelocationNoIndexing() { .get(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + // Activate index throttling on "test" index primary shard + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1); + IndexService indexService = indicesService.indexService(resolveIndex("test")); + IndexShard shard = indexService.getShard(0); + shard.activateThrottling(); + // Verify that indexing is paused for the throttled shard + assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); }); + // Try to index a document into the "test" index which is currently throttled + logger.info("--> Try to index a doc while indexing is paused"); + IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); + var future = indexRequestBuilder.execute(); + // Verify that the new document has not been indexed indicating that the indexing thread is paused. + logger.info("--> verifying count is unchanged..."); + indicesAdmin().prepareRefresh().get(); + assertHitCount(prepareSearch("test").setSize(0), 20); + logger.info("--> relocate the shard from node1 to node2"); ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); + // Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing + // the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed. 
clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) .setWaitForEvents(Priority.LANGUID) .setWaitForNoRelocatingShards(true) @@ -218,9 +230,10 @@ public void testSimpleRelocationNoIndexing() { .get(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - logger.info("--> verifying count again..."); + logger.info("--> verifying count after relocation ..."); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch("test").setSize(0), 20); + assertHitCount(prepareSearch("test").setSize(0), 21); + logger.info("--> Test finished ..."); } public void testRelocationWhileIndexingRandom() throws Exception { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index f5dcbcc642985..c565e1621add7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -461,7 +461,10 @@ protected static final class IndexThrottle { private final Condition pauseCondition = pauseIndexingLock.newCondition(); private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); private volatile AtomicBoolean suspendThrottling = new AtomicBoolean(); - private final boolean pauseWhenThrottled; // Should throttling pause indexing ? + + // Should throttling pause indexing ? This is decided by the + // IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE setting for this node. 
+ private final boolean pauseWhenThrottled; private volatile ReleasableLock lock = NOOP_LOCK; public IndexThrottle(boolean pause) { @@ -547,7 +550,7 @@ boolean isIndexingPaused() { /** Suspend throttling to allow another task such as relocation to acquire all indexing permits */ public void suspendThrottle() { if (pauseWhenThrottled) { - try (Releasable releasableLock = pauseLockReference.acquire()) { + try (Releasable ignored = pauseLockReference.acquire()) { suspendThrottling.setRelease(true); pauseCondition.signalAll(); } @@ -557,7 +560,7 @@ public void suspendThrottle() { /** Reverse what was done in {@link #suspendThrottle()} */ public void resumeThrottle() { if (pauseWhenThrottled) { - try (Releasable releasableLock = pauseLockReference.acquire()) { + try (Releasable ignored = pauseLockReference.acquire()) { suspendThrottling.setRelease(false); pauseCondition.signalAll(); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 100d6ceaa325a..43f0da33a223d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2758,6 +2758,16 @@ public void deactivateThrottling() { } } + public boolean isIndexingPaused() { + Engine engine = getEngineOrNull(); + final boolean indexingPaused; + if (engine == null) { + indexingPaused = false; + } else { + indexingPaused = engine.isIndexingPaused(); + } + return (indexingPaused); + } public boolean suspendThrottling() { Engine engine = getEngineOrNull(); final boolean indexingPaused; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 5e8a57f56bf08..8081ff9932ab6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ 
b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -88,15 +88,11 @@ public void blockOperations( @Nullable IndexShard indexShard ) { delayOperations(); - // If indexing is paused on the shard, suspend it so that any currently paused task can + // In case indexing is paused on the shard, suspend throttling so that any currently paused task can // go ahead and release the indexing permit it holds. - //boolean throttlingPaused = indexShard.suspendThrottling(); + indexShard.suspendThrottling(); waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); - /* - if (throttlingPaused) { - indexShard.resumeThrottling(); - } - */ + indexShard.resumeThrottling(); } private void waitUntilBlocked(ActionListener onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { From 45e3799a189dde2246f73e2db507e9e81c107665 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Mon, 2 Jun 2025 23:33:53 -0400 Subject: [PATCH 06/26] commit --- .../elasticsearch/recovery/RelocationIT.java | 25 +++++++++++++------ .../elasticsearch/index/engine/Engine.java | 3 +-- .../elasticsearch/index/shard/IndexShard.java | 1 + .../shard/IndexShardOperationPermits.java | 1 + 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 7defe8c9effb1..4c9703deb25c3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -76,7 +76,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -173,8 +172,8 @@ public void testSimpleRelocationWithIndexingPaused() 
throws Exception { // index throttling for a shard on this node, it will pause indexing for that shard until throttling // is deactivated. final String node_1 = internalCluster().startNode( - Settings.builder() - .put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true) + ); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, 0)).get(); @@ -204,8 +203,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { // Activate index throttling on "test" index primary shard IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1); - IndexService indexService = indicesService.indexService(resolveIndex("test")); - IndexShard shard = indexService.getShard(0); + IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is paused for the throttled shard assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); }); @@ -230,16 +228,18 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { .get(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + // Relocated shard is not throttled + assertThat(shard.isIndexingPaused(), equalTo(false)); logger.info("--> verifying count after relocation ..."); indicesAdmin().prepareRefresh().get(); assertHitCount(prepareSearch("test").setSize(0), 21); - logger.info("--> Test finished ..."); } public void testRelocationWhileIndexingRandom() throws Exception { int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4); int numberOfReplicas = randomBoolean() ? 0 : 1; int numberOfNodes = numberOfReplicas == 0 ? 
2 : 3; + boolean throttleIndexing = randomBoolean(); logger.info( "testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})", @@ -248,9 +248,12 @@ public void testRelocationWhileIndexingRandom() throws Exception { numberOfNodes ); + // Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate + // index throttling for a shard on this node, it will pause indexing for that shard until throttling + // is deactivated. String[] nodes = new String[numberOfNodes]; logger.info("--> starting [node1] ..."); - nodes[0] = internalCluster().startNode(); + nodes[0] = internalCluster().startNode(Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, numberOfReplicas)).get(); @@ -274,6 +277,14 @@ public void testRelocationWhileIndexingRandom() throws Exception { waitForDocs(numDocs, indexer); logger.info("--> {} docs indexed", numDocs); + if (throttleIndexing) { + // Activate index throttling on "test" index primary shard + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[0]); + IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + shard.activateThrottling(); + // Verify that indexing is paused for the throttled shard + assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); }); + } logger.info("--> starting relocations..."); int nodeShiftBased = numberOfReplicas; // if we have replicas shift those for (int i = 0; i < numberOfRelocations; i++) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index c565e1621add7..ec3eda2277f0b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -498,7 +498,6 @@ public void 
activate() { startOfThrottleNS = System.nanoTime(); if (pauseWhenThrottled) { lock = pauseLockReference; - System.out.println("Activated index throttling pause"); logger.trace("Activated index throttling pause"); } else { lock = lockReference; @@ -544,7 +543,7 @@ boolean isThrottled() { } boolean isIndexingPaused() { - return (pauseWhenThrottled && isThrottled()); + return (lock == pauseLockReference); } /** Suspend throttling to allow another task such as relocation to acquire all indexing permits */ diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 43f0da33a223d..f4386d462b9b1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2768,6 +2768,7 @@ public boolean isIndexingPaused() { } return (indexingPaused); } + public boolean suspendThrottling() { Engine engine = getEngineOrNull(); final boolean indexingPaused; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 8081ff9932ab6..b32fefc1a446b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -92,6 +92,7 @@ public void blockOperations( // go ahead and release the indexing permit it holds. indexShard.suspendThrottling(); waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); + // TODO: Does this do anything ? 
Looks like the relocated shard does not have throttling enabled indexShard.resumeThrottling(); } From bf91cab99141b9f3ee1c66979977dea09e315ecc Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Mon, 2 Jun 2025 23:50:31 -0400 Subject: [PATCH 07/26] commit --- .../elasticsearch/index/engine/ThreadPoolMergeScheduler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 23d48d8a5da6f..33ef06699c8c7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -236,14 +236,13 @@ isAutoThrottle && isAutoThrottle(), ); } - private synchronized void checkMergeTaskThrottling() { + private void checkMergeTaskThrottling() { long submittedMergesCount = submittedMergeTaskCount.get(); long doneMergesCount = doneMergeTaskCount.get(); int runningMergesCount = runningMergeTasks.size(); int configuredMaxMergeCount = getMaxMergeCount(); // both currently running and enqueued merge tasks are considered "active" for throttling purposes int activeMerges = (int) (submittedMergesCount - doneMergesCount); - if (activeMerges > configuredMaxMergeCount // only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec() From e642fea7352fb99a00644cb1e2a3074836f2259b Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Tue, 3 Jun 2025 20:14:31 -0400 Subject: [PATCH 08/26] address review comments --- .../elasticsearch/recovery/RelocationIT.java | 23 ++++----- .../elasticsearch/index/shard/IndexShard.java | 49 +++++++++++++------ .../shard/IndexShardOperationPermits.java | 12 ++--- .../IndexShardOperationPermitsTests.java | 23 +++++---- 4 files changed, 59 insertions(+), 48 deletions(-) diff 
--git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 4c9703deb25c3..039f431b64a26 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.tests.util.English; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -184,14 +185,10 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { } logger.info("--> flush so we have an actual index"); indicesAdmin().prepareFlush().get(); - logger.info("--> index more docs so we have something in the translog"); - for (int i = 10; i < 20; i++) { - prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get(); - } logger.info("--> verifying count"); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch("test").setSize(0), 20L); + assertHitCount(prepareSearch("test").setSize(0), 10L); logger.info("--> start another node"); final String node_2 = internalCluster().startNode(); @@ -206,15 +203,16 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is paused for the throttled shard - assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); }); + assertThat(shard.isIndexingPaused(), equalTo(true)); // Try to index a document into the "test" index which is currently throttled logger.info("--> Try to index a doc while indexing is paused"); IndexRequestBuilder indexRequestBuilder = 
prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); var future = indexRequestBuilder.execute(); + expectThrows(ElasticsearchException.class, () -> future.actionGet(10, TimeUnit.SECONDS)); // Verify that the new document has not been indexed indicating that the indexing thread is paused. logger.info("--> verifying count is unchanged..."); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch("test").setSize(0), 20); + assertHitCount(prepareSearch("test").setSize(0), 10); logger.info("--> relocate the shard from node1 to node2"); ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); @@ -231,8 +229,9 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { // Relocated shard is not throttled assertThat(shard.isIndexingPaused(), equalTo(false)); logger.info("--> verifying count after relocation ..."); + future.actionGet(); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch("test").setSize(0), 21); + assertHitCount(prepareSearch("test").setSize(0), 11); } public void testRelocationWhileIndexingRandom() throws Exception { @@ -253,7 +252,9 @@ public void testRelocationWhileIndexingRandom() throws Exception { // is deactivated. 
String[] nodes = new String[numberOfNodes]; logger.info("--> starting [node1] ..."); - nodes[0] = internalCluster().startNode(Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)); + nodes[0] = internalCluster().startNode( + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean()) + ); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, numberOfReplicas)).get(); @@ -282,8 +283,8 @@ public void testRelocationWhileIndexingRandom() throws Exception { IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[0]); IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); - // Verify that indexing is paused for the throttled shard - assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); }); + // Verify that indexing is throttled for this shard + assertBusy(() -> { assertThat(shard.getEngineOrNull().isThrottled(), equalTo(true)); }); } logger.info("--> starting relocations..."); int nodeShiftBased = numberOfReplicas; // if we have replicas shift those diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f4386d462b9b1..f3b7866ce200f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -801,7 +801,8 @@ public void relocated( ) throws IllegalIndexShardStateException, IllegalStateException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { - indexShardOperationPermits.blockOperations(new ActionListener<>() { + // indexShardOperationPermits.blockOperations(new ActionListener<>() { + blockOperations(new ActionListener<>() { @Override public void 
onResponse(Releasable releasable) { boolean success = false; @@ -883,8 +884,7 @@ public void onFailure(Exception e) { 30L, TimeUnit.MINUTES, // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it - EsExecutors.DIRECT_EXECUTOR_SERVICE, - this + EsExecutors.DIRECT_EXECUTOR_SERVICE ); } @@ -2769,19 +2769,9 @@ public boolean isIndexingPaused() { return (indexingPaused); } - public boolean suspendThrottling() { + public void suspendThrottling() { Engine engine = getEngineOrNull(); - final boolean indexingPaused; - if (engine == null) { - indexingPaused = false; - } else { - indexingPaused = engine.isIndexingPaused(); - } - if (indexingPaused) { - engine.suspendThrottling(); - return (true); - } - return (false); + engine.suspendThrottling(); } public void resumeThrottling() { @@ -3841,6 +3831,32 @@ private ActionListener wrapPrimaryOperationPermitListener(final Acti }); } + /** + * Immediately delays operations and uses the {@code executor} to wait for in-flight operations to finish and then acquires all + * permits. When all permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are + * started. Delayed operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in + * this case the {@code onFailure} handler will be invoked after delayed operations are released. + * + * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed. This listener should not throw. 
+ * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument + * @param executor executor on which to wait for in-flight operations to finish and acquire all permits + */ + public void blockOperations( + final ActionListener onAcquired, + final long timeout, + final TimeUnit timeUnit, + final Executor executor + ) { + indexShardOperationPermits.delayOperations(); + // In case indexing is paused on the shard, suspend throttling so that any currently paused task can + // go ahead and release the indexing permit it holds. + suspendThrottling(); + indexShardOperationPermits.waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); + // TODO: Does this do anything ? Looks like the relocated shard does not have throttling enabled + resumeThrottling(); + } + private void asyncBlockOperations(ActionListener onPermitAcquired, long timeout, TimeUnit timeUnit) { final Releasable forceRefreshes = refreshListeners.forceRefreshes(); final ActionListener wrappedListener = ActionListener.wrap(r -> { @@ -3851,7 +3867,8 @@ private void asyncBlockOperations(ActionListener onPermitAcquired, l onPermitAcquired.onFailure(e); }); try { - indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic(), this); + blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic()); + // indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic(), this); } catch (Exception e) { forceRefreshes.close(); throw e; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index b32fefc1a446b..27143ea278c78 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ 
b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -84,19 +84,13 @@ public void blockOperations( final ActionListener onAcquired, final long timeout, final TimeUnit timeUnit, - final Executor executor, - @Nullable IndexShard indexShard + final Executor executor ) { delayOperations(); - // In case indexing is paused on the shard, suspend throttling so that any currently paused task can - // go ahead and release the indexing permit it holds. - indexShard.suspendThrottling(); waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); - // TODO: Does this do anything ? Looks like the relocated shard does not have throttling enabled - indexShard.resumeThrottling(); } - private void waitUntilBlocked(ActionListener onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { + protected void waitUntilBlocked(ActionListener onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { executor.execute(new AbstractRunnable() { final Releasable released = Releasables.releaseOnce(() -> releaseDelayedOperations()); @@ -132,7 +126,7 @@ protected void doRun() { }); } - private void delayOperations() { + protected void delayOperations() { if (closed) { throw new IndexShardClosedException(shardId); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 3034f392630ab..cb9927be732f6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -193,8 +193,7 @@ public void testBlockIfClosed() { wrap(() -> { throw new IllegalArgumentException("fake error"); }), randomInt(10), TimeUnit.MINUTES, - threadPool.generic(), - null + threadPool.generic() ) ); } @@ -220,7 +219,7 @@ public void testGetBlockWhenBlocked() throws ExecutionException, 
InterruptedExce blocked.set(true); blockAcquired.countDown(); releaseBlock.await(); - }), 30, TimeUnit.MINUTES, threadPool.generic(), null); + }), 30, TimeUnit.MINUTES, threadPool.generic()); assertFalse(blocked.get()); assertFalse(future.isDone()); } @@ -314,7 +313,7 @@ public void onFailure(Exception e) { throw new RuntimeException(e); } } - }, blockReleased::countDown), 1, TimeUnit.MINUTES, threadPool.generic(), null); + }, blockReleased::countDown), 1, TimeUnit.MINUTES, threadPool.generic()); blockAcquired.await(); return () -> { releaseBlock.countDown(); @@ -334,7 +333,7 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx blocked.set(true); blockAcquired.countDown(); releaseBlock.await(); - }), 30, TimeUnit.MINUTES, threadPool.generic(), null); + }), 30, TimeUnit.MINUTES, threadPool.generic()); blockAcquired.await(); assertTrue(blocked.get()); @@ -382,7 +381,7 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE permits.blockOperations(wrap(() -> { onBlocked.set(true); blockedLatch.countDown(); - }), 30, TimeUnit.MINUTES, threadPool.generic(), null); + }), 30, TimeUnit.MINUTES, threadPool.generic()); assertFalse(onBlocked.get()); // if we submit another operation, it should be delayed @@ -464,7 +463,7 @@ public void onFailure(Exception e) { permits.blockOperations(wrap(() -> { values.add(operations); operationLatch.countDown(); - }), 30, TimeUnit.MINUTES, threadPool.generic(), null); + }), 30, TimeUnit.MINUTES, threadPool.generic()); }); blockingThread.start(); @@ -541,7 +540,7 @@ public void testAsyncBlockOperationsOnRejection() { final var rejectingExecutor = threadPool.executor(REJECTING_EXECUTOR); rejectingExecutor.execute(threadBlock::actionGet); assertThat( - safeAwaitFailure(Releasable.class, l -> permits.blockOperations(l, 1, TimeUnit.HOURS, rejectingExecutor, null)), + safeAwaitFailure(Releasable.class, l -> permits.blockOperations(l, 1, TimeUnit.HOURS, rejectingExecutor)), 
instanceOf(EsRejectedExecutionException.class) ); @@ -554,7 +553,7 @@ public void testAsyncBlockOperationsOnRejection() { } // ensure that another block can still be acquired - try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic(), null))) { + try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic()))) { assertNotNull(block); } } @@ -569,7 +568,7 @@ public void testAsyncBlockOperationsOnTimeout() { safeAwaitFailure( ElasticsearchTimeoutException.class, Releasable.class, - f -> permits.blockOperations(f, 0, TimeUnit.SECONDS, threadPool.generic(), null) + f -> permits.blockOperations(f, 0, TimeUnit.SECONDS, threadPool.generic()) ).getMessage() ); @@ -583,7 +582,7 @@ public void testAsyncBlockOperationsOnTimeout() { } // ensure that another block can still be acquired - try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic(), null))) { + try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic()))) { assertNotNull(block); } } @@ -614,7 +613,7 @@ public void onFailure(final Exception e) { reference.set(e); onFailureLatch.countDown(); } - }, 1, TimeUnit.MILLISECONDS, threadPool.generic(), null); + }, 1, TimeUnit.MILLISECONDS, threadPool.generic()); onFailureLatch.await(); assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); From 82a37f54cb6ba62c435e080e489c0fdcefb44dd2 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 4 Jun 2025 08:35:20 -0400 Subject: [PATCH 09/26] test failure --- .../java/org/elasticsearch/index/shard/IndexShard.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f3b7866ce200f..e3fb46a7feef5 100644 --- 
a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2770,8 +2770,12 @@ public boolean isIndexingPaused() { } public void suspendThrottling() { - Engine engine = getEngineOrNull(); - engine.suspendThrottling(); + try { + getEngine().suspendThrottling(); + } catch (AlreadyClosedException ex) { + // ignore + } + } public void resumeThrottling() { From 560a035d80bf70e54a1feaf474f5fd1d00123727 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 4 Jun 2025 09:21:06 -0400 Subject: [PATCH 10/26] remove commented code --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e3fb46a7feef5..4289dd7e6d637 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -801,7 +801,6 @@ public void relocated( ) throws IllegalIndexShardStateException, IllegalStateException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { - // indexShardOperationPermits.blockOperations(new ActionListener<>() { blockOperations(new ActionListener<>() { @Override public void onResponse(Releasable releasable) { @@ -3872,7 +3871,6 @@ private void asyncBlockOperations(ActionListener onPermitAcquired, l }); try { blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic()); - // indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic(), this); } catch (Exception e) { forceRefreshes.close(); throw e; From 048f944aa9a18f6dd5d63b1e78d49105bea71a81 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 4 Jun 2025 10:04:39 -0400 Subject: [PATCH 11/26] 
minor changes --- .../java/org/elasticsearch/recovery/RelocationIT.java | 2 -- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 - 2 files changed, 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 039f431b64a26..62fa799b39201 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -226,8 +226,6 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { .get(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - // Relocated shard is not throttled - assertThat(shard.isIndexingPaused(), equalTo(false)); logger.info("--> verifying count after relocation ..."); future.actionGet(); indicesAdmin().prepareRefresh().get(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4289dd7e6d637..d0e8c67bf49cc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2774,7 +2774,6 @@ public void suspendThrottling() { } catch (AlreadyClosedException ex) { // ignore } - } public void resumeThrottling() { From 3e044bf1e77e32de4efb54022da7efa16dee111d Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 4 Jun 2025 11:33:11 -0400 Subject: [PATCH 12/26] modified testRelocationWhileIndexingRandom --- .../elasticsearch/recovery/RelocationIT.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 62fa799b39201..ff056a8b252b2 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -276,13 +276,13 @@ public void testRelocationWhileIndexingRandom() throws Exception { waitForDocs(numDocs, indexer); logger.info("--> {} docs indexed", numDocs); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[0]); + IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); if (throttleIndexing) { // Activate index throttling on "test" index primary shard - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[0]); - IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is throttled for this shard - assertBusy(() -> { assertThat(shard.getEngineOrNull().isThrottled(), equalTo(true)); }); + assertThat(shard.getEngineOrNull().isThrottled(), equalTo(true)); } logger.info("--> starting relocations..."); int nodeShiftBased = numberOfReplicas; // if we have replicas shift those @@ -308,8 +308,22 @@ public void testRelocationWhileIndexingRandom() throws Exception { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); + if(throttleIndexing) { + // Deactivate throttling on source shard to allow indexing threads to pass + shard.deactivateThrottling(); + // Activate throttling on target shard before next relocation + indicesService = internalCluster().getInstance(IndicesService.class, nodes[toNode]); + shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + shard.activateThrottling(); + // Verify that indexing is throttled for this shard + assertThat(shard.getEngineOrNull().isThrottled(), equalTo(true)); + } } logger.info("--> done relocations"); + // Deactivate throttling on the primary 
shard to allow indexing threads to pass + if(throttleIndexing) { + shard.deactivateThrottling(); + } logger.info("--> waiting for indexing threads to stop ..."); indexer.stopAndAwaitStopped(); logger.info("--> indexing threads stopped"); From 7abedf20dbf9cc2efc92f814b5abd5d2dce32eec Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 4 Jun 2025 15:43:04 +0000 Subject: [PATCH 13/26] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/recovery/RelocationIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index ff056a8b252b2..3037a4a4c48dd 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -308,7 +308,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); - if(throttleIndexing) { + if (throttleIndexing) { // Deactivate throttling on source shard to allow indexing threads to pass shard.deactivateThrottling(); // Activate throttling on target shard before next relocation @@ -321,7 +321,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { } logger.info("--> done relocations"); // Deactivate throttling on the primary shard to allow indexing threads to pass - if(throttleIndexing) { + if (throttleIndexing) { shard.deactivateThrottling(); } logger.info("--> waiting for indexing threads to stop ..."); From c5cce6e3749c992c305511b6a2eb8447ffe88b79 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 5 Jun 2025 00:44:52 -0400 Subject: [PATCH 14/26] update index settings --- .../java/org/elasticsearch/recovery/RelocationIT.java | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 3037a4a4c48dd..36092ba6e7dde 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -151,7 +151,7 @@ public void testSimpleRelocationNoIndexing() { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> relocate the shard from node1 to node2"); - ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); + updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", node_2), "test"); clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) .setWaitForEvents(Priority.LANGUID) From 8485a8e1714025c4f92635e67d5fd50d808bc315 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 5 Jun 2025 01:21:32 -0400 Subject: [PATCH 15/26] test --- .../java/org/elasticsearch/recovery/RelocationIT.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 36092ba6e7dde..432853f016d7a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -151,7 +151,7 @@ public void testSimpleRelocationNoIndexing() { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> relocate the shard from node1 to node2"); - updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", node_2), "test"); + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); 
clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) .setWaitForEvents(Priority.LANGUID) @@ -216,6 +216,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { logger.info("--> relocate the shard from node1 to node2"); ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); + // updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", node_2), "test"); // Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing // the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed. @@ -296,6 +297,10 @@ public void testRelocationWhileIndexingRandom() throws Exception { indexer.continueIndexing(numDocs); logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode])); + + // updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", nodes[toNode]), "test"); + // ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test"); + if (rarely()) { logger.debug("--> flushing"); indicesAdmin().prepareFlush().get(); From 5e81c31cffb49a0686f1f1a0a9a578d7a5c2f9e0 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 5 Jun 2025 12:07:08 -0400 Subject: [PATCH 16/26] address comments --- .../elasticsearch/recovery/RelocationIT.java | 32 +++++++++++-------- .../elasticsearch/index/engine/Engine.java | 2 -- .../index/engine/InternalEngine.java | 5 --- .../index/engine/ReadOnlyEngine.java | 5 --- .../elasticsearch/index/shard/IndexShard.java | 26 ++++++--------- .../shard/IndexShardOperationPermits.java | 4 +-- 6 files changed, 29 insertions(+), 45 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 
432853f016d7a..19500dbd57f7f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -38,6 +38,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.IndexEventListener; @@ -179,8 +180,9 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, 0)).get(); - logger.info("--> index 10 docs"); - for (int i = 0; i < 10; i++) { + logger.info("--> index docs"); + int numDocs = between(1,10); + for (int i = 0; i < numDocs; i++) { prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get(); } logger.info("--> flush so we have an actual index"); @@ -188,7 +190,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { logger.info("--> verifying count"); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch("test").setSize(0), 10L); + assertHitCount(prepareSearch("test").setSize(0), numDocs); logger.info("--> start another node"); final String node_2 = internalCluster().startNode(); @@ -203,20 +205,21 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is paused for the throttled shard - assertThat(shard.isIndexingPaused(), equalTo(true)); + Engine engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(true)); // Try to index a document into the "test" index which is currently throttled logger.info("--> Try to index a 
doc while indexing is paused"); IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); var future = indexRequestBuilder.execute(); - expectThrows(ElasticsearchException.class, () -> future.actionGet(10, TimeUnit.SECONDS)); + expectThrows(ElasticsearchException.class, () -> future.actionGet(500, TimeUnit.MILLISECONDS)); // Verify that the new document has not been indexed indicating that the indexing thread is paused. logger.info("--> verifying count is unchanged..."); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch("test").setSize(0), 10); + assertHitCount(prepareSearch("test").setSize(0), numDocs); logger.info("--> relocate the shard from node1 to node2"); - ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2)); - // updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", node_2), "test"); + updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", node_2), "test"); + ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test"); // Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing // the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed. 
@@ -230,7 +233,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { logger.info("--> verifying count after relocation ..."); future.actionGet(); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch("test").setSize(0), 11); + assertHitCount(prepareSearch("test").setSize(0), numDocs + 1); } public void testRelocationWhileIndexingRandom() throws Exception { @@ -283,7 +286,8 @@ public void testRelocationWhileIndexingRandom() throws Exception { // Activate index throttling on "test" index primary shard shard.activateThrottling(); // Verify that indexing is throttled for this shard - assertThat(shard.getEngineOrNull().isThrottled(), equalTo(true)); + Engine engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(true)); } logger.info("--> starting relocations..."); int nodeShiftBased = numberOfReplicas; // if we have replicas shift those @@ -296,10 +300,9 @@ public void testRelocationWhileIndexingRandom() throws Exception { logger.debug("--> Allow indexer to index [{}] documents", numDocs); indexer.continueIndexing(numDocs); logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); - ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode])); - // updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", nodes[toNode]), "test"); - // ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test"); + updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", nodes[toNode]), "test"); + ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test"); if (rarely()) { logger.debug("--> flushing"); @@ -321,7 +324,8 @@ public void testRelocationWhileIndexingRandom() throws Exception { shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is throttled for this shard - assertThat(shard.getEngineOrNull().isThrottled(), 
equalTo(true)); + Engine engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(true)); } } logger.info("--> done relocations"); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 300b818bca8d1..3a656dd16f911 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -2276,8 +2276,6 @@ public interface Warmer { */ public abstract void resumeThrottling(); - public abstract boolean isIndexingPaused(); - /** * This method replays translog to restore the Lucene index which might be reverted previously. * This ensures that all acknowledged writes are restored correctly when this engine is promoted. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0209e6e084b28..cf6efe1cec096 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2867,11 +2867,6 @@ public boolean isThrottled() { return throttle.isThrottled(); } - @Override - public boolean isIndexingPaused() { - return throttle.isIndexingPaused(); - } - boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only return throttle.throttleLockIsHeldByCurrentThread(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index cc877750e23d3..91107b2663466 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -513,11 +513,6 @@ public void suspendThrottling() {} @Override public void resumeThrottling() {} - @Override - public boolean 
isIndexingPaused() { - return (false); - } - @Override public void trimUnreferencedTranslogFiles() {} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d0e8c67bf49cc..4fcafa0a4adcb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2757,18 +2757,7 @@ public void deactivateThrottling() { } } - public boolean isIndexingPaused() { - Engine engine = getEngineOrNull(); - final boolean indexingPaused; - if (engine == null) { - indexingPaused = false; - } else { - indexingPaused = engine.isIndexingPaused(); - } - return (indexingPaused); - } - - public void suspendThrottling() { + private void suspendThrottling() { try { getEngine().suspendThrottling(); } catch (AlreadyClosedException ex) { @@ -2776,7 +2765,7 @@ public void suspendThrottling() { } } - public void resumeThrottling() { + private void resumeThrottling() { try { getEngine().resumeThrottling(); } catch (AlreadyClosedException ex) { @@ -3850,13 +3839,16 @@ public void blockOperations( final TimeUnit timeUnit, final Executor executor ) { - indexShardOperationPermits.delayOperations(); // In case indexing is paused on the shard, suspend throttling so that any currently paused task can // go ahead and release the indexing permit it holds. suspendThrottling(); - indexShardOperationPermits.waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); - // TODO: Does this do anything ? 
Looks like the relocated shard does not have throttling enabled - resumeThrottling(); + try { + indexShardOperationPermits.blockOperations(ActionListener.runAfter(onAcquired, this::resumeThrottling), + timeout, timeUnit, executor); + } catch (IndexShardClosedException e) { + resumeThrottling(); + throw e; + } } private void asyncBlockOperations(ActionListener onPermitAcquired, long timeout, TimeUnit timeUnit) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 27143ea278c78..0427e9c99ea35 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -90,7 +90,7 @@ public void blockOperations( waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor); } - protected void waitUntilBlocked(ActionListener onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { + private void waitUntilBlocked(ActionListener onAcquired, long timeout, TimeUnit timeUnit, Executor executor) { executor.execute(new AbstractRunnable() { final Releasable released = Releasables.releaseOnce(() -> releaseDelayedOperations()); @@ -126,7 +126,7 @@ protected void doRun() { }); } - protected void delayOperations() { + private void delayOperations() { if (closed) { throw new IndexShardClosedException(shardId); } From 661fa12a12b73e4e116fee69b402410ff635d2f9 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 5 Jun 2025 12:46:26 -0400 Subject: [PATCH 17/26] test --- .../java/org/elasticsearch/recovery/RelocationIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 19500dbd57f7f..3b088cf553ecd 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -249,9 +249,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { numberOfNodes ); - // Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate - // index throttling for a shard on this node, it will pause indexing for that shard until throttling - // is deactivated. + // Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless String[] nodes = new String[numberOfNodes]; logger.info("--> starting [node1] ..."); nodes[0] = internalCluster().startNode( @@ -318,13 +316,15 @@ public void testRelocationWhileIndexingRandom() throws Exception { logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); if (throttleIndexing) { // Deactivate throttling on source shard to allow indexing threads to pass + Engine engine = shard.getEngineOrNull(); + assertThat(engine, equalTo(null)); shard.deactivateThrottling(); // Activate throttling on target shard before next relocation indicesService = internalCluster().getInstance(IndicesService.class, nodes[toNode]); shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is throttled for this shard - Engine engine = shard.getEngineOrNull(); + engine = shard.getEngineOrNull(); assertThat(engine != null && engine.isThrottled(), equalTo(true)); } } From 77058ad6a413098060e8495f5a8238f3ab7d02ff Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 5 Jun 2025 16:54:30 +0000 Subject: [PATCH 18/26] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/recovery/RelocationIT.java | 2 +- .../java/org/elasticsearch/index/shard/IndexShard.java | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git 
a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 3b088cf553ecd..f69380d086104 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -181,7 +181,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { prepareCreate("test", indexSettings(1, 0)).get(); logger.info("--> index docs"); - int numDocs = between(1,10); + int numDocs = between(1, 10); for (int i = 0; i < numDocs; i++) { prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get(); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4fcafa0a4adcb..50f2f24e08362 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3843,8 +3843,12 @@ public void blockOperations( // go ahead and release the indexing permit it holds. 
suspendThrottling(); try { - indexShardOperationPermits.blockOperations(ActionListener.runAfter(onAcquired, this::resumeThrottling), - timeout, timeUnit, executor); + indexShardOperationPermits.blockOperations( + ActionListener.runAfter(onAcquired, this::resumeThrottling), + timeout, + timeUnit, + executor + ); } catch (IndexShardClosedException e) { resumeThrottling(); throw e; From 1d872c15b2548b938aa24dbbfd49f126831f2eb2 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 5 Jun 2025 13:05:45 -0400 Subject: [PATCH 19/26] test --- .../java/org/elasticsearch/recovery/RelocationIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 3b088cf553ecd..4c672299d529c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -316,15 +316,13 @@ public void testRelocationWhileIndexingRandom() throws Exception { logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); if (throttleIndexing) { // Deactivate throttling on source shard to allow indexing threads to pass - Engine engine = shard.getEngineOrNull(); - assertThat(engine, equalTo(null)); shard.deactivateThrottling(); // Activate throttling on target shard before next relocation indicesService = internalCluster().getInstance(IndicesService.class, nodes[toNode]); shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is throttled for this shard - engine = shard.getEngineOrNull(); + Engine engine = shard.getEngineOrNull(); assertThat(engine != null && engine.isThrottled(), equalTo(true)); } } From 610bc0fa5a9b8dbdb27543f4c245c7a1a01a97a7 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 5 Jun 2025 16:06:05 -0400 
Subject: [PATCH 20/26] old changes --- .../java/org/elasticsearch/recovery/RelocationIT.java | 2 +- .../org/elasticsearch/index/shard/IndexShard.java | 11 ----------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index ff056a8b252b2..13258ee6ce0d2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -203,7 +203,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); shard.activateThrottling(); // Verify that indexing is paused for the throttled shard - assertThat(shard.isIndexingPaused(), equalTo(true)); + assertThat(shard.getEngineOrNull().isIndexingPaused(), equalTo(true)); // Try to index a document into the "test" index which is currently throttled logger.info("--> Try to index a doc while indexing is paused"); IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d0e8c67bf49cc..80e23f0fa05f6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2757,17 +2757,6 @@ public void deactivateThrottling() { } } - public boolean isIndexingPaused() { - Engine engine = getEngineOrNull(); - final boolean indexingPaused; - if (engine == null) { - indexingPaused = false; - } else { - indexingPaused = engine.isIndexingPaused(); - } - return (indexingPaused); - } - public void suspendThrottling() { try { getEngine().suspendThrottling(); From 
6a2bf2b1b33d1affd0388f0c519ddbcff96301e5 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Thu, 5 Jun 2025 22:21:46 -0400 Subject: [PATCH 21/26] fix test --- .../elasticsearch/recovery/RelocationIT.java | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 19afdfaee87f5..54c2e9a5a6b79 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -253,15 +253,15 @@ public void testRelocationWhileIndexingRandom() throws Exception { String[] nodes = new String[numberOfNodes]; logger.info("--> starting [node1] ..."); nodes[0] = internalCluster().startNode( - Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean()) - ); + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, numberOfReplicas)).get(); for (int i = 2; i <= numberOfNodes; i++) { logger.info("--> starting [node{}] ...", i); - nodes[i - 1] = internalCluster().startNode(); + nodes[i - 1] = internalCluster().startNode( + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())); if (i != numberOfNodes) { ClusterHealthResponse healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) .setWaitForEvents(Priority.LANGUID) @@ -278,15 +278,6 @@ public void testRelocationWhileIndexingRandom() throws Exception { waitForDocs(numDocs, indexer); logger.info("--> {} docs indexed", numDocs); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[0]); - IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); - 
if (throttleIndexing) { - // Activate index throttling on "test" index primary shard - shard.activateThrottling(); - // Verify that indexing is throttled for this shard - Engine engine = shard.getEngineOrNull(); - assertThat(engine != null && engine.isThrottled(), equalTo(true)); - } logger.info("--> starting relocations..."); int nodeShiftBased = numberOfReplicas; // if we have replicas shift those for (int i = 0; i < numberOfRelocations; i++) { @@ -295,6 +286,18 @@ public void testRelocationWhileIndexingRandom() throws Exception { fromNode += nodeShiftBased; toNode += nodeShiftBased; numDocs = scaledRandomIntBetween(200, 1000); + + // Throttle indexing on source shard + if (throttleIndexing) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]); + IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + // Activate index throttling on "test" index primary shard + logger.info("--> activate throttling for shard on node {}...", nodes[fromNode]); + shard.activateThrottling(); + // Verify that indexing is throttled for this shard + Engine engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(true)); + } logger.debug("--> Allow indexer to index [{}] documents", numDocs); indexer.continueIndexing(numDocs); logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); @@ -314,23 +317,15 @@ public void testRelocationWhileIndexingRandom() throws Exception { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); + // Deactivate throttle on source shard if (throttleIndexing) { - // Deactivate throttling on source shard to allow indexing threads to pass + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]); + IndexShard shard = 
indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + logger.info("--> deactivate throttling for shard on node {}...", nodes[fromNode]); shard.deactivateThrottling(); - // Activate throttling on target shard before next relocation - indicesService = internalCluster().getInstance(IndicesService.class, nodes[toNode]); - shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); - shard.activateThrottling(); - // Verify that indexing is throttled for this shard - Engine engine = shard.getEngineOrNull(); - assertThat(engine != null && engine.isThrottled(), equalTo(true)); } } logger.info("--> done relocations"); - // Deactivate throttling on the primary shard to allow indexing threads to pass - if (throttleIndexing) { - shard.deactivateThrottling(); - } logger.info("--> waiting for indexing threads to stop ..."); indexer.stopAndAwaitStopped(); logger.info("--> indexing threads stopped"); From ee22887016e596ffadf9374adfb0aa0072e94058 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 6 Jun 2025 02:29:53 +0000 Subject: [PATCH 22/26] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/recovery/RelocationIT.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index 54c2e9a5a6b79..47a0beff5e11a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -253,7 +253,8 @@ public void testRelocationWhileIndexingRandom() throws Exception { String[] nodes = new String[numberOfNodes]; logger.info("--> starting [node1] ..."); nodes[0] = internalCluster().startNode( - Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())); + 
Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean()) + ); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, numberOfReplicas)).get(); @@ -261,7 +262,8 @@ public void testRelocationWhileIndexingRandom() throws Exception { for (int i = 2; i <= numberOfNodes; i++) { logger.info("--> starting [node{}] ...", i); nodes[i - 1] = internalCluster().startNode( - Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())); + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean()) + ); if (i != numberOfNodes) { ClusterHealthResponse healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) .setWaitForEvents(Priority.LANGUID) From c9438b47c45b60cdf05ad58af3c84f58a231fe95 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Fri, 11 Jul 2025 17:09:47 -0400 Subject: [PATCH 23/26] fix test + throttle only for primary --- .../elasticsearch/recovery/RelocationIT.java | 35 ++++++++++++------- .../elasticsearch/index/shard/IndexShard.java | 1 + 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index f24eb190b2ce6..f54445aa01bc5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -77,6 +77,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -206,6 +207,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { // Verify that indexing is paused for the throttled shard Engine engine = 
shard.getEngineOrNull(); assertThat(engine != null && engine.isThrottled(), equalTo(true)); + // Try to index a document into the "test" index which is currently throttled logger.info("--> Try to index a doc while indexing is paused"); IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); @@ -229,6 +231,12 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception { .get(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + logger.info("--> verifying shard primary has relocated ..."); + indicesService = internalCluster().getInstance(IndicesService.class, node_2); + shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + assertThat(shard.routingEntry().primary(), equalTo(true)); + engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(false)); logger.info("--> verifying count after relocation ..."); future.actionGet(); indicesAdmin().prepareRefresh().get(); @@ -258,6 +266,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, numberOfReplicas)).get(); + // Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless for (int i = 2; i <= numberOfNodes; i++) { logger.info("--> starting [node{}] ...", i); nodes[i - 1] = internalCluster().startNode( @@ -280,15 +289,21 @@ public void testRelocationWhileIndexingRandom() throws Exception { logger.info("--> {} docs indexed", numDocs); logger.info("--> starting relocations..."); - int nodeShiftBased = numberOfReplicas; // if we have replicas shift those + + // When we have a replica, the primary is on node 0 and replica is on node 1. We cannot move primary + // to a node containing the replica, so relocation of primary needs to happen between node 0 and 2. 
+ // When there is no replica, we only have 2 nodes and primary relocates back and forth between node 0 and 1. for (int i = 0; i < numberOfRelocations; i++) { int fromNode = (i % 2); int toNode = fromNode == 0 ? 1 : 0; - fromNode += nodeShiftBased; - toNode += nodeShiftBased; + if (numberOfReplicas == 1) { + fromNode = fromNode == 1 ? 2 : 0; + toNode = toNode == 1 ? 2 : 0; + } + numDocs = scaledRandomIntBetween(200, 1000); - // Throttle indexing on source shard + // Throttle indexing on primary shard if (throttleIndexing) { IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]); IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); @@ -303,8 +318,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { indexer.continueIndexing(numDocs); logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); - updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", nodes[toNode]), "test"); - ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test"); + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode])); if (rarely()) { logger.debug("--> flushing"); @@ -314,18 +328,13 @@ public void testRelocationWhileIndexingRandom() throws Exception { .setWaitForEvents(Priority.LANGUID) .setWaitForNoRelocatingShards(true) .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .setWaitForGreenStatus() .get(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); - // Deactivate throttle on source shard - if (throttleIndexing) { - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]); - IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); - logger.info("--> deactivate throttling for shard on node {}...", nodes[fromNode]); - 
shard.deactivateThrottling(); - } } + logger.info("--> done relocations"); logger.info("--> waiting for indexing threads to stop ..."); indexer.stopAndAwaitStopped(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 44e9442c30bdc..7823cd04ecaf0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2754,6 +2754,7 @@ public IndexEventListener getIndexEventListener() { * setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread. */ public void activateThrottling() { + assert shardRouting.primary(): "only primaries can be throttled: " + shardRouting; try { getEngine().activateThrottling(); } catch (AlreadyClosedException ex) { From 31fe364e58cbc4cf31c0bd2211b71248e0d5e47e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 11 Jul 2025 21:19:59 +0000 Subject: [PATCH 24/26] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/recovery/RelocationIT.java | 1 - .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index f54445aa01bc5..df50d7fb0603e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -77,7 +77,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git 
a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7823cd04ecaf0..66564b332f4bb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2754,7 +2754,7 @@ public IndexEventListener getIndexEventListener() { * setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread. */ public void activateThrottling() { - assert shardRouting.primary(): "only primaries can be throttled: " + shardRouting; + assert shardRouting.primary() : "only primaries can be throttled: " + shardRouting; try { getEngine().activateThrottling(); } catch (AlreadyClosedException ex) { From 2ee8dfb8c54093ab570e033627fe1856b929b4c1 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 30 Jul 2025 14:39:51 -0400 Subject: [PATCH 25/26] relax assert that throttled shard is primary --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 3 +++ .../main/java/org/elasticsearch/index/shard/IndexShard.java | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 777dea6425780..a244ad4f17da6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -147,6 +147,9 @@ public abstract class Engine implements Closeable { protected final ReentrantLock failEngineLock = new ReentrantLock(); protected final SetOnce failedEngine = new SetOnce<>(); protected final boolean enableRecoverySource; + // This should only be enabled in serverless. 
In stateful clusters, where we have + // indexing replicas, if pause throttling gets enabled on replicas, it will indirectly + // pause the primary as well which might prevent us from relocating the primary shard. protected final boolean pauseIndexingOnThrottle; private final AtomicBoolean isClosing = new AtomicBoolean(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8f59d4991bff6..19af2616dbb7b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2755,7 +2755,6 @@ public IndexEventListener getIndexEventListener() { * setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread. */ public void activateThrottling() { - assert shardRouting.primary() : "only primaries can be throttled: " + shardRouting; try { getEngine().activateThrottling(); } catch (AlreadyClosedException ex) { From f492df17ebea78221927733d614c24f9a84ffe86 Mon Sep 17 00:00:00 2001 From: Ankita Kumar Date: Wed, 30 Jul 2025 14:45:52 -0400 Subject: [PATCH 26/26] add comment --- .../org/elasticsearch/indices/IndexingMemoryController.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index 0f9b724d965bf..d5213f30dc63c 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -90,7 +90,10 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos /* Currently, indexing is throttled due to memory pressure in stateful/stateless or disk pressure in stateless. * This limits the number of indexing threads to 1 per shard. 
However, this might not be enough when the number of * shards that need indexing is larger than the number of threads. So we might opt to pause indexing completely. - * The default value for this setting is false, but it will be set to true in stateless. + * The default value for this setting is false, but it can be set to true in stateless. + * Note that this should only be enabled in stateless. In stateful clusters, where we have + * indexing replicas, if pause throttling gets enabled on replicas, it will indirectly + * pause the primary as well which might prevent us from relocating the primary shard. */ public static final Setting PAUSE_INDEXING_ON_THROTTLE = Setting.boolSetting( "indices.pause.on.throttle",