diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index ec52092730bb1..8b0e84105055c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -2371,7 +2371,7 @@ public record FlushResult(boolean flushPerformed, long generation) { } /** - * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(Consumer)}. + * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}. * * In general, resetting the engine should be done with care, to consider any * in-progress operations and listeners (e.g., primary term and generation listeners). diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 28fd80cf5eafa..751f07b144d27 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,8 +44,6 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; -import org.elasticsearch.cluster.service.ClusterApplierService; -import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -153,7 +151,6 @@ import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -182,7 +179,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -247,15 +243,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // ensure happens-before relation between addRefreshListener() and postRecovery() private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - - // read/write lock for mutating the engine (lock ordering: closeMutex -> engineLock.writeLock -> mutex) - private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); - private Engine currentEngine = null; // must be accessed while holding engineLock + private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex + private final AtomicReference currentEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; - // mutex for closing the shard - private final Object closeMutex = new Object(); - private final IndexingOperationListener indexingOperationListeners; private final GlobalCheckpointSyncer globalCheckpointSyncer; @@ -1652,20 +1643,15 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - synchronized (closeMutex) { - engineLock.readLock().lock(); - try { - // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } - if (indexCommit == null) { - return store.getMetadata(null, true); - } - } finally { - engineLock.readLock().unlock(); + synchronized (engineMutex) { + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. + final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); + } + if (indexCommit == null) { + return store.getMetadata(null, true); } } return store.getMetadata(indexCommit.getIndexCommit()); @@ -1820,63 +1806,39 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { - synchronized (closeMutex) { - Engine engineOrNull = null; + synchronized (engineMutex) { try { - // engine reference and shard state are changed under the engine write lock - engineLock.writeLock().lock(); - try { - try { - synchronized (mutex) { - changeState(IndexShardState.CLOSED, reason); - } - checkAndCallWaitForEngineOrClosedShardListeners(); - } finally { - engineOrNull = getAndSetCurrentEngine(null); - // downgrade to read lock for submitting the engine closing task - // (not strictly required because engine changes are no longer allowed after the state was changed to CLOSED) - engineLock.readLock().lock(); - } - } finally { - engineLock.writeLock().unlock(); + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); } + checkAndCallWaitForEngineOrClosedShardListeners(); } finally { - assert engineLock.getReadHoldCount() > 0 : "hold the read lock when submitting the engine closing task"; - try { - final Engine engine = engineOrNull; - // When closeExecutor is EsExecutors.DIRECT_EXECUTOR_SERVICE, the following runnable will run within the current thread - // while the read lock is held, which is OK. When closeExecutor is a generic thread or the cluster state applier thread - // it will then run without any engine lock held, which is OK because no engine changes are allowed after the state is - // changed to CLOSED. - closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { - @Override - public void run() throws Exception { - try { - if (engine != null && flushEngine) { - assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock"; - engine.flushAndClose(); - } - } finally { - // playing safe here and close the engine even if the above succeeds - close can be called multiple times - // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close( - engine, - globalCheckpointListeners, - refreshListeners, - pendingReplicationActions, - indexShardOperationPermits - ); + final Engine engine = this.currentEngineReference.getAndSet(null); + closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { + @Override + public void run() throws Exception { + try { + if (engine != null && flushEngine) { + engine.flushAndClose(); } + } finally { + // playing safe here and close the engine even if the above succeeds - close can be called multiple times + // Also closing refreshListeners to prevent us from accumulating any more listeners + IOUtils.close( + engine, + globalCheckpointListeners, + refreshListeners, + pendingReplicationActions, + indexShardOperationPermits + ); } + } - @Override - public String toString() { - return "IndexShard#close[" + shardId + "]"; - } - })); - } finally { - engineLock.readLock().unlock(); - } + @Override + public String toString() { + return "IndexShard#close[" + shardId + "]"; + } + })); } } } @@ -1925,7 +1887,7 @@ public void prepareForIndexRecovery() { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert getEngineOrNull() == null; + assert currentEngineReference.get() == null; } /** @@ -2004,12 +1966,8 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. .newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; - engineLock.writeLock().lock(); - try { - verifyNotClosed(); - IOUtils.close(getAndSetCurrentEngine(null)); - } finally { - engineLock.writeLock().unlock(); + synchronized (engineMutex) { + IOUtils.close(currentEngineReference.getAndSet(null)); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2239,19 +2197,16 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - engineLock.writeLock().lock(); - try { - assert currentEngine == null : "engine is running"; + synchronized (engineMutex) { + assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = createEngine(config); onNewEngine(newEngine); - getAndSetCurrentEngine(newEngine); + currentEngineReference.set(newEngine); // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); - } finally { - engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -2316,7 +2271,7 @@ private boolean assertLastestCommitUserData() throws IOException { } private void onNewEngine(Engine newEngine) { - assert engineLock.isWriteLockedByCurrentThread(); + assert Thread.holdsLock(engineMutex); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -2327,13 +2282,10 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; - engineLock.writeLock().lock(); - try { + synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(getAndSetCurrentEngine(null)); + IOUtils.close(currentEngineReference.getAndSet(null)); resetRecoveryStage(); - } finally { - engineLock.writeLock().unlock(); } } @@ -2342,7 +2294,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert getEngineOrNull() == null; + assert currentEngineReference.get() == null; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -2671,20 +2623,9 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - // This method can be called within the cluster state applier thread - if (engineLock.readLock().tryLock() == false) { - // Attempt to acquire a read lock failed: - // - the engine is closing, in which case we don't need to apply the updated index settings - // - otherwise the onSettingsChanged() should be called again after the new engine is created and the write lock is released - return; - } - try { - var engineOrNull = getCurrentEngine(true); - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); - } - } finally { - engineLock.readLock().unlock(); + Engine engineOrNull = getEngineOrNull(); + if (engineOrNull != null) { + engineOrNull.onSettingsChanged(); } } @@ -3375,12 +3316,11 @@ private void doCheckIndex() throws IOException { } Engine getEngine() { - engineLock.readLock().lock(); - try { - return getCurrentEngine(false); - } finally { - engineLock.readLock().unlock(); + Engine engine = getEngineOrNull(); + if (engine == null) { + throw new AlreadyClosedException("engine is closed"); } + return engine; } /** @@ -3388,82 +3328,7 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { - engineLock.readLock().lock(); - try { - return getCurrentEngine(true); - } finally { - engineLock.readLock().unlock(); - } - } - - private Engine getCurrentEngine(boolean allowNoEngine) { - assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread(); - var engine = this.currentEngine; - if (engine == null && allowNoEngine == false) { - throw new AlreadyClosedException("engine is closed"); - } - return engine; - } - - private Engine getAndSetCurrentEngine(Engine newEngine) { - assert engineLock.isWriteLockedByCurrentThread(); - var previousEngine = this.currentEngine; - this.currentEngine = newEngine; - return previousEngine; - } - - /** - * Executes an operation while preventing the shard's engine instance to be reset during the execution. - * The operation might be executed with a {@code null} engine instance. The engine might be closed while the operation is executed. - * - * @param operation the operation to execute - * @return the result of the operation - * @param the type of the result - */ - public R withEngineOrNull(Function operation) { - return withEngine(operation, true); - } - - /** - * Executes an operation while preventing the shard's engine instance to be reset during the execution. - * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The - * engine might be closed while the operation is executed. - * - * @param operation the operation to execute - * @return the result of the operation - * @param the type of the result - * @throws AlreadyClosedException if the current engine instance is {@code null}. - */ - public R withEngine(Function operation) { - return withEngine(operation, false); - } - - /** - * Executes an operation while preventing the shard's engine instance to be reset during the execution - * (see {@link #resetEngine(Consumer)}. - * NOTE: It does not prevent the engine to be closed by {@link #close(String, boolean, Executor, ActionListener)} though. - * The parameter {@code allowNoEngine} is used to allow the operation to be executed when the current engine instance is {@code null}. - * When {@code allowNoEngine} is set to {@code `false`} the method will throw an {@link AlreadyClosedException} if the current engine - * instance is {@code null}. - * - * @param operation the operation to execute - * @param allowNoEngine if the operation can be executed even if the current engine instance is {@code null} - * @return the result of the operation - * @param the type of the result - */ - private R withEngine(Function operation, boolean allowNoEngine) { - assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); - assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); - assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); - assert operation != null; - - engineLock.readLock().lock(); - try { - var engine = getCurrentEngine(allowNoEngine); - return operation.apply(engine); - } finally { - engineLock.readLock().unlock(); - } + return this.currentEngineReference.get(); } public void startRecovery( @@ -4461,52 +4326,17 @@ public void afterRefresh(boolean didRefresh) { * * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. - * - * @param postResetNewEngineConsumer A consumer that will be called with the newly created engine after the reset - * is complete, allowing for post-reset operations on the new engine instance. - * The provided engine reference should not be retained by the consumer. */ - public void resetEngine(Consumer postResetNewEngineConsumer) { + public void resetEngine() { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { - engineLock.readLock().lock(); - var release = true; - try { + synchronized (engineMutex) { verifyNotClosed(); - // Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because - // another thread might already be refreshing, and the refresh listeners in that thread will need to access to the engine - // using the read lock. If we were using the write lock here, it would deadlock. - currentEngine.prepareForEngineReset(); - engineLock.readLock().unlock(); - release = false; - - // Promote to write lock in order to swap engines - engineLock.writeLock().lock(); - Engine previousEngine = null; - try { - - // How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not - // blocking all operations when resetting the engine nor we are blocking flushes or force-merges. - - assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!"; - var newEngine = createEngine(newEngineConfig(replicationTracker)); - previousEngine = getAndSetCurrentEngine(newEngine); - postResetNewEngineConsumer.accept(newEngine); - onNewEngine(newEngine); - } finally { - engineLock.readLock().lock(); - try { - engineLock.writeLock().unlock(); - IOUtils.close(previousEngine); - } finally { - engineLock.readLock().unlock(); - } - } - } finally { - if (release) { - engineLock.readLock().unlock(); - } + getEngine().prepareForEngineReset(); + var newEngine = createEngine(newEngineConfig(replicationTracker)); + IOUtils.close(currentEngineReference.getAndSet(newEngine)); + onNewEngine(newEngine); } onSettingsChanged(); } catch (Exception e) { @@ -4531,8 +4361,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - engineLock.writeLock().lock(); - try { + synchronized (engineMutex) { verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. @@ -4547,52 +4376,41 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - engineLock.readLock().lock(); - try { + synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); - } finally { - engineLock.readLock().unlock(); } } @Override public IndexCommitRef acquireSafeIndexCommit() { - engineLock.readLock().lock(); - try { + synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } return newEngineReference.get().acquireSafeIndexCommit(); - } finally { - engineLock.readLock().unlock(); } } @Override public void close() throws IOException { Engine newEngine; - engineLock.readLock().lock(); - try { + synchronized (engineMutex) { newEngine = newEngineReference.get(); - if (newEngine == getCurrentEngine(true)) { + if (newEngine == currentEngineReference.get()) { // we successfully installed the new engine so do not close it. newEngine = null; } - } finally { - engineLock.readLock().unlock(); } IOUtils.close(super::close, newEngine); } }; - IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); + IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); - } finally { - engineLock.writeLock().unlock(); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, @@ -4604,15 +4422,12 @@ public void close() throws IOException { ); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - engineLock.writeLock().lock(); - try { + synchronized (engineMutex) { verifyNotClosed(); - IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); + IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); - } finally { - engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9e203f179e6fc..5d689804f9cbb 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -67,7 +67,6 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedFunction; -import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -4595,7 +4594,7 @@ public void prepareForEngineReset() throws IOException {} var onAcquired = new PlainActionFuture(); indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L)); try (var permits = safeGet(onAcquired)) { - indexShard.resetEngine(newEngine -> {}); + indexShard.resetEngine(); } safeAwait(newEngineCreated); safeAwait(newEngineNotification); @@ -5095,67 +5094,6 @@ public void testRecordsForceMerges() throws IOException { closeShards(shard); } - public void testCloseShardWhileRetainingEngine() throws Exception { - final var primary = newStartedShard(true); - try { - final var release = new CountDownLatch(1); - final var hold = new PlainActionFuture(); - final var holdEngineThread = new Thread(() -> { - primary.withEngine(engine -> { - assertThat(engine, notNullValue()); - EngineTestCase.ensureOpen(engine); - hold.onResponse(engine); - safeAwait(release); - return null; - }); - }); - holdEngineThread.start(); - - final var secondReaderExecuting = new CountDownLatch(1); - final var closed = new CountDownLatch(1); - final var closeEngineThread = new Thread(() -> { - try { - safeGet(hold); - // Unfair ReentrantReadWriteLock would prioritize writers over readers to avoid starving writers, - // hence we need to wait to close the engine until the second reader has acquired the read lock before - // closing, otherwise the test would deadlock. - safeAwait(secondReaderExecuting); - closeShardNoCheck(primary); - assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); - closed.countDown(); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - closeEngineThread.start(); - - final var retainedInstance = asInstanceOf(InternalEngine.class, safeGet(hold)); - assertSame(retainedInstance, primary.getEngineOrNull()); - assertThat(primary.state(), equalTo(IndexShardState.STARTED)); - primary.withEngineOrNull(engine -> { - secondReaderExecuting.countDown(); - assertSame(retainedInstance, engine); - EngineTestCase.ensureOpen(engine); - return null; - }); - - release.countDown(); - safeAwait(closed); - - assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); - assertThat(primary.getEngineOrNull(), nullValue()); - primary.withEngineOrNull(engine -> { - assertThat(engine, nullValue()); - return null; - }); - - holdEngineThread.join(); - closeEngineThread.join(); - } finally { - IOUtils.close(primary.store()); - } - } - public void testShardExposesWriteLoadStats() throws Exception { final IndexShard primary = newStartedShard( true,