Skip to content

Commit 8fb0811

Browse files
committed
commit es changes
1 parent 8cb4493 commit 8fb0811

File tree

6 files changed

+103
-14
lines changed

6 files changed

+103
-14
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,22 +438,33 @@ protected static final class IndexThrottle {
438438
private volatile long startOfThrottleNS;
439439
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
440440
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
441+
private final ReleasableLock pauseLockReference = new ReleasableLock(new PauseLock());
441442
private volatile ReleasableLock lock = NOOP_LOCK;
442443

443444
public Releasable acquireThrottle() {
444445
return lock.acquire();
445446
}
446447

447448
/** Activate throttling, which switches the lock to be a real lock */
448-
public void activate() {
449+
public void activate(AtomicBoolean pauseIndexing) {
449450
assert lock == NOOP_LOCK : "throttling activated while already active";
450451
startOfThrottleNS = System.nanoTime();
451-
lock = lockReference;
452+
if (pauseIndexing.getAcquire()) {
453+
lock = pauseLockReference;
454+
}
455+
else {
456+
lock = lockReference;
457+
}
452458
}
453459

454460
/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
455461
public void deactivate() {
456462
assert lock != NOOP_LOCK : "throttling deactivated but not active";
463+
464+
if(lock == pauseLockReference)
465+
{
466+
lock.close();
467+
}
457468
lock = NOOP_LOCK;
458469

459470
assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
@@ -543,6 +554,67 @@ public Condition newCondition() {
543554
}
544555
}
545556

557+
/** A Lock implementation that forces the lock to wait on a condition */
558+
protected static final class PauseLock implements Lock {
559+
560+
private final ReentrantLock internalLock = new ReentrantLock();
561+
private final Condition condition = internalLock.newCondition();
562+
private volatile boolean paused = true;
563+
564+
@Override
565+
public void lock() {
566+
internalLock.lock();
567+
try {
568+
while (paused) {
569+
condition.await();
570+
}
571+
} catch (InterruptedException e) {
572+
Thread.currentThread().interrupt();
573+
throw new RuntimeException(e);
574+
} finally {
575+
internalLock.unlock();
576+
}
577+
}
578+
579+
@Override
580+
public void lockInterruptibly() throws InterruptedException {
581+
internalLock.lockInterruptibly();
582+
try {
583+
while (paused) {
584+
condition.await();
585+
}
586+
} finally {
587+
internalLock.unlock();
588+
}
589+
}
590+
591+
@Override
592+
public boolean tryLock() {
593+
return false;
594+
}
595+
596+
@Override
597+
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
598+
return false;
599+
}
600+
601+
@Override
602+
public void unlock() {
603+
internalLock.lock();
604+
try {
605+
paused = false;
606+
condition.signalAll();
607+
} finally {
608+
internalLock.unlock();
609+
}
610+
}
611+
612+
@Override
613+
public Condition newCondition() {
614+
return condition;
615+
}
616+
}
617+
546618
/**
547619
* Perform document index operation on the engine
548620
* @param index operation to perform
@@ -2173,7 +2245,7 @@ public interface Warmer {
21732245
* Request that this engine throttle incoming indexing requests to one thread.
21742246
* Must be matched by a later call to {@link #deactivateThrottling()}.
21752247
*/
2176-
public abstract void activateThrottling();
2248+
public abstract void activateThrottling(boolean pauseIndexing);
21772249

21782250
/**
21792251
* Reverses a previous {@link #activateThrottling} call.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,12 @@ public class InternalEngine extends Engine {
171171
private final CombinedDeletionPolicy combinedDeletionPolicy;
172172

173173
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
174-
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
174+
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle
175175
// incoming indexing ops to a single thread:
176176
private final AtomicInteger throttleRequestCount = new AtomicInteger();
177+
// Should we throttle indexing ops by pausing them completely. Default value is false which will throttle
178+
// incoming indexing ops to a single thread.
179+
private final AtomicBoolean throttleShouldPauseIndexing = new AtomicBoolean(false);
177180
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
178181
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
179182
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
@@ -2822,11 +2825,15 @@ public void accept(ElasticsearchDirectoryReader reader, ElasticsearchDirectoryRe
28222825
}
28232826

28242827
@Override
2825-
public void activateThrottling() {
2828+
public void activateThrottling(boolean pauseIndexing) {
28262829
int count = throttleRequestCount.incrementAndGet();
2830+
if (pauseIndexing)
2831+
{
2832+
throttleShouldPauseIndexing.setRelease(true);
2833+
}
28272834
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
28282835
if (count == 1) {
2829-
throttle.activate();
2836+
throttle.activate(throttleShouldPauseIndexing);
28302837
}
28312838
}
28322839

@@ -2920,7 +2927,7 @@ protected synchronized void enableIndexingThrottling(int numRunningMerges, int n
29202927
numQueuedMerges,
29212928
configuredMaxMergeCount
29222929
);
2923-
InternalEngine.this.activateThrottling();
2930+
InternalEngine.this.activateThrottling(false);
29242931
}
29252932

29262933
@Override
@@ -2959,7 +2966,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {
29592966
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
29602967
if (isThrottling.getAndSet(true) == false) {
29612968
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
2962-
activateThrottling();
2969+
activateThrottling(false);
29632970
}
29642971
}
29652972
}

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ public SafeCommitInfo getSafeCommitInfo() {
502502
}
503503

504504
@Override
505-
public void activateThrottling() {}
505+
public void activateThrottling(boolean pauseIndexing) {}
506506

507507
@Override
508508
public void deactivateThrottling() {}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2730,14 +2730,23 @@ public IndexEventListener getIndexEventListener() {
27302730
return indexEventListener;
27312731
}
27322732

2733-
public void activateThrottling() {
2733+
/** Asks this shard to throttle indexing to one thread. If pauseIndexing is set to true, throttling will
2734+
* pause indexing completely.
2735+
*
2736+
* @param pauseIndexing pauses indexing completely when set to true, otherwise throttles to one thread.
2737+
*/
2738+
public void activateThrottling(boolean pauseIndexing) {
27342739
try {
2735-
getEngine().activateThrottling();
2740+
getEngine().activateThrottling(pauseIndexing);
27362741
} catch (AlreadyClosedException ex) {
27372742
// ignore
27382743
}
27392744
}
27402745

2746+
public void pauseIndexing() {
2747+
2748+
}
2749+
27412750
public void deactivateThrottling() {
27422751
try {
27432752
getEngine().deactivateThrottling();

server/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,10 @@ void forceCheck() {
232232
statusChecker.run();
233233
}
234234

235-
/** Asks this shard to throttle indexing to one thread */
235+
/** Asks this shard to throttle indexing to one thread. If pauseIndexing is set to true, throttling will
236+
* pause indexing completely. */
236237
protected void activateThrottling(IndexShard shard) {
237-
shard.activateThrottling();
238+
shard.activateThrottling(false);
238239
}
239240

240241
/** Asks this shard to stop throttling indexing to one thread */

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7007,7 +7007,7 @@ public void testIndexThrottling() throws Exception {
70077007
throw new RuntimeException(ex);
70087008
}
70097009
}).when(indexWithoutThrottlingCheck).startTime();
7010-
engine.activateThrottling();
7010+
engine.activateThrottling(false);
70117011
engine.index(indexWithThrottlingCheck);
70127012
engine.deactivateThrottling();
70137013
engine.index(indexWithoutThrottlingCheck);

0 commit comments

Comments
 (0)