Skip to content

Commit 2601960

Browse files
committed
commit
1 parent dfe639f commit 2601960

File tree

6 files changed

+122
-45
lines changed

6 files changed

+122
-45
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 69 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -455,33 +455,66 @@ protected static final class IndexThrottle {
455455
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
456456
private volatile long startOfThrottleNS;
457457
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
458-
private final PauseLock throttlingLock;
459-
private final ReleasableLock lockReference;
458+
// private final PauseLock throttlingLock;
459+
// private final ReleasableLock lockReference;
460+
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
461+
private final Lock pauseIndexingLock = new ReentrantLock();
462+
private final Condition pauseCondition = pauseIndexingLock.newCondition();
463+
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
464+
private volatile AtomicBoolean pauseThrottling = new AtomicBoolean();
465+
private final boolean pauseWhenThrottled;
460466
private volatile ReleasableLock lock = NOOP_LOCK;
461467

462468
public IndexThrottle(boolean pause) {
463-
throttlingLock = new PauseLock(pause ? 0 : 1);
464-
lockReference = new ReleasableLock(throttlingLock);
469+
pauseWhenThrottled = pause;
465470
}
466471

467472
public Releasable acquireThrottle() {
468-
return lock.acquire();
473+
var lockCopy = this.lock;
474+
if (lockCopy == pauseLockReference) {
475+
//try (pauseLockReference.acquire();)
476+
try (var ignored = pauseLockReference.acquire()) {
477+
// If throttling is activated and not temporarily paused
478+
while ((lock == pauseLockReference) && (pauseThrottling.getAcquire() == false)) {
479+
logger.trace("Waiting on pause indexing lock");
480+
pauseCondition.await();
481+
}
482+
} catch (InterruptedException e) {
483+
Thread.currentThread().interrupt();
484+
throw new RuntimeException(e);
485+
} finally {
486+
logger.trace("Acquired pause indexing lock");
487+
}
488+
return (() -> {});
489+
} else {
490+
return lockCopy.acquire();
491+
}
469492
}
470493

471494
/** Activate throttling, which switches the lock to be a real lock */
472495
public void activate() {
473496
assert lock == NOOP_LOCK : "throttling activated while already active";
474497
startOfThrottleNS = System.nanoTime();
475-
throttlingLock.throttle();
476-
lock = lockReference;
498+
if (pauseWhenThrottled) {
499+
lock = pauseLockReference;
500+
logger.trace("Activated index throttling pause");
501+
} else {
502+
lock = lockReference;
503+
}
477504
}
478505

479506
/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
480507
public void deactivate() {
481508
assert lock != NOOP_LOCK : "throttling deactivated but not active";
482509

483-
throttlingLock.unthrottle();
484510
lock = NOOP_LOCK;
511+
if (pauseWhenThrottled) {
512+
// Signal the threads that are waiting on pauseCondition
513+
try (Releasable releasableLock = pauseLockReference.acquire()) {
514+
pauseCondition.signalAll();
515+
}
516+
logger.trace("Deactivated index throttling pause");
517+
}
485518

486519
assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
487520
long throttleTimeNS = System.nanoTime() - startOfThrottleNS;
@@ -504,21 +537,34 @@ long getThrottleTimeInMillis() {
504537
return throttleTimeMillisMetric.count() + TimeValue.nsecToMSec(currentThrottleNS);
505538
}
506539

507-
void pauseThrottle() {
508-
if (isThrottled()) {
509-
throttlingLock.pauseThrottle();
540+
// Pause throttling to allow another task such as relocation to acquire all indexing permits
541+
public void suspendThrottle() {
542+
if (pauseWhenThrottled) {
543+
try (Releasable releasableLock = pauseLockReference.acquire()) {
544+
pauseThrottling.setRelease(true);
545+
pauseCondition.signalAll();
546+
}
510547
}
511-
// TODO: calculate pause time
512548
}
513549

514-
void unpauseThrottle() {
515-
throttlingLock.unPauseThrottle();
550+
// Reverse what was done in pauseThrottle()
551+
public void resumeThrottle() {
552+
if (pauseWhenThrottled) {
553+
try (Releasable releasableLock = pauseLockReference.acquire()) {
554+
pauseThrottling.setRelease(false);
555+
pauseCondition.signalAll();
556+
}
557+
}
516558
}
517559

518560
boolean isThrottled() {
519561
return lock != NOOP_LOCK;
520562
}
521563

564+
boolean isIndexingPaused() {
565+
return (pauseWhenThrottled && isThrottled());
566+
}
567+
522568
boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
523569
if (isThrottled()) {
524570
return lock.isHeldByCurrentThread();
@@ -627,31 +673,31 @@ public Condition newCondition() {
627673
}
628674

629675
public void throttle() {
630-
if(throttlePaused.get()) {
676+
if (throttlePaused.get()) {
631677
return;
632678
}
633-
assert semaphore.availablePermits() == Integer.MAX_VALUE : "Available permits should be " + Integer.MAX_VALUE
634-
+ " but is " + semaphore.availablePermits();
679+
assert semaphore.availablePermits() == Integer.MAX_VALUE
680+
: "Available permits should be " + Integer.MAX_VALUE + " but is " + semaphore.availablePermits();
635681
semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads);
636682
}
637683

638684
public void unthrottle() {
639685
unPauseThrottle();
640-
assert semaphore.availablePermits() <= allowThreads : "Available permits should be no greater than " + allowThreads
641-
+ " but is " + semaphore.availablePermits();
686+
assert semaphore.availablePermits() <= allowThreads
687+
: "Available permits should be no greater than " + allowThreads + " but is " + semaphore.availablePermits();
642688
semaphore.release(Integer.MAX_VALUE - allowThreads);
643689
}
644690

645691
// Pause throttling by allowing 1 thread to pass
646692
public void pauseThrottle() {
647693
// we only need to pause throttling if throttling allows no threads to pass i.e. allowThreads is 0
648-
if((allowThreads == 0) && throttlePaused.getAndSet(true) == false) {
694+
if ((allowThreads == 0) && throttlePaused.getAndSet(true) == false) {
649695
semaphore.release(1);
650696
}
651697
}
652698

653699
public void unPauseThrottle() {
654-
if(throttlePaused.getAndSet(false)) {
700+
if (throttlePaused.getAndSet(false)) {
655701
semaphore.acquireUninterruptibly(1);
656702
}
657703
}
@@ -2306,6 +2352,8 @@ public interface Warmer {
23062352
*/
23072353
public abstract void unPauseThrottling();
23082354

2355+
public abstract boolean isIndexingPaused();
2356+
23092357
/**
23102358
* This method replays translog to restore the Lucene index which might be reverted previously.
23112359
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2856,6 +2856,11 @@ public boolean isThrottled() {
28562856
return throttle.isThrottled();
28572857
}
28582858

2859+
@Override
2860+
public boolean isIndexingPaused() {
2861+
return throttle.isIndexingPaused();
2862+
}
2863+
28592864
boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
28602865
return throttle.throttleLockIsHeldByCurrentThread();
28612866
}
@@ -2933,7 +2938,7 @@ protected synchronized void enableIndexingThrottling(int numRunningMerges, int n
29332938
configuredMaxMergeCount
29342939
);
29352940
// System.out.println("Activate throttling enableIndexingThrottling");
2936-
InternalEngine.this.activateThrottling();
2941+
InternalEngine.this.activateThrottling();
29372942

29382943
}
29392944

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,11 @@ public void pauseThrottling() {}
513513
@Override
514514
public void unPauseThrottling() {}
515515

516+
@Override
517+
public boolean isIndexingPaused() {
518+
return (false);
519+
}
520+
516521
@Override
517522
public void trimUnreferencedTranslogFiles() {}
518523

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -879,8 +879,14 @@ public void onFailure(Exception e) {
879879
listener.onFailure(e);
880880
}
881881
}
882-
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
883-
// CancellableThreads and we want to be able to interrupt it
882+
},
883+
30L,
884+
TimeUnit.MINUTES,
885+
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
886+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
887+
this
888+
);
889+
884890
}
885891
}
886892

@@ -2752,16 +2758,26 @@ public void deactivateThrottling() {
27522758
}
27532759
}
27542760

2755-
public void pauseThrottling() {
2761+
public boolean pauseThrottling() {
27562762
Engine engine = getEngineOrNull();
2757-
final boolean throttled;
2763+
final boolean indexingPaused;
27582764
if (engine == null) {
2759-
throttled = false;
2765+
indexingPaused = false;
27602766
} else {
2761-
throttled = engine.isThrottled();
2767+
indexingPaused = engine.isIndexingPaused();
27622768
}
2763-
if (throttled) {
2769+
if (indexingPaused) {
27642770
engine.pauseThrottling();
2771+
return (true);
2772+
}
2773+
return (false);
2774+
}
2775+
2776+
public void unpauseThrottling() {
2777+
try {
2778+
getEngine().unPauseThrottling();
2779+
} catch (AlreadyClosedException ex) {
2780+
// ignore
27652781
}
27662782
}
27672783

@@ -3824,7 +3840,7 @@ private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, l
38243840
onPermitAcquired.onFailure(e);
38253841
});
38263842
try {
3827-
indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic());
3843+
indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic(), this);
38283844
} catch (Exception e) {
38293845
forceRefreshes.close();
38303846
throw e;

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,17 @@ public void blockOperations(
8484
final ActionListener<Releasable> onAcquired,
8585
final long timeout,
8686
final TimeUnit timeUnit,
87-
final Executor executor
88-
//@Nullable IndexShard indexShard
87+
final Executor executor,
88+
@Nullable IndexShard indexShard
8989
) {
9090
delayOperations();
9191
// If indexing is paused on the shard, unpause it so that any currently waiting task can
9292
// go ahead and release the indexing permit it holds.
93-
//if(indexShard && indexShard)
94-
93+
boolean throttlingPaused = indexShard.pauseThrottling();
9594
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor);
95+
if (throttlingPaused) {
96+
indexShard.unpauseThrottling();
97+
}
9698
}
9799

98100
private void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) {

server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ public void testBlockIfClosed() {
193193
wrap(() -> { throw new IllegalArgumentException("fake error"); }),
194194
randomInt(10),
195195
TimeUnit.MINUTES,
196-
threadPool.generic()
196+
threadPool.generic(),
197+
null
197198
)
198199
);
199200
}
@@ -219,7 +220,7 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce
219220
blocked.set(true);
220221
blockAcquired.countDown();
221222
releaseBlock.await();
222-
}), 30, TimeUnit.MINUTES, threadPool.generic());
223+
}), 30, TimeUnit.MINUTES, threadPool.generic(), null);
223224
assertFalse(blocked.get());
224225
assertFalse(future.isDone());
225226
}
@@ -313,7 +314,7 @@ public void onFailure(Exception e) {
313314
throw new RuntimeException(e);
314315
}
315316
}
316-
}, blockReleased::countDown), 1, TimeUnit.MINUTES, threadPool.generic());
317+
}, blockReleased::countDown), 1, TimeUnit.MINUTES, threadPool.generic(), null);
317318
blockAcquired.await();
318319
return () -> {
319320
releaseBlock.countDown();
@@ -333,7 +334,7 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx
333334
blocked.set(true);
334335
blockAcquired.countDown();
335336
releaseBlock.await();
336-
}), 30, TimeUnit.MINUTES, threadPool.generic());
337+
}), 30, TimeUnit.MINUTES, threadPool.generic(), null);
337338
blockAcquired.await();
338339
assertTrue(blocked.get());
339340

@@ -381,7 +382,7 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE
381382
permits.blockOperations(wrap(() -> {
382383
onBlocked.set(true);
383384
blockedLatch.countDown();
384-
}), 30, TimeUnit.MINUTES, threadPool.generic());
385+
}), 30, TimeUnit.MINUTES, threadPool.generic(), null);
385386
assertFalse(onBlocked.get());
386387

387388
// if we submit another operation, it should be delayed
@@ -463,7 +464,7 @@ public void onFailure(Exception e) {
463464
permits.blockOperations(wrap(() -> {
464465
values.add(operations);
465466
operationLatch.countDown();
466-
}), 30, TimeUnit.MINUTES, threadPool.generic());
467+
}), 30, TimeUnit.MINUTES, threadPool.generic(), null);
467468
});
468469
blockingThread.start();
469470

@@ -540,7 +541,7 @@ public void testAsyncBlockOperationsOnRejection() {
540541
final var rejectingExecutor = threadPool.executor(REJECTING_EXECUTOR);
541542
rejectingExecutor.execute(threadBlock::actionGet);
542543
assertThat(
543-
safeAwaitFailure(Releasable.class, l -> permits.blockOperations(l, 1, TimeUnit.HOURS, rejectingExecutor)),
544+
safeAwaitFailure(Releasable.class, l -> permits.blockOperations(l, 1, TimeUnit.HOURS, rejectingExecutor, null)),
544545
instanceOf(EsRejectedExecutionException.class)
545546
);
546547

@@ -553,7 +554,7 @@ public void testAsyncBlockOperationsOnRejection() {
553554
}
554555

555556
// ensure that another block can still be acquired
556-
try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic()))) {
557+
try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic(), null))) {
557558
assertNotNull(block);
558559
}
559560
}
@@ -568,7 +569,7 @@ public void testAsyncBlockOperationsOnTimeout() {
568569
safeAwaitFailure(
569570
ElasticsearchTimeoutException.class,
570571
Releasable.class,
571-
f -> permits.blockOperations(f, 0, TimeUnit.SECONDS, threadPool.generic())
572+
f -> permits.blockOperations(f, 0, TimeUnit.SECONDS, threadPool.generic(), null)
572573
).getMessage()
573574
);
574575

@@ -582,7 +583,7 @@ public void testAsyncBlockOperationsOnTimeout() {
582583
}
583584

584585
// ensure that another block can still be acquired
585-
try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic()))) {
586+
try (Releasable block = safeAwait(l -> permits.blockOperations(l, 1, TimeUnit.HOURS, threadPool.generic(), null))) {
586587
assertNotNull(block);
587588
}
588589
}
@@ -613,7 +614,7 @@ public void onFailure(final Exception e) {
613614
reference.set(e);
614615
onFailureLatch.countDown();
615616
}
616-
}, 1, TimeUnit.MILLISECONDS, threadPool.generic());
617+
}, 1, TimeUnit.MILLISECONDS, threadPool.generic(), null);
617618
onFailureLatch.await();
618619
assertThat(reference.get(), hasToString(containsString("timeout while blocking operations")));
619620

0 commit comments

Comments
 (0)