99
1010package org .elasticsearch .index .engine ;
1111
12+ import org .apache .logging .log4j .LogManager ;
1213import org .apache .logging .log4j .Logger ;
1314import org .apache .lucene .codecs .perfield .PerFieldKnnVectorsFormat ;
1415import org .apache .lucene .index .ByteVectorValues ;
8788import org .elasticsearch .index .store .Store ;
8889import org .elasticsearch .index .translog .Translog ;
8990import org .elasticsearch .index .translog .TranslogStats ;
91+ import org .elasticsearch .indices .IndexingMemoryController ;
9092import org .elasticsearch .indices .recovery .RecoverySettings ;
9193import org .elasticsearch .search .suggest .completion .CompletionStats ;
9294import org .elasticsearch .threadpool .ThreadPool ;
108110import java .util .Set ;
109111import java .util .concurrent .CountDownLatch ;
110112import java .util .concurrent .ExecutionException ;
113+ import java .util .concurrent .Semaphore ;
111114import java .util .concurrent .TimeUnit ;
112115import java .util .concurrent .atomic .AtomicBoolean ;
113116import java .util .concurrent .locks .Condition ;
@@ -145,6 +148,7 @@ public abstract class Engine implements Closeable {
145148 protected final ReentrantLock failEngineLock = new ReentrantLock ();
146149 protected final SetOnce <Exception > failedEngine = new SetOnce <>();
147150 protected final boolean enableRecoverySource ;
151+ protected final boolean pauseIndexingOnThrottle ;
148152
149153 private final AtomicBoolean isClosing = new AtomicBoolean ();
150154 private final SubscribableListener <Void > drainOnCloseListener = new SubscribableListener <>();
@@ -176,6 +180,9 @@ protected Engine(EngineConfig engineConfig) {
176180 this .enableRecoverySource = RecoverySettings .INDICES_RECOVERY_SOURCE_ENABLED_SETTING .get (
177181 engineConfig .getIndexSettings ().getSettings ()
178182 );
183+ this .pauseIndexingOnThrottle = IndexingMemoryController .PAUSE_INDEXING_ON_THROTTLE .get (
184+ engineConfig .getIndexSettings ().getSettings ()
185+ );
179186 }
180187
181188 /**
@@ -444,12 +451,19 @@ public interface IndexCommitListener {
444451 * is enabled
445452 */
446453 protected static final class IndexThrottle {
454+ private static final Logger logger = LogManager .getLogger (IndexThrottle .class );
447455 private final CounterMetric throttleTimeMillisMetric = new CounterMetric ();
448456 private volatile long startOfThrottleNS ;
449457 private static final ReleasableLock NOOP_LOCK = new ReleasableLock (new NoOpLock ());
450- private final ReleasableLock lockReference = new ReleasableLock (new ReentrantLock ());
458+ private final PauseLock throttlingLock ;
459+ private final ReleasableLock lockReference ;
451460 private volatile ReleasableLock lock = NOOP_LOCK ;
452461
462+ public IndexThrottle (boolean pause ) {
463+ throttlingLock = new PauseLock (pause ? 0 : 1 );
464+ lockReference = new ReleasableLock (throttlingLock );
465+ }
466+
453467 public Releasable acquireThrottle () {
454468 return lock .acquire ();
455469 }
@@ -458,12 +472,15 @@ public Releasable acquireThrottle() {
458472 public void activate () {
459473 assert lock == NOOP_LOCK : "throttling activated while already active" ;
460474 startOfThrottleNS = System .nanoTime ();
475+ throttlingLock .throttle ();
461476 lock = lockReference ;
462477 }
463478
464479 /** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
465480 public void deactivate () {
466481 assert lock != NOOP_LOCK : "throttling deactivated but not active" ;
482+
483+ throttlingLock .unthrottle ();
467484 lock = NOOP_LOCK ;
468485
469486 assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS" ;
@@ -553,6 +570,58 @@ public Condition newCondition() {
553570 }
554571 }
555572
573+ /* A lock implementation that allows us to control how many threads can take the lock
574+ * In particular, this is used to set the number of allowed threads to 1 or 0
575+ * when index throttling is activated.
576+ */
577+ protected static final class PauseLock implements Lock {
578+ private final Semaphore semaphore = new Semaphore (Integer .MAX_VALUE );
579+ private final int allowThreads ;
580+
581+ public PauseLock (int allowThreads ) {
582+ this .allowThreads = allowThreads ;
583+ }
584+
585+ public void lock () {
586+ semaphore .acquireUninterruptibly ();
587+ }
588+
589+ @ Override
590+ public void lockInterruptibly () throws InterruptedException {
591+ semaphore .acquire ();
592+ }
593+
594+ @ Override
595+ public void unlock () {
596+ semaphore .release ();
597+ }
598+
599+ @ Override
600+ public boolean tryLock () {
601+ throw new UnsupportedOperationException ();
602+ }
603+
604+ @ Override
605+ public boolean tryLock (long time , TimeUnit unit ) throws InterruptedException {
606+ throw new UnsupportedOperationException ();
607+ }
608+
609+ @ Override
610+ public Condition newCondition () {
611+ throw new UnsupportedOperationException ();
612+ }
613+
614+ public void throttle () {
615+ assert semaphore .availablePermits () == Integer .MAX_VALUE ;
616+ semaphore .acquireUninterruptibly (Integer .MAX_VALUE - allowThreads );
617+ }
618+
619+ public void unthrottle () {
620+ assert semaphore .availablePermits () <= allowThreads ;
621+ semaphore .release (Integer .MAX_VALUE - allowThreads );
622+ }
623+ }
624+
556625 /**
557626 * Perform document index operation on the engine
558627 * @param index operation to perform
0 commit comments