diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8b0e84105055c..ec52092730bb1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -2371,7 +2371,7 @@ public record FlushResult(boolean flushPerformed, long generation) { } /** - * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}. + * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(Consumer)}. * * In general, resetting the engine should be done with care, to consider any * in-progress operations and listeners (e.g., primary term and generation listeners). diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bb091fa0c409b..4bff703d10e49 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,6 +44,8 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -151,6 +153,7 @@ import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -177,6 +180,7 @@ import 
java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -241,10 +245,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // ensure happens-before relation between addRefreshListener() and postRecovery() private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex - private final AtomicReference currentEngineReference = new AtomicReference<>(); + + // read/write lock for mutating the engine (lock ordering: closeMutex -> engineLock.writeLock -> mutex) + private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); + private Engine currentEngine = null; // must be accessed while holding engineLock final EngineFactory engineFactory; + // mutex for closing the shard + private final Object closeMutex = new Object(); + private final IndexingOperationListener indexingOperationListeners; private final GlobalCheckpointSyncer globalCheckpointSyncer; @@ -1628,15 +1637,20 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - synchronized (engineMutex) { - // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. 
- final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } - if (indexCommit == null) { - return store.getMetadata(null, true); + synchronized (closeMutex) { + engineLock.readLock().lock(); + try { + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. + final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); + } + if (indexCommit == null) { + return store.getMetadata(null, true); + } + } finally { + engineLock.readLock().unlock(); } } return store.getMetadata(indexCommit.getIndexCommit()); @@ -1791,39 +1805,63 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { - synchronized (engineMutex) { + synchronized (closeMutex) { + Engine engineOrNull = null; try { - synchronized (mutex) { - changeState(IndexShardState.CLOSED, reason); + // engine reference and shard state are changed under the engine write lock + engineLock.writeLock().lock(); + try { + try { + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); + } + checkAndCallWaitForEngineOrClosedShardListeners(); + } finally { + engineOrNull = getAndSetCurrentEngine(null); + // downgrade to read lock for submitting the engine closing task + // (not strictly required because engine changes are no longer allowed after the state was changed to CLOSED) + engineLock.readLock().lock(); + } + } finally { + engineLock.writeLock().unlock(); } - checkAndCallWaitForEngineOrClosedShardListeners(); } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); - closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { - @Override - public void run() throws 
Exception { - try { - if (engine != null && flushEngine) { - engine.flushAndClose(); + assert engineLock.getReadHoldCount() > 0 : "hold the read lock when submitting the engine closing task"; + try { + final Engine engine = engineOrNull; + // When closeExecutor is EsExecutors.DIRECT_EXECUTOR_SERVICE, the following runnable will run within the current thread + // while the read lock is held, which is OK. When closeExecutor is a generic thread or the cluster state applier thread + // it will then run without any engine lock held, which is OK because no engine changes are allowed after the state is + // changed to CLOSED. + closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { + @Override + public void run() throws Exception { + try { + if (engine != null && flushEngine) { + assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock"; + engine.flushAndClose(); + } + } finally { + // playing safe here and close the engine even if the above succeeds - close can be called multiple times + // Also closing refreshListeners to prevent us from accumulating any more listeners + IOUtils.close( + engine, + globalCheckpointListeners, + refreshListeners, + pendingReplicationActions, + indexShardOperationPermits + ); } - } finally { - // playing safe here and close the engine even if the above succeeds - close can be called multiple times - // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close( - engine, - globalCheckpointListeners, - refreshListeners, - pendingReplicationActions, - indexShardOperationPermits - ); } - } - @Override - public String toString() { - return "IndexShard#close[" + shardId + "]"; - } - })); + @Override + public String toString() { + return "IndexShard#close[" + shardId + "]"; + } + })); + } finally { + engineLock.readLock().unlock(); + } } } } @@ -1872,7 +1910,7 @@ public void prepareForIndexRecovery() { throw new 
IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert currentEngineReference.get() == null; + assert getEngineOrNull() == null; } /** @@ -1951,8 +1989,12 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. .newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; - synchronized (engineMutex) { - IOUtils.close(currentEngineReference.getAndSet(null)); + engineLock.writeLock().lock(); + try { + verifyNotClosed(); + IOUtils.close(getAndSetCurrentEngine(null)); + } finally { + engineLock.writeLock().unlock(); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2182,16 +2224,19 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - synchronized (engineMutex) { - assert currentEngineReference.get() == null : "engine is running"; + engineLock.writeLock().lock(); + try { + assert currentEngine == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = createEngine(config); onNewEngine(newEngine); - currentEngineReference.set(newEngine); + getAndSetCurrentEngine(newEngine); // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. 
@@ -2256,7 +2301,7 @@ private boolean assertLastestCommitUserData() throws IOException { } private void onNewEngine(Engine newEngine) { - assert Thread.holdsLock(engineMutex); + assert engineLock.isWriteLockedByCurrentThread(); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -2267,10 +2312,13 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(currentEngineReference.getAndSet(null)); + IOUtils.close(getAndSetCurrentEngine(null)); resetRecoveryStage(); + } finally { + engineLock.writeLock().unlock(); } } @@ -2279,7 +2327,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert currentEngineReference.get() == null; + assert getEngineOrNull() == null; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -2608,9 +2656,20 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(); - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + // This method can be called within the cluster state applier thread + if (engineLock.readLock().tryLock() == false) { + // Attempt to acquire a read lock failed: + // - the engine is closing, in which case we don't need to apply the updated index settings + // - otherwise the onSettingsChanged() should be called again after 
the new engine is created and the write lock is released + return; + } + try { + var engineOrNull = getCurrentEngine(true); + if (engineOrNull != null) { + engineOrNull.onSettingsChanged(); + } + } finally { + engineLock.readLock().unlock(); } } @@ -3301,11 +3360,12 @@ private void doCheckIndex() throws IOException { } Engine getEngine() { - Engine engine = getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine is closed"); + engineLock.readLock().lock(); + try { + return getCurrentEngine(false); + } finally { + engineLock.readLock().unlock(); } - return engine; } /** @@ -3313,7 +3373,82 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { - return this.currentEngineReference.get(); + engineLock.readLock().lock(); + try { + return getCurrentEngine(true); + } finally { + engineLock.readLock().unlock(); + } + } + + private Engine getCurrentEngine(boolean allowNoEngine) { + assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread(); + var engine = this.currentEngine; + if (engine == null && allowNoEngine == false) { + throw new AlreadyClosedException("engine is closed"); + } + return engine; + } + + private Engine getAndSetCurrentEngine(Engine newEngine) { + assert engineLock.isWriteLockedByCurrentThread(); + var previousEngine = this.currentEngine; + this.currentEngine = newEngine; + return previousEngine; + } + + /** + * Executes an operation while preventing the shard's engine instance to be reset during the execution. + * The operation might be executed with a {@code null} engine instance. The engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param the type of the result + */ + public R withEngineOrNull(Function operation) { + return withEngine(operation, true); + } + + /** + * Executes an operation while preventing the shard's engine instance to be reset during the execution. 
+ * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The + * engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param the type of the result + * @throws AlreadyClosedException if the current engine instance is {@code null}. + */ + public R withEngine(Function operation) { + return withEngine(operation, false); + } + + /** + * Executes an operation while preventing the shard's engine instance from being reset during the execution + * (see {@link #resetEngine(Consumer)}). + * NOTE: It does not prevent the engine from being closed by {@link #close(String, boolean, Executor, ActionListener)} though. + * The parameter {@code allowNoEngine} is used to allow the operation to be executed when the current engine instance is {@code null}. + * When {@code allowNoEngine} is set to {@code false} the method will throw an {@link AlreadyClosedException} if the current engine + * instance is {@code null}. 
+ * + * @param operation the operation to execute + * @param allowNoEngine if the operation can be executed even if the current engine instance is {@code null} + * @return the result of the operation + * @param the type of the result + */ + private R withEngine(Function operation, boolean allowNoEngine) { + assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); + assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); + assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); + assert operation != null; + + engineLock.readLock().lock(); + try { + var engine = getCurrentEngine(allowNoEngine); + return operation.apply(engine); + } finally { + engineLock.readLock().unlock(); + } } public void startRecovery( @@ -4306,17 +4441,52 @@ public void afterRefresh(boolean didRefresh) { * * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. + * + * @param postResetNewEngineConsumer A consumer that will be called with the newly created engine after the reset + * is complete, allowing for post-reset operations on the new engine instance. + * The provided engine reference should not be retained by the consumer. 
*/ - public void resetEngine() { + public void resetEngine(Consumer postResetNewEngineConsumer) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { - synchronized (engineMutex) { + engineLock.readLock().lock(); + var release = true; + try { verifyNotClosed(); - getEngine().prepareForEngineReset(); - var newEngine = createEngine(newEngineConfig(replicationTracker)); - IOUtils.close(currentEngineReference.getAndSet(newEngine)); - onNewEngine(newEngine); + // Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because + // another thread might already be refreshing, and the refresh listeners in that thread will need to access the engine + // using the read lock. If we were using the write lock here, it would deadlock. + currentEngine.prepareForEngineReset(); + engineLock.readLock().unlock(); + release = false; + + // Promote to write lock in order to swap engines + engineLock.writeLock().lock(); + Engine previousEngine = null; + try { + + // How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not + // blocking all operations when resetting the engine nor are we blocking flushes or force-merges. 
+ + assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!"; + var newEngine = createEngine(newEngineConfig(replicationTracker)); + previousEngine = getAndSetCurrentEngine(newEngine); + postResetNewEngineConsumer.accept(newEngine); + onNewEngine(newEngine); + } finally { + engineLock.readLock().lock(); + try { + engineLock.writeLock().unlock(); + IOUtils.close(previousEngine); + } finally { + engineLock.readLock().unlock(); + } + } + } finally { + if (release) { + engineLock.readLock().unlock(); + } } onSettingsChanged(); } catch (Exception e) { @@ -4341,7 +4511,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. 
@@ -4356,41 +4527,52 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); + } finally { + engineLock.readLock().unlock(); } } @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } return newEngineReference.get().acquireSafeIndexCommit(); + } finally { + engineLock.readLock().unlock(); } } @Override public void close() throws IOException { Engine newEngine; - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { + if (newEngine == getCurrentEngine(true)) { // we successfully installed the new engine so do not close it. 
newEngine = null; } + } finally { + engineLock.readLock().unlock(); } IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); + } finally { + engineLock.writeLock().unlock(); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, @@ -4402,12 +4584,15 @@ public void close() throws IOException { ); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); + IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. 
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c0a9f5d84fe3b..e79227c53a7c5 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -4593,7 +4594,7 @@ public void prepareForEngineReset() throws IOException {} var onAcquired = new PlainActionFuture(); indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L)); try (var permits = safeGet(onAcquired)) { - indexShard.resetEngine(); + indexShard.resetEngine(newEngine -> {}); } safeAwait(newEngineCreated); safeAwait(newEngineNotification); @@ -5093,6 +5094,67 @@ public void testRecordsForceMerges() throws IOException { closeShards(shard); } + public void testCloseShardWhileRetainingEngine() throws Exception { + final var primary = newStartedShard(true); + try { + final var release = new CountDownLatch(1); + final var hold = new PlainActionFuture(); + final var holdEngineThread = new Thread(() -> { + primary.withEngine(engine -> { + assertThat(engine, notNullValue()); + EngineTestCase.ensureOpen(engine); + hold.onResponse(engine); + safeAwait(release); + return null; + }); + }); + holdEngineThread.start(); + + final var secondReaderExecuting = new CountDownLatch(1); + final var closed = new CountDownLatch(1); + final var closeEngineThread = new Thread(() -> { + try { + safeGet(hold); + // Unfair ReentrantReadWriteLock would prioritize writers over readers to avoid starving writers, + // hence we need to wait to close the engine until the second 
reader has acquired the read lock before + // closing, otherwise the test would deadlock. + safeAwait(secondReaderExecuting); + closeShardNoCheck(primary); + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + closed.countDown(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + closeEngineThread.start(); + + final var retainedInstance = asInstanceOf(InternalEngine.class, safeGet(hold)); + assertSame(retainedInstance, primary.getEngineOrNull()); + assertThat(primary.state(), equalTo(IndexShardState.STARTED)); + primary.withEngineOrNull(engine -> { + secondReaderExecuting.countDown(); + assertSame(retainedInstance, engine); + EngineTestCase.ensureOpen(engine); + return null; + }); + + release.countDown(); + safeAwait(closed); + + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + assertThat(primary.getEngineOrNull(), nullValue()); + primary.withEngineOrNull(engine -> { + assertThat(engine, nullValue()); + return null; + }); + + holdEngineThread.join(); + closeEngineThread.join(); + } finally { + IOUtils.close(primary.store()); + } + } + public void testShardExposesWriteLoadStats() throws Exception { final IndexShard primary = newStartedShard( true,