@@ -240,8 +240,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
240240 // ensure happens-before relation between addRefreshListener() and postRecovery()
241241 private volatile SubscribableListener <Void > postRecoveryComplete ;
242242 private volatile long pendingPrimaryTerm ; // see JavaDocs for getPendingPrimaryTerm
243- private final Object engineMutex = new Object (); // lock ordering: engineMutex -> mutex
244- private final AtomicReference < Engine > currentEngineReference = new AtomicReference <> ();
243+
244+ private final EngineReferenceManager engineReferenceManager = new EngineReferenceManager ();
245245 final EngineFactory engineFactory ;
246246
247247 private final IndexingOperationListener indexingOperationListeners ;
@@ -1277,10 +1277,13 @@ private Engine.GetResult innerGet(Engine.Get get, boolean translogOnly, Function
12771277 if (indexSettings .getIndexVersionCreated ().isLegacyIndexVersion ()) {
12781278 throw new IllegalStateException ("get operations not allowed on a legacy index" );
12791279 }
1280- if (translogOnly ) {
1281- return getEngine ().getFromTranslog (get , mappingLookup , mapperService .documentParser (), searcherWrapper );
1280+ try (var engineRef = engineReferenceManager .getEngineRef ()) {
1281+ var engine = engineRef .getEngine ();
1282+ if (translogOnly ) {
1283+ return engine .getFromTranslog (get , mappingLookup , mapperService .documentParser (), searcherWrapper );
1284+ }
1285+ return engine .get (get , mappingLookup , mapperService .documentParser (), searcherWrapper );
12821286 }
1283- return getEngine ().get (get , mappingLookup , mapperService .documentParser (), searcherWrapper );
12841287 }
12851288
12861289 /**
@@ -1613,10 +1616,10 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
16131616 Engine .IndexCommitRef indexCommit = null ;
16141617 store .incRef ();
16151618 try {
1616- synchronized ( engineMutex ) {
1619+ try ( var engineRef = engineReferenceManager . getEngineRef () ) {
16171620 // if the engine is not running, we can access the store directly, but we need to make sure no one starts
16181621 // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1619- final Engine engine = getEngineOrNull ();
1622+ final Engine engine = engineRef . getEngineOrNull ();
16201623 if (engine != null ) {
16211624 indexCommit = engine .acquireLastIndexCommit (false );
16221625 }
@@ -1776,14 +1779,14 @@ public CacheHelper getReaderCacheHelper() {
17761779 }
17771780
17781781 public void close (String reason , boolean flushEngine , Executor closeExecutor , ActionListener <Void > closeListener ) throws IOException {
1779- synchronized ( engineMutex ) {
1782+ try ( var ignored = engineReferenceManager . acquireEngineLock () ) {
17801783 try {
17811784 synchronized (mutex ) {
17821785 changeState (IndexShardState .CLOSED , reason );
17831786 }
17841787 checkAndCallWaitForEngineOrClosedShardListeners ();
17851788 } finally {
1786- final Engine engine = this . currentEngineReference . getAndSet ( null );
1789+ final Engine engine = engineReferenceManager . getEngineAndSet (() -> null );
17871790 closeExecutor .execute (ActionRunnable .run (closeListener , new CheckedRunnable <>() {
17881791 @ Override
17891792 public void run () throws Exception {
@@ -1857,7 +1860,7 @@ public void prepareForIndexRecovery() {
18571860 throw new IndexShardNotRecoveringException (shardId , state );
18581861 }
18591862 recoveryState .setStage (RecoveryState .Stage .INDEX );
1860- assert currentEngineReference . get () == null ;
1863+ assert assertEngineReferenceIsNull ( "prepare for recovery, engine should be null" ) ;
18611864 }
18621865
18631866 /**
@@ -1936,8 +1939,8 @@ private void doLocalRecovery(
19361939 // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
19371940 .<Void >newForked (l -> ActionListener .runWithResource (ActionListener .assertOnce (l ), () -> () -> {
19381941 assert Thread .holdsLock (mutex ) == false : "must not hold the mutex here" ;
1939- synchronized ( engineMutex ) {
1940- IOUtils .close (currentEngineReference . getAndSet ( null ));
1942+ try ( var ignored = engineReferenceManager . acquireEngineLock () ) {
1943+ IOUtils .close (engineReferenceManager . getEngineAndSet (() -> null ));
19411944 }
19421945 }, (recoveryCompleteListener , ignoredRef ) -> {
19431946 assert Thread .holdsLock (mutex ) == false : "must not hold the mutex here" ;
@@ -2167,13 +2170,14 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
21672170 + recoveryState .getRecoverySource ()
21682171 + "] but got "
21692172 + getRetentionLeases ();
2170- synchronized ( engineMutex ) {
2171- assert currentEngineReference . get () == null : "engine is running" ;
2173+ try ( var ignored = engineReferenceManager . acquireEngineLock () ) {
2174+ assert assertEngineReferenceIsNull ( "engine is running" ) ;
21722175 verifyNotClosed ();
2173- // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
2176+ // we must create a new engine under lock (see IndexShard#snapshotStoreMetadata).
21742177 final Engine newEngine = createEngine (config );
21752178 onNewEngine (newEngine );
2176- currentEngineReference .set (newEngine );
2179+ var previous = engineReferenceManager .getEngineAndSet (() -> newEngine );
2180+ assert previous == null ;
21772181 // We set active because we are now writing operations to the engine; this way,
21782182 // we can flush if we go idle after some time and become inactive.
21792183 active .set (true );
@@ -2241,7 +2245,7 @@ private boolean assertLastestCommitUserData() throws IOException {
22412245 }
22422246
22432247 private void onNewEngine (Engine newEngine ) {
2244- assert Thread . holdsLock ( engineMutex );
2248+ assert engineReferenceManager . isEngineLockHeldByCurrentThread ( );
22452249 refreshListeners .setCurrentRefreshLocationSupplier (newEngine ::getTranslogLastWriteLocation );
22462250 refreshListeners .setCurrentProcessedCheckpointSupplier (newEngine ::getProcessedLocalCheckpoint );
22472251 refreshListeners .setMaxIssuedSeqNoSupplier (newEngine ::getMaxSeqNo );
@@ -2252,9 +2256,9 @@ private void onNewEngine(Engine newEngine) {
22522256 */
22532257 public void performRecoveryRestart () throws IOException {
22542258 assert Thread .holdsLock (mutex ) == false : "restart recovery under mutex" ;
2255- synchronized ( engineMutex ) {
2259+ try ( var ignored = engineReferenceManager . acquireEngineLock () ) {
22562260 assert refreshListeners .pendingCount () == 0 : "we can't restart with pending listeners" ;
2257- IOUtils .close (currentEngineReference . getAndSet ( null ));
2261+ IOUtils .close (engineReferenceManager . getEngineAndSet (() -> null ));
22582262 resetRecoveryStage ();
22592263 }
22602264 }
@@ -2264,7 +2268,7 @@ public void performRecoveryRestart() throws IOException {
22642268 */
22652269 public void resetRecoveryStage () {
22662270 assert routingEntry ().recoverySource ().getType () == RecoverySource .Type .PEER : "not a peer recovery [" + routingEntry () + "]" ;
2267- assert currentEngineReference . get () == null ;
2271+ assert assertEngineReferenceIsNull ( "reset recovery stage, engine should be null" ) ;
22682272 if (state != IndexShardState .RECOVERING ) {
22692273 throw new IndexShardNotRecoveringException (shardId , state );
22702274 }
@@ -3298,7 +3302,7 @@ Engine getEngine() {
32983302 * closed.
32993303 */
33003304 public Engine getEngineOrNull () {
3301- return this . currentEngineReference .get ();
3305+ return engineReferenceManager .get ();
33023306 }
33033307
33043308 public void startRecovery (
@@ -4312,11 +4316,11 @@ public void resetEngine() {
43124316 assert Thread .holdsLock (mutex ) == false : "resetting engine under mutex" ;
43134317 assert waitForEngineOrClosedShardListeners .isDone ();
43144318 try {
4315- synchronized ( engineMutex ) {
4319+ try ( var ignored = engineReferenceManager . acquireEngineLock () ) {
43164320 verifyNotClosed ();
43174321 getEngine ().prepareForEngineReset ();
43184322 var newEngine = createEngine (newEngineConfig (replicationTracker ));
4319- IOUtils .close (currentEngineReference . getAndSet ( newEngine ));
4323+ IOUtils .close (engineReferenceManager . getEngineAndSet (() -> newEngine ));
43204324 onNewEngine (newEngine );
43214325 }
43224326 onSettingsChanged ();
@@ -4342,7 +4346,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43424346 SetOnce <Engine > newEngineReference = new SetOnce <>();
43434347 final long globalCheckpoint = getLastKnownGlobalCheckpoint ();
43444348 assert globalCheckpoint == getLastSyncedGlobalCheckpoint ();
4345- synchronized ( engineMutex ) {
4349+ try ( var ignored = engineReferenceManager . acquireEngineLock () ) {
43464350 verifyNotClosed ();
43474351 // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
43484352 // acquireXXXCommit and close works.
@@ -4357,7 +4361,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
43574361 ) {
43584362 @ Override
43594363 public IndexCommitRef acquireLastIndexCommit (boolean flushFirst ) {
4360- synchronized ( engineMutex ) {
4364+ try ( var ignored = engineReferenceManager . getEngineRef () ) {
43614365 if (newEngineReference .get () == null ) {
43624366 throw new AlreadyClosedException ("engine was closed" );
43634367 }
@@ -4368,7 +4372,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
43684372
43694373 @ Override
43704374 public IndexCommitRef acquireSafeIndexCommit () {
4371- synchronized ( engineMutex ) {
4375+ try ( var ignored = engineReferenceManager . getEngineRef () ) {
43724376 if (newEngineReference .get () == null ) {
43734377 throw new AlreadyClosedException ("engine was closed" );
43744378 }
@@ -4379,17 +4383,17 @@ public IndexCommitRef acquireSafeIndexCommit() {
43794383 @ Override
43804384 public void close () throws IOException {
43814385 Engine newEngine ;
4382- synchronized ( engineMutex ) {
4386+ try ( var engineRef = engineReferenceManager . getEngineRef () ) {
43834387 newEngine = newEngineReference .get ();
4384- if (newEngine == currentEngineReference . get ()) {
4388+ if (newEngine == engineRef . getEngineOrNull ()) {
43854389 // we successfully installed the new engine so do not close it.
43864390 newEngine = null ;
43874391 }
43884392 }
43894393 IOUtils .close (super ::close , newEngine );
43904394 }
43914395 };
4392- IOUtils .close (currentEngineReference . getAndSet ( readOnlyEngine ));
4396+ IOUtils .close (engineReferenceManager . getEngineAndSet (() -> readOnlyEngine ));
43934397 newEngineReference .set (engineFactory .newReadWriteEngine (newEngineConfig (replicationTracker )));
43944398 onNewEngine (newEngineReference .get ());
43954399 }
@@ -4403,9 +4407,9 @@ public void close() throws IOException {
44034407 );
44044408 newEngineReference .get ().recoverFromTranslog (translogRunner , globalCheckpoint );
44054409 newEngineReference .get ().refresh ("reset_engine" );
4406- synchronized ( engineMutex ) {
4410+ try ( var ignored = engineReferenceManager . acquireEngineLock () ) {
44074411 verifyNotClosed ();
4408- IOUtils .close (currentEngineReference . getAndSet (newEngineReference . get () ));
4412+ IOUtils .close (engineReferenceManager . getEngineAndSet (newEngineReference :: get ));
44094413 // We set active because we are now writing operations to the engine; this way,
44104414 // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
44114415 active .set (true );
@@ -4516,4 +4520,11 @@ public void ensureMutable(ActionListener<Void> listener) {
45164520 l .onResponse (null );
45174521 }));
45184522 }
4523+
4524+ private boolean assertEngineReferenceIsNull (String message ) {
4525+ // use accessor with no lock as this asserting method can be called anywhere,
4526+ // including under the refresh lock of the index reader.
4527+ assert engineReferenceManager .get () == null : message ;
4528+ return true ;
4529+ }
45194530}
0 commit comments