Skip to content

Commit e8e48f9

Browse files
committed
change lock implementation
1 parent 8e2165c commit e8e48f9

File tree

2 files changed

+65
-51
lines changed

2 files changed

+65
-51
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 63 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import java.util.Set;
111111
import java.util.concurrent.CountDownLatch;
112112
import java.util.concurrent.ExecutionException;
113+
import java.util.concurrent.Semaphore;
113114
import java.util.concurrent.TimeUnit;
114115
import java.util.concurrent.atomic.AtomicBoolean;
115116
import java.util.concurrent.locks.Condition;
@@ -454,67 +455,32 @@ protected static final class IndexThrottle {
454455
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
455456
private volatile long startOfThrottleNS;
456457
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
457-
private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
458-
private final Lock pauseIndexingLock = new ReentrantLock();
459-
private final Condition pauseCondition = pauseIndexingLock.newCondition();
460-
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
461-
private volatile AtomicBoolean pauseIndexing = new AtomicBoolean();
458+
private final PauseLock throttlingLock;
459+
private final ReleasableLock lockReference;
462460
private volatile ReleasableLock lock = NOOP_LOCK;
463461

462+
public IndexThrottle(boolean pause) {
463+
throttlingLock = new PauseLock(pause ? 0 : 1);
464+
lockReference = new ReleasableLock(throttlingLock);
465+
}
466+
464467
public Releasable acquireThrottle() {
465-
if (lock == pauseLockReference) {
466-
pauseLockReference.acquire();
467-
try {
468-
while (pauseIndexing.getAcquire()) {
469-
logger.trace("Waiting on pause indexing lock" +
470-
"");
471-
pauseCondition.await();
472-
}
473-
} catch (InterruptedException e) {
474-
Thread.currentThread().interrupt();
475-
throw new RuntimeException(e);
476-
} finally {
477-
// System.out.println("Acquired pause indexing lock");
478-
logger.trace("Acquired pause indexing lock");
479-
}
480-
return pauseLockReference;
481-
} else {
482-
return lock.acquire();
483-
}
468+
return lock.acquire();
484469
}
485470

486471
/** Activate throttling, which switches the lock to be a real lock */
487472
public void activate() {
488473
assert lock == NOOP_LOCK : "throttling activated while already active";
489474
startOfThrottleNS = System.nanoTime();
475+
throttlingLock.throttle();
490476
lock = lockReference;
491477
}
492478

493-
public void activatePause() {
494-
assert lock == NOOP_LOCK : "throttling activated while already active";
495-
startOfThrottleNS = System.nanoTime();
496-
// System.out.println("Activate pause");
497-
pauseIndexing.setRelease(true);
498-
lock = pauseLockReference;
499-
}
500-
501479
/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
502480
public void deactivate() {
503481
assert lock != NOOP_LOCK : "throttling deactivated but not active";
504482

505-
if (lock == pauseLockReference) {
506-
logger.trace("Deactivate index throttling pause");
507-
508-
// Signal the threads that are waiting on pauseCondition
509-
pauseLockReference.acquire();
510-
try {
511-
// System.out.println("Deactivate pause");
512-
pauseIndexing.setRelease(false);
513-
pauseCondition.signalAll();
514-
} finally {
515-
pauseLockReference.close();
516-
}
517-
}
483+
throttlingLock.unthrottle();
518484
lock = NOOP_LOCK;
519485

520486
assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
@@ -604,6 +570,58 @@ public Condition newCondition() {
604570
}
605571
}
606572

573+
/* A lock implementation that allows us to control how many threads can take the lock
574+
* In particular, this is used to set the number of allowed threads to 1 or 0
575+
* when index throttling is activated.
576+
*/
577+
protected static final class PauseLock implements Lock {
578+
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
579+
private final int allowThreads;
580+
581+
public PauseLock(int allowThreads) {
582+
this.allowThreads = allowThreads;
583+
}
584+
585+
public void lock() {
586+
semaphore.acquireUninterruptibly();
587+
}
588+
589+
@Override
590+
public void lockInterruptibly() throws InterruptedException {
591+
semaphore.acquire();
592+
}
593+
594+
@Override
595+
public void unlock() {
596+
semaphore.release();
597+
}
598+
599+
@Override
600+
public boolean tryLock() {
601+
throw new UnsupportedOperationException();
602+
}
603+
604+
@Override
605+
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
606+
throw new UnsupportedOperationException();
607+
}
608+
609+
@Override
610+
public Condition newCondition() {
611+
throw new UnsupportedOperationException();
612+
}
613+
614+
public void throttle() {
615+
assert semaphore.availablePermits() == Integer.MAX_VALUE;
616+
semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads);
617+
}
618+
619+
public void unthrottle() {
620+
assert semaphore.availablePermits() <= allowThreads;
621+
semaphore.release(Integer.MAX_VALUE - allowThreads);
622+
}
623+
}
624+
607625
/**
608626
* Perform document index operation on the engine
609627
* @param index operation to perform

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public InternalEngine(EngineConfig engineConfig) {
260260
engineConfig.getThreadPoolMergeExecutorService()
261261
);
262262
scheduler = mergeScheduler.getMergeScheduler();
263-
throttle = new IndexThrottle();
263+
throttle = new IndexThrottle(pauseIndexingOnThrottle);
264264
try {
265265
store.trimUnsafeCommits(config().getTranslogConfig().getTranslogPath());
266266
translog = openTranslog(
@@ -2826,11 +2826,7 @@ public void activateThrottling() {
28262826
int count = throttleRequestCount.incrementAndGet();
28272827
assert count >= 1 : "invalid post-increment throttleRequestCount=" + count;
28282828
if (count == 1) {
2829-
if (pauseIndexingOnThrottle) {
2830-
throttle.activatePause();
2831-
} else {
2832-
throttle.activate();
2833-
}
2829+
throttle.activate();
28342830
}
28352831
}
28362832

0 commit comments

Comments
 (0)