178178import java .util .concurrent .atomic .AtomicInteger ;
179179import java .util .concurrent .atomic .AtomicLong ;
180180import java .util .concurrent .atomic .AtomicReference ;
181+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
181182import java .util .function .BiConsumer ;
182183import java .util .function .Consumer ;
183184import java .util .function .Function ;
@@ -240,8 +241,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240241 // ensure happens-before relation between addRefreshListener() and postRecovery()
241242 private volatile SubscribableListener <Void > postRecoveryComplete ;
242243 private volatile long pendingPrimaryTerm ; // see JavaDocs for getPendingPrimaryTerm
243- private final Object engineMutex = new Object (); // lock ordering: engineMutex -> mutex
244- private final AtomicReference <Engine > currentEngineReference = new AtomicReference <>();
244+
245+ private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock (); // lock ordering: engineLock.writeLock -> mutex
246+ private Engine currentEngine = null ;
245247 final EngineFactory engineFactory ;
246248
247249 private final IndexingOperationListener indexingOperationListeners ;
@@ -1613,18 +1615,14 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131615 Engine .IndexCommitRef indexCommit = null ;
16141616 store .incRef ();
16151617 try {
1616- synchronized (engineMutex ) {
1617- // if the engine is not running, we can access the store directly, but we need to make sure no one starts
1618- // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1619- final Engine engine = getEngineOrNull ();
1620- if (engine != null ) {
1621- indexCommit = engine .acquireLastIndexCommit (false );
1622- }
1623- if (indexCommit == null ) {
1624- return store .getMetadata (null , true );
1625- }
1618+ // if the engine is not running, we can access the store directly, but we need to make sure no one starts
1619+ // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1620+ indexCommit = withEngineOrNull (engine -> engine != null ? engine .acquireLastIndexCommit (false ) : null );
1621+ if (indexCommit != null ) {
1622+ return store .getMetadata (indexCommit .getIndexCommit ());
1623+ } else {
1624+ return store .getMetadata (null , true );
16261625 }
1627- return store .getMetadata (indexCommit .getIndexCommit ());
16281626 } finally {
16291627 store .decRef ();
16301628 IOUtils .close (indexCommit );
@@ -1776,14 +1774,15 @@ public CacheHelper getReaderCacheHelper() {
17761774 }
17771775
17781776 public void close (String reason , boolean flushEngine , Executor closeExecutor , ActionListener <Void > closeListener ) throws IOException {
1779- synchronized (engineMutex ) {
1777+ engineLock .writeLock ().lock ();
1778+ try {
1779+ synchronized (mutex ) {
1780+ changeState (IndexShardState .CLOSED , reason );
1781+ }
1782+ checkAndCallWaitForEngineOrClosedShardListeners ();
1783+ } finally {
17801784 try {
1781- synchronized (mutex ) {
1782- changeState (IndexShardState .CLOSED , reason );
1783- }
1784- checkAndCallWaitForEngineOrClosedShardListeners ();
1785- } finally {
1786- final Engine engine = this .currentEngineReference .getAndSet (null );
1785+ final Engine engine = getAndSetCurrentEngine (null );
17871786 closeExecutor .execute (ActionRunnable .run (closeListener , new CheckedRunnable <>() {
17881787 @ Override
17891788 public void run () throws Exception {
@@ -1809,6 +1808,8 @@ public String toString() {
18091808 return "IndexShard#close[" + shardId + "]" ;
18101809 }
18111810 }));
1811+ } finally {
1812+ engineLock .writeLock ().unlock ();
18121813 }
18131814 }
18141815 }
@@ -1857,7 +1858,7 @@ public void prepareForIndexRecovery() {
18571858 throw new IndexShardNotRecoveringException (shardId , state );
18581859 }
18591860 recoveryState .setStage (RecoveryState .Stage .INDEX );
1860- assert currentEngineReference . get () == null ;
1861+ assert this . currentEngine == null ;
18611862 }
18621863
18631864 /**
@@ -1936,8 +1937,11 @@ private void doLocalRecovery(
19361937 // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371938 .<Void >newForked (l -> ActionListener .runWithResource (ActionListener .assertOnce (l ), () -> () -> {
19381939 assert Thread .holdsLock (mutex ) == false : "must not hold the mutex here" ;
1939- synchronized (engineMutex ) {
1940- IOUtils .close (currentEngineReference .getAndSet (null ));
1940+ engineLock .writeLock ().lock ();
1941+ try {
1942+ IOUtils .close (getAndSetCurrentEngine (null ));
1943+ } finally {
1944+ engineLock .writeLock ().unlock ();
19411945 }
19421946 }, (recoveryCompleteListener , ignoredRef ) -> {
19431947 assert Thread .holdsLock (mutex ) == false : "must not hold the mutex here" ;
@@ -2167,16 +2171,19 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672171 + recoveryState .getRecoverySource ()
21682172 + "] but got "
21692173 + getRetentionLeases ();
2170- synchronized (engineMutex ) {
2171- assert currentEngineReference .get () == null : "engine is running" ;
2174+ engineLock .writeLock ().lock ();
2175+ try {
2176+ assert this .currentEngine == null : "engine is running" ;
21722177 verifyNotClosed ();
21732178 // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
21742179 final Engine newEngine = createEngine (config );
21752180 onNewEngine (newEngine );
2176- currentEngineReference . set (newEngine );
2181+ getAndSetCurrentEngine (newEngine );
21772182 // We set active because we are now writing operations to the engine; this way,
21782183 // we can flush if we go idle after some time and become inactive.
21792184 active .set (true );
2185+ } finally {
2186+ engineLock .writeLock ().unlock ();
21802187 }
21812188 // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
21822189 // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -2241,7 +2248,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412248 }
22422249
22432250 private void onNewEngine (Engine newEngine ) {
2244- assert Thread . holdsLock ( engineMutex );
2251+ assert engineLock . isWriteLockedByCurrentThread ( );
22452252 refreshListeners .setCurrentRefreshLocationSupplier (newEngine ::getTranslogLastWriteLocation );
22462253 refreshListeners .setCurrentProcessedCheckpointSupplier (newEngine ::getProcessedLocalCheckpoint );
22472254 refreshListeners .setMaxIssuedSeqNoSupplier (newEngine ::getMaxSeqNo );
@@ -2252,10 +2259,13 @@ private void onNewEngine(Engine newEngine) {
22522259 */
22532260 public void performRecoveryRestart () throws IOException {
22542261 assert Thread .holdsLock (mutex ) == false : "restart recovery under mutex" ;
2255- synchronized (engineMutex ) {
2262+ engineLock .writeLock ().lock ();
2263+ try {
22562264 assert refreshListeners .pendingCount () == 0 : "we can't restart with pending listeners" ;
2257- IOUtils .close (currentEngineReference . getAndSet (null ));
2265+ IOUtils .close (getAndSetCurrentEngine (null ));
22582266 resetRecoveryStage ();
2267+ } finally {
2268+ engineLock .writeLock ().unlock ();
22592269 }
22602270 }
22612271
@@ -2264,7 +2274,7 @@ public void performRecoveryRestart() throws IOException {
22642274 */
22652275 public void resetRecoveryStage () {
22662276 assert routingEntry ().recoverySource ().getType () == RecoverySource .Type .PEER : "not a peer recovery [" + routingEntry () + "]" ;
2267- assert currentEngineReference . get () == null ;
2277+ assert this . currentEngine == null ;
22682278 if (state != IndexShardState .RECOVERING ) {
22692279 throw new IndexShardNotRecoveringException (shardId , state );
22702280 }
@@ -3286,19 +3296,80 @@ private void doCheckIndex() throws IOException {
32863296 }
32873297
32883298 Engine getEngine () {
3289- Engine engine = getEngineOrNull ();
3290- if (engine == null ) {
3291- throw new AlreadyClosedException ("engine is closed" );
3299+ engineLock .readLock ().lock ();
3300+ try {
3301+ return getCurrentEngine (false );
3302+ } finally {
3303+ engineLock .readLock ().unlock ();
32923304 }
3293- return engine ;
32943305 }
32953306
32963307 /**
32973308 * NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
32983309 * closed.
32993310 */
33003311 public Engine getEngineOrNull () {
3301- return this .currentEngineReference .get ();
3312+ engineLock .readLock ().lock ();
3313+ try {
3314+ return getCurrentEngine (true );
3315+ } finally {
3316+ engineLock .readLock ().unlock ();
3317+ }
3318+ }
3319+
3320+ public <R > R withEngine (Function <Engine , R > operation ) {
3321+ return withEngine (operation , false );
3322+ }
3323+
3324+ public <R > R withEngineOrNull (Function <Engine , R > operation ) {
3325+ return withEngine (operation , true );
3326+ }
3327+
3328+ private <R > R withEngine (Function <Engine , R > operation , boolean allowNoEngine ) {
3329+ assert operation != null ;
3330+ engineLock .readLock ().lock ();
3331+ var release = true ;
3332+ try {
3333+ var engine = getCurrentEngine (allowNoEngine );
3334+ if (engine != null && engine .isOperable () == false ) {
3335+ engineLock .readLock ().unlock ();
3336+ release = false ;
3337+ engineLock .writeLock ().lock ();
3338+ try {
3339+ engine = getCurrentEngine (allowNoEngine );
3340+ if (engine != null && engine .isOperable () == false ) {
3341+ resetEngine ();
3342+ engine = getCurrentEngine (allowNoEngine );
3343+ }
3344+ engineLock .readLock ().lock ();
3345+ release = true ;
3346+ } finally {
3347+ engineLock .writeLock ().unlock ();
3348+ }
3349+ }
3350+ assert engine == null || engine .isOperable ();
3351+ return operation .apply (engine );
3352+ } finally {
3353+ if (release ) {
3354+ engineLock .readLock ().unlock ();
3355+ }
3356+ }
3357+ }
3358+
3359+ private Engine getCurrentEngine (boolean allowNoEngine ) {
3360+ assert engineLock .getReadHoldCount () > 0 ;
3361+ var engine = this .currentEngine ;
3362+ if (engine == null && allowNoEngine == false ) {
3363+ throw new AlreadyClosedException ("engine is closed" );
3364+ }
3365+ return engine ;
3366+ }
3367+
3368+ private Engine getAndSetCurrentEngine (Engine newEngine ) {
3369+ assert engineLock .isWriteLockedByCurrentThread ();
3370+ var previousEngine = this .currentEngine ;
3371+ this .currentEngine = newEngine ;
3372+ return previousEngine ;
33023373 }
33033374
33043375 public void startRecovery (
@@ -4312,12 +4383,15 @@ public void resetEngine() {
43124383 assert Thread .holdsLock (mutex ) == false : "resetting engine under mutex" ;
43134384 assert waitForEngineOrClosedShardListeners .isDone ();
43144385 try {
4315- synchronized (engineMutex ) {
4386+ engineLock .writeLock ().lock (); // might already be held
4387+ try {
43164388 verifyNotClosed ();
43174389 getEngine ().prepareForEngineReset ();
43184390 var newEngine = createEngine (newEngineConfig (replicationTracker ));
4319- IOUtils .close (currentEngineReference . getAndSet (newEngine ));
4391+ IOUtils .close (getAndSetCurrentEngine (newEngine ));
43204392 onNewEngine (newEngine );
4393+ } finally {
4394+ engineLock .writeLock ().unlock ();
43214395 }
43224396 onSettingsChanged ();
43234397 } catch (Exception e ) {
@@ -4342,7 +4416,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424416 SetOnce <Engine > newEngineReference = new SetOnce <>();
43434417 final long globalCheckpoint = getLastKnownGlobalCheckpoint ();
43444418 assert globalCheckpoint == getLastSyncedGlobalCheckpoint ();
4345- synchronized (engineMutex ) {
4419+ engineLock .writeLock ().lock ();
4420+ try {
43464421 verifyNotClosed ();
43474422 // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
43484423 // acquireXXXCommit and close works.
@@ -4357,41 +4432,52 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574432 ) {
43584433 @ Override
43594434 public IndexCommitRef acquireLastIndexCommit (boolean flushFirst ) {
4360- synchronized (engineMutex ) {
4435+ engineLock .readLock ().lock ();
4436+ try {
43614437 if (newEngineReference .get () == null ) {
43624438 throw new AlreadyClosedException ("engine was closed" );
43634439 }
43644440 // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay
43654441 return newEngineReference .get ().acquireLastIndexCommit (false );
4442+ } finally {
4443+ engineLock .readLock ().unlock ();
43664444 }
43674445 }
43684446
43694447 @ Override
43704448 public IndexCommitRef acquireSafeIndexCommit () {
4371- synchronized (engineMutex ) {
4449+ engineLock .readLock ().lock ();
4450+ try {
43724451 if (newEngineReference .get () == null ) {
43734452 throw new AlreadyClosedException ("engine was closed" );
43744453 }
43754454 return newEngineReference .get ().acquireSafeIndexCommit ();
4455+ } finally {
4456+ engineLock .readLock ().unlock ();
43764457 }
43774458 }
43784459
43794460 @ Override
43804461 public void close () throws IOException {
43814462 Engine newEngine ;
4382- synchronized (engineMutex ) {
4463+ engineLock .readLock ().lock ();
4464+ try {
43834465 newEngine = newEngineReference .get ();
4384- if (newEngine == currentEngineReference . get ( )) {
4466+ if (newEngine == getCurrentEngine ( true )) {
43854467 // we successfully installed the new engine so do not close it.
43864468 newEngine = null ;
43874469 }
4470+ } finally {
4471+ engineLock .readLock ().unlock ();
43884472 }
43894473 IOUtils .close (super ::close , newEngine );
43904474 }
43914475 };
4392- IOUtils .close (currentEngineReference . getAndSet (readOnlyEngine ));
4476+ IOUtils .close (getAndSetCurrentEngine (readOnlyEngine ));
43934477 newEngineReference .set (engineFactory .newReadWriteEngine (newEngineConfig (replicationTracker )));
43944478 onNewEngine (newEngineReference .get ());
4479+ } finally {
4480+ engineLock .writeLock ().unlock ();
43954481 }
43964482 final Engine .TranslogRecoveryRunner translogRunner = (engine , snapshot ) -> runTranslogRecovery (
43974483 engine ,
@@ -4403,12 +4489,15 @@ public void close() throws IOException {
44034489 );
44044490 newEngineReference .get ().recoverFromTranslog (translogRunner , globalCheckpoint );
44054491 newEngineReference .get ().refresh ("reset_engine" );
4406- synchronized (engineMutex ) {
4492+ engineLock .writeLock ().lock ();
4493+ try {
44074494 verifyNotClosed ();
4408- IOUtils .close (currentEngineReference . getAndSet (newEngineReference .get ()));
4495+ IOUtils .close (getAndSetCurrentEngine (newEngineReference .get ()));
44094496 // We set active because we are now writing operations to the engine; this way,
44104497 // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114498 active .set (true );
4499+ } finally {
4500+ engineLock .writeLock ().unlock ();
44124501 }
44134502 // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
44144503 // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
0 commit comments