Skip to content

Commit dfe639f

Browse files
committed
pause indexing and race condition diags
1 parent 9de75e6 commit dfe639f

File tree

7 files changed

+110
-15
lines changed

7 files changed

+110
-15
lines changed

muted-tests.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -444,9 +444,9 @@ tests:
444444
- class: org.elasticsearch.xpack.esql.qa.mixed.EsqlClientYamlIT
445445
method: test {p0=esql/120_profile/avg 8.14 or after}
446446
issue: https://github.com/elastic/elasticsearch/issues/127879
447-
- class: org.elasticsearch.indices.stats.IndexStatsIT
448-
method: testThrottleStats
449-
issue: https://github.com/elastic/elasticsearch/issues/126359
447+
#- class: org.elasticsearch.indices.stats.IndexStatsIT
448+
# method: testThrottleStats
449+
# issue: https://github.com/elastic/elasticsearch/issues/126359
450450
- class: org.elasticsearch.search.vectors.IVFKnnFloatVectorQueryTests
451451
method: testRandomWithFilter
452452
issue: https://github.com/elastic/elasticsearch/issues/127963

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,17 @@ long getThrottleTimeInMillis() {
504504
return throttleTimeMillisMetric.count() + TimeValue.nsecToMSec(currentThrottleNS);
505505
}
506506

507+
void pauseThrottle() {
508+
if (isThrottled()) {
509+
throttlingLock.pauseThrottle();
510+
}
511+
// TODO: calculate pause time
512+
}
513+
514+
void unpauseThrottle() {
515+
throttlingLock.unPauseThrottle();
516+
}
517+
507518
boolean isThrottled() {
508519
return lock != NOOP_LOCK;
509520
}
@@ -577,9 +588,13 @@ public Condition newCondition() {
577588
protected static final class PauseLock implements Lock {
578589
// Starts with every permit available; throttle() drains all but allowThreads of them.
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
// Number of permits left available while throttling is engaged.
private final int allowThreads;
// If the lock is currently throttled, we might need to pause throttling
// to let some threads pass when another task is trying to acquire indexing permits.
// The reference itself never changes, so it is final rather than volatile;
// AtomicBoolean already provides the cross-thread visibility of the flag.
private final AtomicBoolean throttlePaused = new AtomicBoolean(false);

/**
 * Creates a lock that is initially un-throttled and not paused.
 *
 * @param allowThreads number of permits that remain available once {@code throttle()} has been called
 */
public PauseLock(int allowThreads) {
    this.allowThreads = allowThreads;
}
584599

585600
public void lock() {
@@ -612,14 +627,34 @@ public Condition newCondition() {
612627
}
613628

614629
public void throttle() {
615-
assert semaphore.availablePermits() == Integer.MAX_VALUE;
630+
if(throttlePaused.get()) {
631+
return;
632+
}
633+
assert semaphore.availablePermits() == Integer.MAX_VALUE : "Available permits should be " + Integer.MAX_VALUE
634+
+ " but is " + semaphore.availablePermits();
616635
semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads);
617636
}
618637

619638
public void unthrottle() {
620-
assert semaphore.availablePermits() <= allowThreads;
639+
unPauseThrottle();
640+
assert semaphore.availablePermits() <= allowThreads : "Available permits should be no greater than " + allowThreads
641+
+ " but is " + semaphore.availablePermits();
621642
semaphore.release(Integer.MAX_VALUE - allowThreads);
622643
}
644+
645+
// Pause throttling by allowing 1 thread to pass
646+
public void pauseThrottle() {
647+
// we only need to pause throttling if throttling allows no threads to pass i.e. allowThreads is 0
648+
if((allowThreads == 0) && throttlePaused.getAndSet(true) == false) {
649+
semaphore.release(1);
650+
}
651+
}
652+
653+
public void unPauseThrottle() {
654+
if(throttlePaused.getAndSet(false)) {
655+
semaphore.acquireUninterruptibly(1);
656+
}
657+
}
623658
}
624659

625660
/**
@@ -2259,6 +2294,18 @@ public interface Warmer {
22592294
*/
22602295
public abstract void deactivateThrottling();
22612296

2297+
/**
2298+
* If indexing is throttled to the point where it is paused completely,
2299+
* another task trying to get indexing permits might want to pause throttling
2300+
* by letting one thread pass at a time so that it does not get starved.
2301+
*/
2302+
public abstract void pauseThrottling();
2303+
2304+
/**
2305+
* Reverses a previous {@link #pauseThrottling} call.
2306+
*/
2307+
public abstract void unPauseThrottling();
2308+
22622309
/**
22632310
* This method replays translog to restore the Lucene index which might be reverted previously.
22642311
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2827,6 +2827,7 @@ public void activateThrottling() {
28272827
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
28282828
if (count == 1) {
28292829
throttle.activate();
2830+
logger.info("Activated throttling");
28302831
}
28312832
}
28322833

@@ -2836,9 +2837,20 @@ public void deactivateThrottling() {
28362837
assert count >= 0 : "invalid post-decrement throttleRequestCount=" + count;
28372838
if (count == 0) {
28382839
throttle.deactivate();
2840+
logger.info("Deactivated throttling");
28392841
}
28402842
}
28412843

2844+
@Override
2845+
public void pauseThrottling() {
2846+
throttle.pauseThrottle();
2847+
}
2848+
2849+
@Override
2850+
public void unPauseThrottling() {
2851+
throttle.unpauseThrottle();
2852+
}
2853+
28422854
@Override
28432855
public boolean isThrottled() {
28442856
return throttle.isThrottled();
@@ -2920,7 +2932,9 @@ protected synchronized void enableIndexingThrottling(int numRunningMerges, int n
29202932
numQueuedMerges,
29212933
configuredMaxMergeCount
29222934
);
2923-
InternalEngine.this.activateThrottling();
2935+
// System.out.println("Activate throttling enableIndexingThrottling");
2936+
InternalEngine.this.activateThrottling();
2937+
29242938
}
29252939

29262940
@Override
@@ -2931,7 +2945,9 @@ protected synchronized void disableIndexingThrottling(int numRunningMerges, int
29312945
numQueuedMerges,
29322946
configuredMaxMergeCount
29332947
);
2948+
// System.out.println("DeActivate throttling disableIndexingThrottling");
29342949
InternalEngine.this.deactivateThrottling();
2950+
29352951
}
29362952

29372953
@Override
@@ -2956,21 +2972,28 @@ private final class EngineConcurrentMergeScheduler extends ElasticsearchConcurre
29562972
@Override
29572973
public synchronized void beforeMerge(OnGoingMerge merge) {
29582974
int maxNumMerges = getMaxMergeCount();
2959-
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
2960-
if (isThrottling.getAndSet(true) == false) {
2961-
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
2962-
activateThrottling();
2975+
synchronized (isThrottling) {
2976+
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
2977+
if (isThrottling.getAndSet(true) == false) {
2978+
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
2979+
// System.out.println("Activate throttling beforeMerge");
2980+
activateThrottling();
2981+
}
29632982
}
29642983
}
29652984
}
29662985

29672986
@Override
29682987
public synchronized void afterMerge(OnGoingMerge merge) {
29692988
int maxNumMerges = getMaxMergeCount();
2970-
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
2971-
if (isThrottling.getAndSet(false)) {
2972-
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
2973-
deactivateThrottling();
2989+
synchronized (isThrottling) {
2990+
if (numMergesInFlight.decrementAndGet() <= maxNumMerges) {
2991+
if (isThrottling.getAndSet(false)) {
2992+
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
2993+
// System.out.println("DeActivate throttling afterMerge");
2994+
deactivateThrottling();
2995+
}
2996+
29742997
}
29752998
}
29762999
maybeFlushAfterMerge(merge);

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,12 @@ public void activateThrottling() {}
507507
@Override
508508
public void deactivateThrottling() {}
509509

510+
@Override
511+
public void pauseThrottling() {}
512+
513+
@Override
514+
public void unPauseThrottling() {}
515+
510516
@Override
511517
public void trimUnreferencedTranslogFiles() {}
512518

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,14 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg
189189
);
190190
}
191191

192-
private void checkMergeTaskThrottling() {
192+
private synchronized void checkMergeTaskThrottling() {
193193
long submittedMergesCount = submittedMergeTaskCount.get();
194194
long doneMergesCount = doneMergeTaskCount.get();
195195
int runningMergesCount = runningMergeTasks.size();
196196
int configuredMaxMergeCount = config.getMaxMergeCount();
197197
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
198198
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
199+
199200
if (activeMerges > configuredMaxMergeCount
200201
// only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load
201202
&& threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2752,6 +2752,19 @@ public void deactivateThrottling() {
27522752
}
27532753
}
27542754

2755+
public void pauseThrottling() {
2756+
Engine engine = getEngineOrNull();
2757+
final boolean throttled;
2758+
if (engine == null) {
2759+
throttled = false;
2760+
} else {
2761+
throttled = engine.isThrottled();
2762+
}
2763+
if (throttled) {
2764+
engine.pauseThrottling();
2765+
}
2766+
}
2767+
27552768
private void handleRefreshException(Exception e) {
27562769
if (e instanceof AlreadyClosedException) {
27572770
// ignore

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,13 @@ public void blockOperations(
8585
final long timeout,
8686
final TimeUnit timeUnit,
8787
final Executor executor
88+
//@Nullable IndexShard indexShard
8889
) {
8990
delayOperations();
91+
// If indexing is paused on the shard, unpause it so that any currently waiting task can
92+
// go ahead and release the indexing permit it holds.
93+
//if(indexShard && indexShard)
94+
9095
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor);
9196
}
9297

0 commit comments

Comments
 (0)