From d290507ad5d8840d951dabbbef7a4a5577b3d050 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 1 Apr 2025 08:51:35 +0200 Subject: [PATCH 01/21] Revert "Revert "Use read/write engine lock to guard operations against resets (#124635)" (#125915)" This reverts commit 7fadeeb7c9b168db3a2a99f4ca3e4c7db41e8044. --- .../elasticsearch/index/engine/Engine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 325 ++++++++++++++---- .../index/shard/IndexShardTests.java | 64 +++- 3 files changed, 319 insertions(+), 72 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8b0e84105055c..ec52092730bb1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -2371,7 +2371,7 @@ public record FlushResult(boolean flushPerformed, long generation) { } /** - * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}. + * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(Consumer)}. * * In general, resetting the engine should be done with care, to consider any * in-progress operations and listeners (e.g., primary term and generation listeners). 
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bf2e20b70b441..6bf0a8e799347 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,6 +44,8 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -151,6 +153,7 @@ import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -179,6 +182,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -243,10 +247,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // ensure happens-before relation between addRefreshListener() and postRecovery() private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex - private final AtomicReference currentEngineReference = new AtomicReference<>(); + + 
// read/write lock for mutating the engine (lock ordering: closeMutex -> engineLock.writeLock -> mutex) + private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); + private Engine currentEngine = null; // must be accessed while holding engineLock final EngineFactory engineFactory; + // mutex for closing the shard + private final Object closeMutex = new Object(); + private final IndexingOperationListener indexingOperationListeners; private final GlobalCheckpointSyncer globalCheckpointSyncer; @@ -1643,15 +1652,20 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - synchronized (engineMutex) { - // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } - if (indexCommit == null) { - return store.getMetadata(null, true); + synchronized (closeMutex) { + engineLock.readLock().lock(); + try { + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. 
+ final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); + } + if (indexCommit == null) { + return store.getMetadata(null, true); + } + } finally { + engineLock.readLock().unlock(); } } return store.getMetadata(indexCommit.getIndexCommit()); @@ -1806,39 +1820,63 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { - synchronized (engineMutex) { + synchronized (closeMutex) { + Engine engineOrNull = null; try { - synchronized (mutex) { - changeState(IndexShardState.CLOSED, reason); + // engine reference and shard state are changed under the engine write lock + engineLock.writeLock().lock(); + try { + try { + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); + } + checkAndCallWaitForEngineOrClosedShardListeners(); + } finally { + engineOrNull = getAndSetCurrentEngine(null); + // downgrade to read lock for submitting the engine closing task + // (not strictly required because engine changes are no longer allowed after the state was changed to CLOSED) + engineLock.readLock().lock(); + } + } finally { + engineLock.writeLock().unlock(); } - checkAndCallWaitForEngineOrClosedShardListeners(); } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); - closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { - @Override - public void run() throws Exception { - try { - if (engine != null && flushEngine) { - engine.flushAndClose(); + assert engineLock.getReadHoldCount() > 0 : "hold the read lock when submitting the engine closing task"; + try { + final Engine engine = engineOrNull; + // When closeExecutor is EsExecutors.DIRECT_EXECUTOR_SERVICE, the following runnable will run within the current thread + // while the read lock is held, which is OK. 
When closeExecutor is a generic thread or the cluster state applier thread + // it will then run without any engine lock held, which is OK because no engine changes are allowed after the state is + // changed to CLOSED. + closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { + @Override + public void run() throws Exception { + try { + if (engine != null && flushEngine) { + assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock"; + engine.flushAndClose(); + } + } finally { + // playing safe here and close the engine even if the above succeeds - close can be called multiple times + // Also closing refreshListeners to prevent us from accumulating any more listeners + IOUtils.close( + engine, + globalCheckpointListeners, + refreshListeners, + pendingReplicationActions, + indexShardOperationPermits + ); } - } finally { - // playing safe here and close the engine even if the above succeeds - close can be called multiple times - // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close( - engine, - globalCheckpointListeners, - refreshListeners, - pendingReplicationActions, - indexShardOperationPermits - ); } - } - @Override - public String toString() { - return "IndexShard#close[" + shardId + "]"; - } - })); + @Override + public String toString() { + return "IndexShard#close[" + shardId + "]"; + } + })); + } finally { + engineLock.readLock().unlock(); + } } } } @@ -1887,7 +1925,7 @@ public void prepareForIndexRecovery() { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert currentEngineReference.get() == null; + assert getEngineOrNull() == null; } /** @@ -1966,8 +2004,12 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. 
.newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; - synchronized (engineMutex) { - IOUtils.close(currentEngineReference.getAndSet(null)); + engineLock.writeLock().lock(); + try { + verifyNotClosed(); + IOUtils.close(getAndSetCurrentEngine(null)); + } finally { + engineLock.writeLock().unlock(); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2197,16 +2239,19 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - synchronized (engineMutex) { - assert currentEngineReference.get() == null : "engine is running"; + engineLock.writeLock().lock(); + try { + assert currentEngine == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = createEngine(config); onNewEngine(newEngine); - currentEngineReference.set(newEngine); + getAndSetCurrentEngine(newEngine); // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. 
@@ -2271,7 +2316,7 @@ private boolean assertLastestCommitUserData() throws IOException { } private void onNewEngine(Engine newEngine) { - assert Thread.holdsLock(engineMutex); + assert engineLock.isWriteLockedByCurrentThread(); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -2282,10 +2327,13 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(currentEngineReference.getAndSet(null)); + IOUtils.close(getAndSetCurrentEngine(null)); resetRecoveryStage(); + } finally { + engineLock.writeLock().unlock(); } } @@ -2294,7 +2342,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert currentEngineReference.get() == null; + assert getEngineOrNull() == null; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -2623,9 +2671,20 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(); - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + // This method can be called within the cluster state applier thread + if (engineLock.readLock().tryLock() == false) { + // Attempt to acquire a read lock failed: + // - the engine is closing, in which case we don't need to apply the updated index settings + // - otherwise the onSettingsChanged() should be called again after 
the new engine is created and the write lock is released + return; + } + try { + var engineOrNull = getCurrentEngine(true); + if (engineOrNull != null) { + engineOrNull.onSettingsChanged(); + } + } finally { + engineLock.readLock().unlock(); } } @@ -3316,11 +3375,12 @@ private void doCheckIndex() throws IOException { } Engine getEngine() { - Engine engine = getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine is closed"); + engineLock.readLock().lock(); + try { + return getCurrentEngine(false); + } finally { + engineLock.readLock().unlock(); } - return engine; } /** @@ -3328,7 +3388,82 @@ Engine getEngine() { * closed. */ public Engine getEngineOrNull() { - return this.currentEngineReference.get(); + engineLock.readLock().lock(); + try { + return getCurrentEngine(true); + } finally { + engineLock.readLock().unlock(); + } + } + + private Engine getCurrentEngine(boolean allowNoEngine) { + assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread(); + var engine = this.currentEngine; + if (engine == null && allowNoEngine == false) { + throw new AlreadyClosedException("engine is closed"); + } + return engine; + } + + private Engine getAndSetCurrentEngine(Engine newEngine) { + assert engineLock.isWriteLockedByCurrentThread(); + var previousEngine = this.currentEngine; + this.currentEngine = newEngine; + return previousEngine; + } + + /** + * Executes an operation while preventing the shard's engine instance to be reset during the execution. + * The operation might be executed with a {@code null} engine instance. The engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param the type of the result + */ + public R withEngineOrNull(Function operation) { + return withEngine(operation, true); + } + + /** + * Executes an operation while preventing the shard's engine instance to be reset during the execution. 
+ * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The + * engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param the type of the result + * @throws AlreadyClosedException if the current engine instance is {@code null}. + */ + public R withEngine(Function operation) { + return withEngine(operation, false); + } + + /** + * Executes an operation while preventing the shard's engine instance from being reset during the execution + * (see {@link #resetEngine(Consumer)}). + * NOTE: It does not prevent the engine from being closed by {@link #close(String, boolean, Executor, ActionListener)} though. + * The parameter {@code allowNoEngine} is used to allow the operation to be executed when the current engine instance is {@code null}. + * When {@code allowNoEngine} is set to {@code false} the method will throw an {@link AlreadyClosedException} if the current engine + * instance is {@code null}. 
+ * + * @param operation the operation to execute + * @param allowNoEngine if the operation can be executed even if the current engine instance is {@code null} + * @return the result of the operation + * @param the type of the result + */ + private R withEngine(Function operation, boolean allowNoEngine) { + assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); + assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); + assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); + assert operation != null; + + engineLock.readLock().lock(); + try { + var engine = getCurrentEngine(allowNoEngine); + return operation.apply(engine); + } finally { + engineLock.readLock().unlock(); + } } public void startRecovery( @@ -4326,17 +4461,52 @@ public void afterRefresh(boolean didRefresh) { * * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. + * + * @param postResetNewEngineConsumer A consumer that will be called with the newly created engine after the reset + * is complete, allowing for post-reset operations on the new engine instance. + * The provided engine reference should not be retained by the consumer. 
*/ - public void resetEngine() { + public void resetEngine(Consumer postResetNewEngineConsumer) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { - synchronized (engineMutex) { + engineLock.readLock().lock(); + var release = true; + try { verifyNotClosed(); - getEngine().prepareForEngineReset(); - var newEngine = createEngine(newEngineConfig(replicationTracker)); - IOUtils.close(currentEngineReference.getAndSet(newEngine)); - onNewEngine(newEngine); + // Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because + // another thread might already be refreshing, and the refresh listeners in that thread will need to access the engine + // using the read lock. If we were using the write lock here, it would deadlock. + currentEngine.prepareForEngineReset(); + engineLock.readLock().unlock(); + release = false; + + // Promote to write lock in order to swap engines + engineLock.writeLock().lock(); + Engine previousEngine = null; + try { + + // How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not + // blocking all operations when resetting the engine nor are we blocking flushes or force-merges. 
+ + assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!"; + var newEngine = createEngine(newEngineConfig(replicationTracker)); + previousEngine = getAndSetCurrentEngine(newEngine); + postResetNewEngineConsumer.accept(newEngine); + onNewEngine(newEngine); + } finally { + engineLock.readLock().lock(); + try { + engineLock.writeLock().unlock(); + IOUtils.close(previousEngine); + } finally { + engineLock.readLock().unlock(); + } + } + } finally { + if (release) { + engineLock.readLock().unlock(); + } } onSettingsChanged(); } catch (Exception e) { @@ -4361,7 +4531,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. 
@@ -4376,41 +4547,52 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); + } finally { + engineLock.readLock().unlock(); } } @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); } return newEngineReference.get().acquireSafeIndexCommit(); + } finally { + engineLock.readLock().unlock(); } } @Override public void close() throws IOException { Engine newEngine; - synchronized (engineMutex) { + engineLock.readLock().lock(); + try { newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { + if (newEngine == getCurrentEngine(true)) { // we successfully installed the new engine so do not close it. 
newEngine = null; } + } finally { + engineLock.readLock().unlock(); } IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); + } finally { + engineLock.writeLock().unlock(); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, @@ -4422,12 +4604,15 @@ public void close() throws IOException { ); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - synchronized (engineMutex) { + engineLock.writeLock().lock(); + try { verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); + IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); + } finally { + engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. 
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5d689804f9cbb..9e203f179e6fc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -4594,7 +4595,7 @@ public void prepareForEngineReset() throws IOException {} var onAcquired = new PlainActionFuture(); indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L)); try (var permits = safeGet(onAcquired)) { - indexShard.resetEngine(); + indexShard.resetEngine(newEngine -> {}); } safeAwait(newEngineCreated); safeAwait(newEngineNotification); @@ -5094,6 +5095,67 @@ public void testRecordsForceMerges() throws IOException { closeShards(shard); } + public void testCloseShardWhileRetainingEngine() throws Exception { + final var primary = newStartedShard(true); + try { + final var release = new CountDownLatch(1); + final var hold = new PlainActionFuture(); + final var holdEngineThread = new Thread(() -> { + primary.withEngine(engine -> { + assertThat(engine, notNullValue()); + EngineTestCase.ensureOpen(engine); + hold.onResponse(engine); + safeAwait(release); + return null; + }); + }); + holdEngineThread.start(); + + final var secondReaderExecuting = new CountDownLatch(1); + final var closed = new CountDownLatch(1); + final var closeEngineThread = new Thread(() -> { + try { + safeGet(hold); + // Unfair ReentrantReadWriteLock would prioritize writers over readers to avoid starving writers, + // hence we need to wait to close the engine until the second 
reader has acquired the read lock before + // closing, otherwise the test would deadlock. + safeAwait(secondReaderExecuting); + closeShardNoCheck(primary); + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + closed.countDown(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + closeEngineThread.start(); + + final var retainedInstance = asInstanceOf(InternalEngine.class, safeGet(hold)); + assertSame(retainedInstance, primary.getEngineOrNull()); + assertThat(primary.state(), equalTo(IndexShardState.STARTED)); + primary.withEngineOrNull(engine -> { + secondReaderExecuting.countDown(); + assertSame(retainedInstance, engine); + EngineTestCase.ensureOpen(engine); + return null; + }); + + release.countDown(); + safeAwait(closed); + + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + assertThat(primary.getEngineOrNull(), nullValue()); + primary.withEngineOrNull(engine -> { + assertThat(engine, nullValue()); + return null; + }); + + holdEngineThread.join(); + closeEngineThread.join(); + } finally { + IOUtils.close(primary.store()); + } + } + public void testShardExposesWriteLoadStats() throws Exception { final IndexShard primary = newStartedShard( true, From 87e403f56a1db5c23ceb774df4f6657f67170440 Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 2 Apr 2025 15:44:06 +0200 Subject: [PATCH 02/21] add EngineReadWriteLock --- .../indices/IndexingMemoryControllerIT.java | 3 +- .../index/engine/EngineConfig.java | 10 ++- .../index/engine/EngineReadWriteLock.java | 72 +++++++++++++++++++ .../elasticsearch/index/shard/IndexShard.java | 11 +-- .../index/engine/InternalEngineTests.java | 6 +- .../index/shard/IndexShardTests.java | 3 +- .../index/shard/RefreshListenersTests.java | 4 +- .../index/engine/EngineTestCase.java | 15 ++-- .../index/engine/FollowingEngineTests.java | 4 +- 9 files changed, 111 insertions(+), 17 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java diff --git 
a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 0ac8c4d0b6fd4..98179c512cbe7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 1ef42cdb922c3..c9425aa8d7504 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -146,6 +146,8 @@ public Supplier retentionLeasesSupplier() { private final boolean promotableToPrimary; + private final EngineReadWriteLock engineLock; + /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ @@ -177,7 +179,8 @@ public EngineConfig( LongSupplier relativeTimeInNanosSupplier, Engine.IndexCommitListener indexCommitListener, boolean promotableToPrimary, - MapperService mapperService + MapperService mapperService, + EngineReadWriteLock engineLock ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -224,6 +227,7 @@ public EngineConfig( this.promotableToPrimary = promotableToPrimary; // always use compound on flush - reduces # of file-handles on refresh this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); + this.engineLock = engineLock; } /** @@ -468,4 +472,8 @@ public boolean getUseCompoundFile() { public MapperService 
getMapperService() { return mapperService; } + + public EngineReadWriteLock getEngineLock() { + return engineLock; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java b/server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java new file mode 100644 index 0000000000000..7ad0c110ea167 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Reentrant read/write lock used to guard engine changes in a shard. + * + * Implemented as a simple wrapper around a {@link ReentrantReadWriteLock} to make it easier to add/override methods in the future. 
+ */ +public final class EngineReadWriteLock implements ReadWriteLock { + + private final ReentrantReadWriteLock lock; + + public EngineReadWriteLock() { + this.lock = new ReentrantReadWriteLock(); + } + + @Override + public Lock writeLock() { + return lock.writeLock(); + } + + @Override + public Lock readLock() { + return lock.readLock(); + } + + /** + * See {@link ReentrantReadWriteLock#isWriteLocked()} + */ + public boolean isWriteLocked() { + return lock.isWriteLocked(); + } + + /** + * See {@link ReentrantReadWriteLock#isWriteLockedByCurrentThread()} + */ + public boolean isWriteLockedByCurrentThread() { + return lock.isWriteLockedByCurrentThread(); + } + + /** + * Returns {@code true} if the number of read locks held by any thread is greater than zero. + * This method is designed for use in monitoring system state, not for synchronization control. + * + * @return {@code true} if any thread holds a read lock and {@code false} otherwise + */ + public boolean isReadLocked() { + return lock.getReadLockCount() > 0; + } + + /** + * Returns {@code true} if the number of holds on the read lock by the current thread is greater than zero. + * This method is designed for use in monitoring system state, not for synchronization control. 
+ * + * @return {@code true} if the number of holds on the read lock by the current thread is greater than zero, {@code false} otherwise + */ + public boolean isReadLockedByCurrentThread() { + return lock.getReadHoldCount() > 0; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6bf0a8e799347..5be2e171a5230 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -91,6 +91,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineReadWriteLock; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; @@ -182,7 +183,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -249,7 +249,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm // read/write lock for mutating the engine (lock ordering: closeMutex -> engineLock.writeLock -> mutex) - private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock(); + private final EngineReadWriteLock engineLock = new EngineReadWriteLock(); private Engine currentEngine = null; // must be accessed while holding engineLock final EngineFactory engineFactory; @@ -1841,7 +1841,7 @@ public void close(String reason, boolean flushEngine, Executor closeExecutor, Ac 
engineLock.writeLock().unlock(); } } finally { - assert engineLock.getReadHoldCount() > 0 : "hold the read lock when submitting the engine closing task"; + assert engineLock.isReadLockedByCurrentThread() : "hold the read lock when submitting the engine closing task"; try { final Engine engine = engineOrNull; // When closeExecutor is EsExecutors.DIRECT_EXECUTOR_SERVICE, the following runnable will run within the current thread @@ -3397,7 +3397,7 @@ public Engine getEngineOrNull() { } private Engine getCurrentEngine(boolean allowNoEngine) { - assert engineLock.getReadHoldCount() > 0 || engineLock.isWriteLockedByCurrentThread(); + assert engineLock.isReadLockedByCurrentThread() || engineLock.isWriteLockedByCurrentThread(); var engine = this.currentEngine; if (engine == null && allowNoEngine == false) { throw new AlreadyClosedException("engine is closed"); @@ -3735,7 +3735,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { relativeTimeInNanosSupplier, indexCommitListener, routingEntry().isPromotableToPrimary(), - mapperService() + mapperService(), + engineLock ); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e1128a329023a..6b87745904cc6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3612,7 +3612,8 @@ public void testRecoverFromForeignTranslog() throws IOException { config.getRelativeTimeInNanosSupplier(), null, true, - config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7175,7 +7176,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), 
- config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9e203f179e6fc..adb6a15c6260a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5060,7 +5060,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); return new InternalEngine(configWithWarmer); }); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 4e280f5443787..a60805b587997 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineReadWriteLock; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; @@ -166,7 +167,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { System::nanoTime, null, true, - EngineTestCase.createMapperService() + EngineTestCase.createMapperService(), + new EngineReadWriteLock() ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); 
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index d4554df1617ee..756bbbe5f085b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -304,7 +304,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); } @@ -337,7 +338,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); } @@ -370,7 +372,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); } @@ -875,7 +878,8 @@ public EngineConfig config( this::relativeTimeInNanos, indexCommitListener, true, - mapperService + mapperService, + new EngineReadWriteLock() ); } @@ -916,7 +920,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineLock() ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 957570918cde3..6f3348c324bea 100644 --- 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineReadWriteLock; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; @@ -273,7 +274,8 @@ public void onFailedEngine(String reason, Exception e) { System::nanoTime, null, true, - mapperService + mapperService, + new EngineReadWriteLock() ); } From 6e4a5decfa2c5b812c753a6fe7de18f84afe33a3 Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 2 Apr 2025 16:29:14 +0200 Subject: [PATCH 03/21] acquire read lock before refresh --- .../elasticsearch/index/engine/Engine.java | 30 +++++++++++++++++++ .../index/engine/InternalEngine.java | 30 +++++++++++++++---- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index ec52092730bb1..a35ee271cd376 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; @@ -2384,4 +2385,33 @@ public void prepareForEngineReset() throws IOException { public long getLastUnsafeSegmentGenerationForGets() { 
throw new UnsupportedOperationException("Doesn't support getting the latest segment generation"); } + + protected static > R wrapForAssertions(R referenceManager, EngineConfig engineConfig) { + if (Assertions.ENABLED) { + referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineLock())); + } + return referenceManager; + } + + /** + * RefreshListener that asserts that the engine read lock is held by the thread refreshing the reference. + */ + private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener { + + private final EngineReadWriteLock engineLock; + + private AssertRefreshListenerHoldsEngineReadLock(EngineReadWriteLock engineLock) { + this.engineLock = Objects.requireNonNull(engineLock); + } + + @Override + public void beforeRefresh() throws IOException { + assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread(); + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index acd9cec8b064d..9f5148abdd7d2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -301,8 +301,8 @@ public InternalEngine(EngineConfig engineConfig) { } externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig)); internalReaderManager = externalReaderManager.internalReaderManager; - this.internalReaderManager = internalReaderManager; - this.externalReaderManager = externalReaderManager; + this.internalReaderManager = wrapForAssertions(internalReaderManager, engineConfig); + this.externalReaderManager = wrapForAssertions(externalReaderManager, engineConfig); 
internalReaderManager.addListener(versionMap); this.lastUnsafeSegmentGenerationForGets = new AtomicLong(lastCommittedSegmentInfos.getGeneration()); assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -2040,7 +2040,7 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); - boolean refreshed; + boolean refreshed = false; long segmentGeneration = RefreshResult.UNKNOWN_GENERATION; try { // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. @@ -2051,12 +2051,30 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea // the second refresh will only do the extra work we have to do for warming caches etc. ReferenceManager referenceManager = getReferenceManager(scope); long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration(); + + // The shard uses a reentrant read/write lock to guard again engine changes, a type of lock that prioritizes the threads + // waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners + // are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the + // engine write lock, so we acquire the read lock upfront before the refresh lock. 
+ final var engineReadLock = engineConfig.getEngineLock().readLock(); + // it is intentional that we never refresh both internal / external together if (block) { - referenceManager.maybeRefreshBlocking(); - refreshed = true; + engineReadLock.lock(); + try { + referenceManager.maybeRefreshBlocking(); + refreshed = true; + } finally { + engineReadLock.unlock(); + } } else { - refreshed = referenceManager.maybeRefresh(); + if (engineReadLock.tryLock()) { + try { + refreshed = referenceManager.maybeRefresh(); + } finally { + engineReadLock.unlock(); + } + } } if (refreshed) { final ElasticsearchDirectoryReader current = referenceManager.acquire(); From 1085560899d8ade041c57281df3bf0a6ed4c91ff Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 4 Apr 2025 16:17:25 +0200 Subject: [PATCH 04/21] Use lock for resets only --- .../index/shard/IndexShardIT.java | 2 +- .../indices/IndexingMemoryControllerIT.java | 2 +- .../elasticsearch/index/engine/Engine.java | 11 +- .../index/engine/EngineConfig.java | 10 +- ...eadWriteLock.java => EngineResetLock.java} | 34 +- .../index/engine/InternalEngine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 342 ++++++++---------- .../index/engine/InternalEngineTests.java | 4 +- .../index/shard/IndexShardTests.java | 139 +++++-- .../index/shard/RefreshListenersTests.java | 4 +- .../index/engine/EngineTestCase.java | 10 +- .../index/engine/FollowingEngineTests.java | 4 +- 12 files changed, 316 insertions(+), 248 deletions(-) rename server/src/main/java/org/elasticsearch/index/engine/{EngineReadWriteLock.java => EngineResetLock.java} (65%) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 41307b92a61a4..507724bbaeb3c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java 
@@ -676,7 +676,7 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti final CountDownLatch engineResetLatch = new CountDownLatch(1); shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> { try { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { r.close(); engineResetLatch.countDown(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 98179c512cbe7..7f654c712d055 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -85,7 +85,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineLock() + config.getEngineResetLock() ); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index a35ee271cd376..1ad414036bf0e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -2386,9 +2386,12 @@ public long getLastUnsafeSegmentGenerationForGets() { throw new UnsupportedOperationException("Doesn't support getting the latest segment generation"); } - protected static > R wrapForAssertions(R referenceManager, EngineConfig engineConfig) { + protected static > R wrapForAssertions( + R referenceManager, + EngineConfig engineConfig + ) { if (Assertions.ENABLED) { - referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineLock())); + referenceManager.addListener(new 
AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineResetLock())); } return referenceManager; } @@ -2398,9 +2401,9 @@ protected static > R wr */ private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener { - private final EngineReadWriteLock engineLock; + private final EngineResetLock engineLock; - private AssertRefreshListenerHoldsEngineReadLock(EngineReadWriteLock engineLock) { + private AssertRefreshListenerHoldsEngineReadLock(EngineResetLock engineLock) { this.engineLock = Objects.requireNonNull(engineLock); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index c9425aa8d7504..86c79bf99a373 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -146,7 +146,7 @@ public Supplier retentionLeasesSupplier() { private final boolean promotableToPrimary; - private final EngineReadWriteLock engineLock; + private final EngineResetLock engineResetLock; /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} @@ -180,7 +180,7 @@ public EngineConfig( Engine.IndexCommitListener indexCommitListener, boolean promotableToPrimary, MapperService mapperService, - EngineReadWriteLock engineLock + EngineResetLock engineResetLock ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -227,7 +227,7 @@ public EngineConfig( this.promotableToPrimary = promotableToPrimary; // always use compound on flush - reduces # of file-handles on refresh this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); - this.engineLock = engineLock; + this.engineResetLock = engineResetLock; } /** @@ -473,7 +473,7 @@ public MapperService getMapperService() { return mapperService; } - public EngineReadWriteLock getEngineLock() { - return engineLock; + public EngineResetLock 
getEngineResetLock() { + return engineResetLock; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java b/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java similarity index 65% rename from server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java rename to server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java index 7ad0c110ea167..7f1f7d165bbf6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineReadWriteLock.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java @@ -9,21 +9,25 @@ package org.elasticsearch.index.engine; +import org.elasticsearch.core.Assertions; + +import java.util.Collection; +import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * Reentrant read/write lock used to guard engine changes in a shard. + * Reentrant read/write lock used to control accesses to a shard's engine that can be reset. * - * Implemented as a simple wrapper around a {@link ReentrantReadWriteLock} to make it easier to add/override methods in the future. + * Implemented as a simple wrapper around a {@link ReentrantReadWriteLock} to make it easier to add/override methods. */ -public final class EngineReadWriteLock implements ReadWriteLock { +public final class EngineResetLock implements ReadWriteLock { private final ReentrantReadWriteLock lock; - public EngineReadWriteLock() { - this.lock = new ReentrantReadWriteLock(); + public EngineResetLock() { + this.lock = Assertions.ENABLED ? 
new QueuedWriterThreadsReentrantReadWriteLock() : new ReentrantReadWriteLock(); } @Override @@ -69,4 +73,24 @@ public boolean isReadLocked() { public boolean isReadLockedByCurrentThread() { return lock.getReadHoldCount() > 0; } + + /** + * See {@link ReentrantReadWriteLock#getQueuedWriterThreads()} + */ + public Collection getQueuedWriterThreads() { + if (lock instanceof QueuedWriterThreadsReentrantReadWriteLock queuedLock) { + return queuedLock.queuedWriterThreads(); + } else { + return List.of(); + } + } + + /** + * Extends ReentrantReadWriteLock to expose the protected {@link ReentrantReadWriteLock#getQueuedWriterThreads()} method + */ + private static class QueuedWriterThreadsReentrantReadWriteLock extends ReentrantReadWriteLock { + Collection queuedWriterThreads() { + return super.getQueuedWriterThreads(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9f5148abdd7d2..3350dd786b6c6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2056,7 +2056,7 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea // waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners // are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the // engine write lock, so we acquire the read lock upfront before the refresh lock. 
- final var engineReadLock = engineConfig.getEngineLock().readLock(); + final var engineReadLock = engineConfig.getEngineResetLock().readLock(); // it is intentional that we never refresh both internal / external together if (block) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5be2e171a5230..8123b8269f687 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -91,7 +91,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; -import org.elasticsearch.index.engine.EngineReadWriteLock; +import org.elasticsearch.index.engine.EngineResetLock; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; @@ -248,14 +248,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - // read/write lock for mutating the engine (lock ordering: closeMutex -> engineLock.writeLock -> mutex) - private final EngineReadWriteLock engineLock = new EngineReadWriteLock(); - private Engine currentEngine = null; // must be accessed while holding engineLock + // mutex for creating/closing the engine + private final Object engineMutex = new Object(); // lock ordering: engineMutex -> engineResetLock -> mutex + // read/write lock for reseting the engine + private final EngineResetLock engineResetLock = new EngineResetLock(); + // reference to the current engine + private final AtomicReference currentEngine = new AtomicReference<>(); // must be accessed holding engineResetLock final EngineFactory 
engineFactory; - // mutex for closing the shard - private final Object closeMutex = new Object(); - private final IndexingOperationListener indexingOperationListeners; private final GlobalCheckpointSyncer globalCheckpointSyncer; @@ -1652,20 +1652,12 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - synchronized (closeMutex) { - engineLock.readLock().lock(); - try { - // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } - if (indexCommit == null) { - return store.getMetadata(null, true); - } - } finally { - engineLock.readLock().unlock(); + synchronized (engineMutex) { + // if the engine is not running, we can access the store directly, but we need to make sure no one starts + // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. + indexCommit = withEngineOrNull(engine -> engine != null ? 
engine.acquireLastIndexCommit(false) : null); + if (indexCommit == null) { + return store.getMetadata(null, true); } } return store.getMetadata(indexCommit.getIndexCommit()); @@ -1820,40 +1812,21 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { - synchronized (closeMutex) { - Engine engineOrNull = null; + synchronized (engineMutex) { + engineResetLock.readLock().lock(); // prevent engine resets while closing try { - // engine reference and shard state are changed under the engine write lock - engineLock.writeLock().lock(); try { - try { - synchronized (mutex) { - changeState(IndexShardState.CLOSED, reason); - } - checkAndCallWaitForEngineOrClosedShardListeners(); - } finally { - engineOrNull = getAndSetCurrentEngine(null); - // downgrade to read lock for submitting the engine closing task - // (not strictly required because engine changes are no longer allowed after the state was changed to CLOSED) - engineLock.readLock().lock(); + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); } + checkAndCallWaitForEngineOrClosedShardListeners(); } finally { - engineLock.writeLock().unlock(); - } - } finally { - assert engineLock.isReadLockedByCurrentThread() : "hold the read lock when submitting the engine closing task"; - try { - final Engine engine = engineOrNull; - // When closeExecutor is EsExecutors.DIRECT_EXECUTOR_SERVICE, the following runnable will run within the current thread - // while the read lock is held, which is OK. When closeExecutor is a generic thread or the cluster state applier thread - // it will then run without any engine lock held, which is OK because no engine changes are allowed after the state is - // changed to CLOSED. 
+ final Engine engine = getAndSetCurrentEngine(null); closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { @Override public void run() throws Exception { try { if (engine != null && flushEngine) { - assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock"; engine.flushAndClose(); } } finally { @@ -1874,9 +1847,9 @@ public String toString() { return "IndexShard#close[" + shardId + "]"; } })); - } finally { - engineLock.readLock().unlock(); } + } finally { + engineResetLock.readLock().unlock(); } } } @@ -2004,12 +1977,13 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. .newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; - engineLock.writeLock().lock(); - try { - verifyNotClosed(); - IOUtils.close(getAndSetCurrentEngine(null)); - } finally { - engineLock.writeLock().unlock(); + synchronized (engineMutex) { + engineResetLock.readLock().lock(); // prevent engine resets while closing + try { + IOUtils.close(getAndSetCurrentEngine(null)); + } finally { + engineResetLock.readLock().unlock(); + } } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2239,19 +2213,21 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - engineLock.writeLock().lock(); - try { - assert currentEngine == null : "engine is running"; + synchronized (engineMutex) { + assert getEngineOrNull() == null : "engine is running"; verifyNotClosed(); - // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). 
- final Engine newEngine = createEngine(config); - onNewEngine(newEngine); - getAndSetCurrentEngine(newEngine); + engineResetLock.readLock().lock(); // prevent engine resets while closing + try { + // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). + final Engine newEngine = createEngine(config); + onNewEngine(newEngine); + getAndSetCurrentEngine(newEngine); + } finally { + engineResetLock.readLock().unlock(); + } // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); - } finally { - engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -2316,7 +2292,7 @@ private boolean assertLastestCommitUserData() throws IOException { } private void onNewEngine(Engine newEngine) { - assert engineLock.isWriteLockedByCurrentThread(); + assert Thread.holdsLock(engineMutex); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); refreshListeners.setCurrentProcessedCheckpointSupplier(newEngine::getProcessedLocalCheckpoint); refreshListeners.setMaxIssuedSeqNoSupplier(newEngine::getMaxSeqNo); @@ -2327,13 +2303,15 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; - engineLock.writeLock().lock(); - try { + synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(getAndSetCurrentEngine(null)); + engineResetLock.readLock().lock(); // prevent engine resets while closing + try { + IOUtils.close(getAndSetCurrentEngine(null)); + } finally { + engineResetLock.readLock().unlock(); + } 
resetRecoveryStage(); - } finally { - engineLock.writeLock().unlock(); } } @@ -2671,20 +2649,14 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - // This method can be called within the cluster state applier thread - if (engineLock.readLock().tryLock() == false) { - // Attempt to acquire a read lock failed: - // - the engine is closing, in which case we don't need to apply the updated index settings - // - otherwise the onSettingsChanged() should be called again after the new engine is created and the write lock is released - return; - } + engineResetLock.readLock().lock(); try { - var engineOrNull = getCurrentEngine(true); - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + var engine = getCurrentEngine(true); + if (engine != null) { + engine.onSettingsChanged(); } } finally { - engineLock.readLock().unlock(); + engineResetLock.readLock().unlock(); } } @@ -3375,11 +3347,11 @@ private void doCheckIndex() throws IOException { } Engine getEngine() { - engineLock.readLock().lock(); + engineResetLock.readLock().lock(); try { return getCurrentEngine(false); } finally { - engineLock.readLock().unlock(); + engineResetLock.readLock().unlock(); } } @@ -3388,17 +3360,17 @@ Engine getEngine() { * closed. 
*/ public Engine getEngineOrNull() { - engineLock.readLock().lock(); + engineResetLock.readLock().lock(); try { return getCurrentEngine(true); } finally { - engineLock.readLock().unlock(); + engineResetLock.readLock().unlock(); } } private Engine getCurrentEngine(boolean allowNoEngine) { - assert engineLock.isReadLockedByCurrentThread() || engineLock.isWriteLockedByCurrentThread(); - var engine = this.currentEngine; + assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */; + var engine = currentEngine.get(); if (engine == null && allowNoEngine == false) { throw new AlreadyClosedException("engine is closed"); } @@ -3406,10 +3378,9 @@ private Engine getCurrentEngine(boolean allowNoEngine) { } private Engine getAndSetCurrentEngine(Engine newEngine) { - assert engineLock.isWriteLockedByCurrentThread(); - var previousEngine = this.currentEngine; - this.currentEngine = newEngine; - return previousEngine; + assert Thread.holdsLock(engineMutex); + assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */; + return currentEngine.getAndSet(newEngine); } /** @@ -3457,12 +3428,12 @@ private R withEngine(Function operation, boolean allowNoEngine) { assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); assert operation != null; - engineLock.readLock().lock(); + engineResetLock.readLock().lock(); try { var engine = getCurrentEngine(allowNoEngine); return operation.apply(engine); } finally { - engineLock.readLock().unlock(); + engineResetLock.readLock().unlock(); } } @@ -3736,7 +3707,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { indexCommitListener, routingEntry().isPromotableToPrimary(), mapperService(), - engineLock + engineResetLock ); } @@ -4015,7 +3986,7 @@ private void innerAcquireReplicaOperationPermit( maxSeqNo ); if (currentGlobalCheckpoint < maxSeqNo) { - 
resetEngineToGlobalCheckpoint(); + rollbackEngineToGlobalCheckpoint(); } else { getEngine().rollTranslogGeneration(); } @@ -4470,59 +4441,54 @@ public void afterRefresh(boolean didRefresh) { public void resetEngine(Consumer postResetNewEngineConsumer) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); + Engine previousEngine = null; try { - engineLock.readLock().lock(); - var release = true; - try { + synchronized (engineMutex) { verifyNotClosed(); - // Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because - // another thread might already be refreshing, and the refresh listeners in that thread will need to access to the engine - // using the read lock. If we were using the write lock here, it would deadlock. - currentEngine.prepareForEngineReset(); - engineLock.readLock().unlock(); - release = false; - - // Promote to write lock in order to swap engines - engineLock.writeLock().lock(); - Engine previousEngine = null; try { - - // How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not - // blocking all operations when resetting the engine nor we are blocking flushes or force-merges. 
- - assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!"; - var newEngine = createEngine(newEngineConfig(replicationTracker)); - previousEngine = getAndSetCurrentEngine(newEngine); - postResetNewEngineConsumer.accept(newEngine); - onNewEngine(newEngine); - } finally { - engineLock.readLock().lock(); + engineResetLock.writeLock().lock(); try { - engineLock.writeLock().unlock(); - IOUtils.close(previousEngine); + var engine = getCurrentEngine(false); + engine.prepareForEngineReset(); + var newEngine = createEngine(newEngineConfig(replicationTracker)); + getAndSetCurrentEngine(newEngine); + onNewEngine(newEngine); + postResetNewEngineConsumer.accept(newEngine); + previousEngine = engine; } finally { - engineLock.readLock().unlock(); + if (previousEngine != null) { + // Downgrade to read lock for closing the engine + engineResetLock.readLock().lock(); + } + engineResetLock.writeLock().unlock(); } - } - } finally { - if (release) { - engineLock.readLock().unlock(); + } catch (Exception e) { + // we want to fail the shard in the case prepareForEngineReset throws + failShard("unable to reset engine", e); } } onSettingsChanged(); - } catch (Exception e) { - // we want to fail the shard in the case prepareForEngineReset throws - failShard("unable to reset engine", e); + } finally { + if (previousEngine != null) { + assert engineResetLock.isReadLockedByCurrentThread(); + try { + IOUtils.close(previousEngine); + } catch (Exception e) { + failShard("unable to close previous engine after reset", e); + } finally { + engineResetLock.readLock().unlock(); + } + } } } /** * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. 
*/ - void resetEngineToGlobalCheckpoint() throws IOException { + void rollbackEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED - : "resetting engine without blocking operations; active operations are [" + getActiveOperationsCount() + ']'; + : "engine rollback without blocking operations; active operations are [" + getActiveOperationsCount() + ']'; sync(); // persist the global checkpoint to disk final SeqNoStats seqNoStats = seqNoStats(); final TranslogStats translogStats = translogStats(); @@ -4532,68 +4498,61 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - engineLock.writeLock().lock(); - try { + synchronized (engineMutex) { verifyNotClosed(); - // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, - // acquireXXXCommit and close works. - final Engine readOnlyEngine = new ReadOnlyEngine( - newEngineConfig(replicationTracker), - seqNoStats, - translogStats, - false, - Function.identity(), - true, - false - ) { - @Override - public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - engineLock.readLock().lock(); - try { - if (newEngineReference.get() == null) { - throw new AlreadyClosedException("engine was closed"); + engineResetLock.readLock().lock(); // prevent engine resets during rollback + try { + // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, + // acquireXXXCommit and close works. 
+ final Engine readOnlyEngine = new ReadOnlyEngine( + newEngineConfig(replicationTracker), + seqNoStats, + translogStats, + false, + Function.identity(), + true, + false + ) { + @Override + public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } + // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay + return newEngineReference.get().acquireLastIndexCommit(false); } - // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay - return newEngineReference.get().acquireLastIndexCommit(false); - } finally { - engineLock.readLock().unlock(); } - } - @Override - public IndexCommitRef acquireSafeIndexCommit() { - engineLock.readLock().lock(); - try { - if (newEngineReference.get() == null) { - throw new AlreadyClosedException("engine was closed"); + @Override + public IndexCommitRef acquireSafeIndexCommit() { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } + return newEngineReference.get().acquireSafeIndexCommit(); } - return newEngineReference.get().acquireSafeIndexCommit(); - } finally { - engineLock.readLock().unlock(); } - } - @Override - public void close() throws IOException { - Engine newEngine; - engineLock.readLock().lock(); - try { - newEngine = newEngineReference.get(); - if (newEngine == getCurrentEngine(true)) { - // we successfully installed the new engine so do not close it. - newEngine = null; + @Override + public void close() throws IOException { + Engine newEngine; + synchronized (engineMutex) { + newEngine = newEngineReference.get(); + if (newEngine == getCurrentEngine(true)) { + // we successfully installed the new engine so do not close it. 
+ newEngine = null; + } } - } finally { - engineLock.readLock().unlock(); + IOUtils.close(super::close, newEngine); } - IOUtils.close(super::close, newEngine); - } - }; - IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); - newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); - onNewEngine(newEngineReference.get()); - } finally { - engineLock.writeLock().unlock(); + }; + IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); + newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); + onNewEngine(newEngineReference.get()); + } finally { + engineResetLock.readLock().unlock(); + } } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, @@ -4605,15 +4564,17 @@ public void close() throws IOException { ); newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); newEngineReference.get().refresh("reset_engine"); - engineLock.writeLock().lock(); - try { + synchronized (engineMutex) { verifyNotClosed(); - IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); + engineResetLock.readLock().lock(); + try { + IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); + } finally { + engineResetLock.readLock().unlock(); + } // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); - } finally { - engineLock.writeLock().unlock(); } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. 
@@ -4724,4 +4685,9 @@ public void ensureMutable(ActionListener listener, boolean permitAcquired) l.onResponse(null); })); } + + // package-private for tests + EngineResetLock getEngineResetLock() { + return engineResetLock; + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 6b87745904cc6..bc63ef763ec57 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3613,7 +3613,7 @@ public void testRecoverFromForeignTranslog() throws IOException { null, true, config.getMapperService(), - config.getEngineLock() + config.getEngineResetLock() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7177,7 +7177,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineLock() + config.getEngineResetLock() ); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index adb6a15c6260a..fa4c7c6942d0e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -193,6 +193,7 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; @@ -4515,7 +4516,7 @@ public void testSupplyTombstoneDoc() 
throws Exception { closeShards(shard); } - public void testResetEngineToGlobalCheckpoint() throws Exception { + public void testRollbackEngineToGlobalCheckpoint() throws Exception { IndexShard shard = newStartedShard(false); indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); long maxSeqNoBeforeRollback = shard.seqNoStats().getMaxSeqNo(); @@ -4556,7 +4557,7 @@ public void testResetEngineToGlobalCheckpoint() throws Exception { final CountDownLatch engineResetLatch = new CountDownLatch(1); shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), globalCheckpoint, 0L, ActionListener.wrap(r -> { try { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { r.close(); engineResetLatch.countDown(); @@ -4604,9 +4605,9 @@ public void prepareForEngineReset() throws IOException {} /** * This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. Closing a shard while engine is inside - * resetEngineToGlobalCheckpoint can lead to check index failure in integration tests. + * rollbackEngineToGlobalCheckpoint can lead to check index failure in integration tests. */ - public void testCloseShardWhileResettingEngine() throws Exception { + public void testCloseShardWhileRollbackEngine() throws Exception { CountDownLatch readyToCloseLatch = new CountDownLatch(1); CountDownLatch closeDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @@ -4644,7 +4645,7 @@ public void recoverFromTranslog( 0L, ActionListener.wrap(r -> { try (r) { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { engineResetLatch.countDown(); } @@ -4662,9 +4663,9 @@ public void recoverFromTranslog( /** * This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. 
While engine is inside - * resetEngineToGlobalCheckpoint snapshot metadata could fail + * rollbackEngineToGlobalCheckpoint snapshot metadata could fail */ - public void testSnapshotWhileResettingEngine() throws Exception { + public void testSnapshotWhileRollbackEngine() throws Exception { CountDownLatch readyToSnapshotLatch = new CountDownLatch(1); CountDownLatch snapshotDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @@ -4711,7 +4712,7 @@ public void recoverFromTranslog( 0L, ActionListener.wrap(r -> { try (r) { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { engineResetLatch.countDown(); } @@ -5061,7 +5062,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineLock() + config.getEngineResetLock() ); return new InternalEngine(configWithWarmer); }); @@ -5099,64 +5100,138 @@ public void testRecordsForceMerges() throws IOException { public void testCloseShardWhileRetainingEngine() throws Exception { final var primary = newStartedShard(true); try { - final var release = new CountDownLatch(1); final var hold = new PlainActionFuture(); + final var close = new PlainActionFuture(); + final var release = new CountDownLatch(1); + final var holdEngineThread = new Thread(() -> { primary.withEngine(engine -> { assertThat(engine, notNullValue()); EngineTestCase.ensureOpen(engine); hold.onResponse(engine); + + safeGet(close); + + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); + assertThat(primary.getEngineOrNull(), nullValue()); + safeAwait(release); return null; }); }); holdEngineThread.start(); - final var secondReaderExecuting = new CountDownLatch(1); - final var closed = new CountDownLatch(1); - final var closeEngineThread = new 
Thread(() -> { - try { - safeGet(hold); - // Unfair ReentrantReadWriteLock would prioritize writers over readers to avoid starving writers, - // hence we need to wait to close the engine until the second reader has acquired the read lock before - // closing, otherwise the test would deadlock. - safeAwait(secondReaderExecuting); - closeShardNoCheck(primary); - assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); - closed.countDown(); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - closeEngineThread.start(); - final var retainedInstance = asInstanceOf(InternalEngine.class, safeGet(hold)); + assertSame(retainedInstance, primary.getEngine()); assertSame(retainedInstance, primary.getEngineOrNull()); assertThat(primary.state(), equalTo(IndexShardState.STARTED)); primary.withEngineOrNull(engine -> { - secondReaderExecuting.countDown(); assertSame(retainedInstance, engine); EngineTestCase.ensureOpen(engine); return null; }); - release.countDown(); - safeAwait(closed); + final var closeEngineThread = new Thread(() -> { + try { + safeGet(hold); + + assertThat(primary.getEngineResetLock().isReadLocked(), equalTo(true)); + + closeShardNoCheck(primary); + + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); + assertThat(primary.getEngineOrNull(), nullValue()); + + close.onResponse(null); + } catch (IOException e) { + close.onFailure(e); + } + }); + closeEngineThread.start(); + safeGet(close); assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); assertThat(primary.getEngineOrNull(), nullValue()); primary.withEngineOrNull(engine -> { assertThat(engine, nullValue()); return null; }); - holdEngineThread.join(); closeEngineThread.join(); + release.countDown(); + holdEngineThread.join(); } finally { IOUtils.close(primary.store()); } } + public void testResetEngineWhileRetainingEngine() 
throws Exception { + final var preparedForReset = new AtomicBoolean(); + final var shard = newStartedShard(true, Settings.EMPTY, config -> { + if (preparedForReset.get()) { + return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true); + } else { + return new InternalEngine(config) { + @Override + public void prepareForEngineReset() throws IOException { + assertTrue(preparedForReset.compareAndSet(false, true)); + } + }; + } + }); + final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock(); + + final var release = new CountDownLatch(1); + final var hold = new PlainActionFuture(); + final var holdEngineThread = new Thread(() -> { + shard.withEngine(engine -> { + assertThat(engine, notNullValue()); + EngineTestCase.ensureOpen(engine); + hold.onResponse(engine); + safeAwait(release, TimeValue.ONE_HOUR); + return null; + }); + }); + holdEngineThread.start(); + safeGet(hold); + + assertThat(shard.getEngine(), instanceOf(InternalEngine.class)); + + final var reset = new PlainActionFuture(); + final var resetEngineThread = new Thread(() -> { + try { + safeGet(hold); + shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> { + try (permit) { + shard.resetEngine(newEngine -> { + assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true)); + assertThat(newEngine, instanceOf(ReadOnlyEngine.class)); + }); + assertThat(preparedForReset.get(), equalTo(true)); + l.onResponse(null); + } + }), EsExecutors.DIRECT_EXECUTOR_SERVICE); + } catch (Exception e) { + reset.onFailure(e); + } + }); + resetEngineThread.start(); + + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasItem(resetEngineThread))); + assertThat(engineResetLock.isReadLocked(), equalTo(true)); + + release.countDown(); + safeGet(reset); + + holdEngineThread.join(); + resetEngineThread.join(); + + closeShards(shard); + } + public void testShardExposesWriteLoadStats() throws Exception { final IndexShard 
primary = newStartedShard( true, diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index a60805b587997..a9aae887d7d3b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -39,7 +39,7 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineReadWriteLock; +import org.elasticsearch.index.engine.EngineResetLock; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; @@ -168,7 +168,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { null, true, EngineTestCase.createMapperService(), - new EngineReadWriteLock() + new EngineResetLock() ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 756bbbe5f085b..5a1784bf531c5 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -305,7 +305,7 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineLock() + config.getEngineResetLock() ); } @@ -339,7 +339,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - 
config.getEngineLock() + config.getEngineResetLock() ); } @@ -373,7 +373,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineLock() + config.getEngineResetLock() ); } @@ -879,7 +879,7 @@ public EngineConfig config( indexCommitListener, true, mapperService, - new EngineReadWriteLock() + new EngineResetLock() ); } @@ -921,7 +921,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineLock() + config.getEngineResetLock() ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 6f3348c324bea..61f6833c1f04a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -32,7 +32,7 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineReadWriteLock; +import org.elasticsearch.index.engine.EngineResetLock; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; @@ -275,7 +275,7 @@ public void onFailedEngine(String reason, Exception e) { null, true, mapperService, - new EngineReadWriteLock() + new EngineResetLock() ); } From d8760e363d86412f6005305b150b5b9211d09879 Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 4 Apr 2025 19:40:48 +0200 Subject: [PATCH 05/21] Add test for deadlock --- 
.../index/engine/EngineResetLock.java | 7 + .../index/shard/IndexShardTests.java | 207 +++++++++++++++++- 2 files changed, 213 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java b/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java index 7f1f7d165bbf6..3db12e28521b1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java @@ -74,6 +74,13 @@ public boolean isReadLockedByCurrentThread() { return lock.getReadHoldCount() > 0; } + /** + * See {@link ReentrantReadWriteLock#getReadLockCount()} + */ + public int getReadLockCount() { + return lock.getReadLockCount(); + } + /** * See {@link ReentrantReadWriteLock#getQueuedWriterThreads()} */ diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index fa4c7c6942d0e..2f6730d1ec265 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,6 +18,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; @@ -188,6 +189,7 @@ import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.containsStringIgnoringCase; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -5191,7 +5193,7 @@ public void prepareForEngineReset() throws IOException { assertThat(engine, 
notNullValue()); EngineTestCase.ensureOpen(engine); hold.onResponse(engine); - safeAwait(release, TimeValue.ONE_HOUR); + safeAwait(release); return null; }); }); @@ -5226,12 +5228,215 @@ public void prepareForEngineReset() throws IOException { release.countDown(); safeGet(reset); + assertThat(preparedForReset.get(), equalTo(true)); + holdEngineThread.join(); resetEngineThread.join(); closeShards(shard); } + public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception { + final var lazyShard = new AtomicReference(); + final var lazyEngineConfig = new AtomicReference(); + + final var refreshStarted = new CountDownLatch(1); + final var blockRefresh = new AtomicBoolean(); + final var unblockRefresh = new CountDownLatch(1); + + final var getFromTranslogStarted = new CountDownLatch(1); + final var getFromTranslogResult = new PlainActionFuture(); + + final var resetStarted = new CountDownLatch(1); + + // Refresh listener that blocks on purpose (so it holds the refresh lock) and acquires the engine read lock in a reentrant manner + final var blockingRefreshListener = new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + if (blockRefresh.get()) { + try { + var shard = lazyShard.get(); + assertThat(shard, notNullValue()); + + // Asserts that the refresh is triggered by the test and not something else + assertThat(Thread.currentThread().toString(), containsStringIgnoringCase(getTestClass().getSimpleName())); + + // Asserts the current thread holds the engine read lock + var engineResetLock = lazyEngineConfig.get().getEngineResetLock(); + assertThat(engineResetLock.isReadLockedByCurrentThread(), equalTo(true)); + + refreshStarted.countDown(); + safeAwait(getFromTranslogStarted); + + // At this stage, getThread is blocked on the refresh held by the current thread + assertBusy(() -> assertThat(engineResetLock.getReadLockCount(), greaterThanOrEqualTo(2))); + assertThat(getFromTranslogResult.isDone(), 
equalTo(false)); + + // Waits for the resetThread + safeAwait(resetStarted); + + // The resetThread waits for the engine write lock, blocking new non-reentrant engine read lock acquisitions + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasSize(1))); + + // Ensure that accessing the engine from a refresh listener works, even if another thread (like resetThread) is + // waiting for the engine write lock. If we were not acquiring the engine read lock when refreshing the reader, + // we would deadlock here. + var localCheckpoint = shard.withEngine(engine -> engine.getProcessedLocalCheckpoint()); + assertThat(localCheckpoint, greaterThan(SequenceNumbers.NO_OPS_PERFORMED)); + + // Also test `getEngine` + var internalEngine = asInstanceOf(InternalEngine.class, shard.getEngine()); + assertThat(internalEngine.getTranslogStats().getUncommittedOperations(), equalTo(1)); + + // Don't block refresh again (it will flush and refresh later in prepareEngineForReset) + blockRefresh.set(false); + + safeAwait(unblockRefresh); + } catch (Exception e) { + throw new AssertionError(e); + } + } + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException {} + }; + + final var preparedForReset = new AtomicBoolean(); + final var shard = newShard(true, Settings.EMPTY, config -> { + if (preparedForReset.get()) { + return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true); + } else { + var internalRefreshListeners = new ArrayList(); + internalRefreshListeners.add(blockingRefreshListener); + internalRefreshListeners.addAll(config.getInternalRefreshListener()); + + var engineConfigWithBlockingRefreshListener = new EngineConfig( + config.getShardId(), + config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), + config.getIndexSettings(), + config.getWarmer(), + config.getStore(), + config.getMergePolicy(), + config.getAnalyzer(), + config.getSimilarity(), + new CodecService(null, 
BigArrays.NON_RECYCLING_INSTANCE), + config.getEventListener(), + config.getQueryCache(), + config.getQueryCachingPolicy(), + config.getTranslogConfig(), + config.getFlushMergesAfter(), + config.getExternalRefreshListener(), + internalRefreshListeners, + config.getIndexSort(), + config.getCircuitBreakerService(), + config.getGlobalCheckpointSupplier(), + config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + config.getLeafSorter(), + config.getRelativeTimeInNanosSupplier(), + config.getIndexCommitListener(), + config.isPromotableToPrimary(), + config.getMapperService(), + config.getEngineResetLock() + ); + lazyEngineConfig.set(engineConfigWithBlockingRefreshListener); + return new InternalEngine(engineConfigWithBlockingRefreshListener) { + @Override + public void prepareForEngineReset() throws IOException { + flush(true, true); + assertTrue(preparedForReset.compareAndSet(false, true)); + } + }; + } + }); + try { + recoverShardFromStore(shard); + blockRefresh.set(true); + lazyShard.set(shard); + + // Indexing a doc with an auto-generated id makes the version map unsafe, and the realtime get will have to refresh + var index = indexDoc(shard, "_doc", null /* auto-generated id */); + assertThat(index.isCreated(), equalTo(true)); + + // Trigger a refresh + var refreshThread = new Thread(() -> shard.refresh("test")); + refreshThread.start(); + + // Wait for the refresh listener to hold the refresh lock and the engine read lock + safeAwait(refreshStarted); + + // While refresh is blocked holding the locks, trigger a getFromTranslog() that will block on the refresh in another thread + var getThread = new Thread(() -> { + shard.withEngine(engine -> { + getFromTranslogStarted.countDown(); + try ( + // Will block on the refresh lock + var getResult = engine.get( + new Engine.Get(true, false, index.getId()), + shard.mapperService().mappingLookup(), + shard.mapperService().documentParser(), + searcher -> searcher + ) 
+ ) { + assertThat(getResult, notNullValue()); + getFromTranslogResult.onResponse(getResult.exists()); + return null; + } + }); + }); + getThread.start(); + + final var engineResetLock = lazyEngineConfig.get().getEngineResetLock(); + safeAwait(getFromTranslogStarted); + + // Resets the engine to have a thread waiting for the engine write lock (this will block non-reentrant read lock acquisitions) + final var reset = new PlainActionFuture(); + final var resetEngineThread = new Thread(() -> { + resetStarted.countDown(); + try { + shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> { + try (permit) { + shard.resetEngine(newEngine -> { + assertThat(newEngine.getEngineConfig().getEngineResetLock(), sameInstance(engineResetLock)); + assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true)); + assertThat(newEngine, instanceOf(ReadOnlyEngine.class)); + assertThat(getFromTranslogResult.isDone(), equalTo(true)); + }); + assertThat(preparedForReset.get(), equalTo(true)); + l.onResponse(null); + } + }), EsExecutors.DIRECT_EXECUTOR_SERVICE); + } catch (Exception e) { + reset.onFailure(e); + } + }); + resetEngineThread.start(); + + safeAwait(resetStarted); + + // At this stage, getThread is blocked by refreshThread, and both threads block resetEngineThread + assertThat(getFromTranslogResult.isDone(), equalTo(false)); + + assertBusy(() -> assertThat(engineResetLock.getReadLockCount(), greaterThanOrEqualTo(2))); + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasItem(resetEngineThread))); + assertThat(engineResetLock.isWriteLocked(), equalTo(false)); + assertThat(engineResetLock.isReadLocked(), equalTo(true)); + + unblockRefresh.countDown(); + + safeGet(reset); + + resetEngineThread.join(); + refreshThread.join(); + getThread.join(); + } finally { + closeShards(shard); + } + } + public void testShardExposesWriteLoadStats() throws Exception { final IndexShard primary = newStartedShard( true, From 
a9832ebcb94ee251bf0065c83f37d66d795231e8 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 13:38:19 +0200 Subject: [PATCH 06/21] sometimes --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3350dd786b6c6..2040c6a04df23 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2054,7 +2054,7 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea // The shard uses a reentrant read/write lock to guard again engine changes, a type of lock that prioritizes the threads // waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners - // are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the + // sometimes access the engine read lock, we need to ensure that they won't block if another thread is waiting for the // engine write lock, so we acquire the read lock upfront before the refresh lock. 
final var engineReadLock = engineConfig.getEngineResetLock().readLock(); From 05ebd5ccb6a0e1d212d0af1be9d2fd74bb1f5822 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 13:50:02 +0200 Subject: [PATCH 07/21] doc about non-blocking refreshes and reset --- server/src/main/java/org/elasticsearch/index/engine/Engine.java | 2 +- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 1ad414036bf0e..e57574c23d264 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1289,7 +1289,7 @@ public void externalRefresh(String source, ActionListener /** * Asynchronously refreshes the engine for new search operations to reflect the latest - * changes unless another thread is already refreshing the engine concurrently. + * changes unless another thread is already refreshing or resetting the engine concurrently. */ @Nullable public abstract void maybeRefresh(String source, ActionListener listener) throws EngineException; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8123b8269f687..bdc9635c38166 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4433,6 +4433,8 @@ public void afterRefresh(boolean didRefresh) { * * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset.
+ * Resetting the engine can prevent non-blocking engine refreshes (see {@link Engine#maybeRefresh(String, ActionListener)}) from being + * immediately executed, so it is expected that the new engine instance provides refreshed readers (if supported) after the reset. * * @param postResetNewEngineConsumer A consumer that will be called with the newly created engine after the reset * is complete, allowing for post-reset operations on the new engine instance. From eb83fdb1426c114cb5988aa1f8823fab1e1628fe Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 14:14:36 +0200 Subject: [PATCH 08/21] assertNoEngineResetLock --- .../elasticsearch/index/shard/IndexShard.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bdc9635c38166..d2f0027cc0ea7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1652,6 +1652,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { + assert assertNoEngineResetLock(); synchronized (engineMutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
@@ -1812,6 +1813,7 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { + assert assertNoEngineResetLock(); synchronized (engineMutex) { engineResetLock.readLock().lock(); // prevent engine resets while closing try { @@ -1977,6 +1979,7 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again. .newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; + assert assertNoEngineResetLock(); synchronized (engineMutex) { engineResetLock.readLock().lock(); // prevent engine resets while closing try { @@ -2199,6 +2202,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException { assert Thread.holdsLock(mutex) == false : "opening engine under mutex"; + assert assertNoEngineResetLock(); if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -2303,6 +2307,7 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; + assert assertNoEngineResetLock(); synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; engineResetLock.readLock().lock(); // prevent engine resets while closing @@ -4443,6 +4448,7 @@ public void afterRefresh(boolean didRefresh) { public void resetEngine(Consumer postResetNewEngineConsumer) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); + assert assertNoEngineResetLock(); Engine previousEngine = null; try { 
synchronized (engineMutex) { @@ -4489,6 +4495,7 @@ public void resetEngine(Consumer postResetNewEngineConsumer) { */ void rollbackEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; + assert assertNoEngineResetLock(); assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "engine rollback without blocking operations; active operations are [" + getActiveOperationsCount() + ']'; sync(); // persist the global checkpoint to disk @@ -4517,6 +4524,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { + assert assertNoEngineResetLock(); synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); @@ -4528,6 +4536,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { + assert assertNoEngineResetLock(); synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); @@ -4539,6 +4548,7 @@ public IndexCommitRef acquireSafeIndexCommit() { @Override public void close() throws IOException { Engine newEngine; + assert assertNoEngineResetLock(); synchronized (engineMutex) { newEngine = newEngineReference.get(); if (newEngine == getCurrentEngine(true)) { @@ -4692,4 +4702,16 @@ public void ensureMutable(ActionListener listener, boolean permitAcquired) EngineResetLock getEngineResetLock() { return engineResetLock; } + + private boolean assertNoEngineResetLock() { + assert engineResetLock.isReadLockedByCurrentThread() + : "Expected current thread [" + + Thread.currentThread() + + "] to not hold an engine read lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; + assert engineResetLock.isWriteLockedByCurrentThread() + : "Expected current thread [" + + Thread.currentThread() + + "] to not hold the engine write 
lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; + return true; + } } From 2b9de63b4b3768351bc54632f3e8e5b80a4831f6 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 14:30:18 +0200 Subject: [PATCH 09/21] readLock and comment --- .../elasticsearch/index/shard/IndexShard.java | 102 ++++++++---------- 1 file changed, 47 insertions(+), 55 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d2f0027cc0ea7..ca535b1292fde 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1815,7 +1815,9 @@ public CacheHelper getReaderCacheHelper() { public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { assert assertNoEngineResetLock(); synchronized (engineMutex) { - engineResetLock.readLock().lock(); // prevent engine resets while closing + // The engineMutex prevents any other engine changes (like resetting the engine) to run concurrently, so we acquire the engine + // read lock here just to respect the lock ordering (engineMutex -> engineResetLock -> mutex).
+ engineResetLock.readLock().lock(); try { try { synchronized (mutex) { @@ -1981,12 +1983,7 @@ private void doLocalRecovery( assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; assert assertNoEngineResetLock(); synchronized (engineMutex) { - engineResetLock.readLock().lock(); // prevent engine resets while closing - try { - IOUtils.close(getAndSetCurrentEngine(null)); - } finally { - engineResetLock.readLock().unlock(); - } + IOUtils.close(getAndSetCurrentEngine(null)); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -4509,62 +4506,57 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); synchronized (engineMutex) { verifyNotClosed(); - engineResetLock.readLock().lock(); // prevent engine resets during rollback - try { - // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, - // acquireXXXCommit and close works. - final Engine readOnlyEngine = new ReadOnlyEngine( - newEngineConfig(replicationTracker), - seqNoStats, - translogStats, - false, - Function.identity(), - true, - false - ) { - @Override - public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - assert assertNoEngineResetLock(); - synchronized (engineMutex) { - if (newEngineReference.get() == null) { - throw new AlreadyClosedException("engine was closed"); - } - // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay - return newEngineReference.get().acquireLastIndexCommit(false); + // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, + // acquireXXXCommit and close works. 
+ final Engine readOnlyEngine = new ReadOnlyEngine( + newEngineConfig(replicationTracker), + seqNoStats, + translogStats, + false, + Function.identity(), + true, + false + ) { + @Override + public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { + assert assertNoEngineResetLock(); + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); } + // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay + return newEngineReference.get().acquireLastIndexCommit(false); } + } - @Override - public IndexCommitRef acquireSafeIndexCommit() { - assert assertNoEngineResetLock(); - synchronized (engineMutex) { - if (newEngineReference.get() == null) { - throw new AlreadyClosedException("engine was closed"); - } - return newEngineReference.get().acquireSafeIndexCommit(); + @Override + public IndexCommitRef acquireSafeIndexCommit() { + assert assertNoEngineResetLock(); + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); } + return newEngineReference.get().acquireSafeIndexCommit(); } + } - @Override - public void close() throws IOException { - Engine newEngine; - assert assertNoEngineResetLock(); - synchronized (engineMutex) { - newEngine = newEngineReference.get(); - if (newEngine == getCurrentEngine(true)) { - // we successfully installed the new engine so do not close it. - newEngine = null; - } + @Override + public void close() throws IOException { + Engine newEngine; + assert assertNoEngineResetLock(); + synchronized (engineMutex) { + newEngine = newEngineReference.get(); + if (newEngine == getCurrentEngine(true)) { + // we successfully installed the new engine so do not close it. 
+ newEngine = null; } - IOUtils.close(super::close, newEngine); } - }; - IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); - newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); - onNewEngine(newEngineReference.get()); - } finally { - engineResetLock.readLock().unlock(); - } + IOUtils.close(super::close, newEngine); + } + }; + IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); + newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); + onNewEngine(newEngineReference.get()); } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, From 6323e32ab4c0c6a837227f5a5944024ba79400ac Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 14:33:19 +0200 Subject: [PATCH 10/21] move EngineResetLock --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 1 + .../java/org/elasticsearch/index/engine/EngineConfig.java | 1 + .../index/{engine => shard}/EngineResetLock.java | 5 +++-- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 1 - .../org/elasticsearch/index/shard/RefreshListenersTests.java | 1 - .../java/org/elasticsearch/index/engine/EngineTestCase.java | 1 + .../xpack/ccr/index/engine/FollowingEngineTests.java | 2 +- 7 files changed, 7 insertions(+), 5 deletions(-) rename server/src/main/java/org/elasticsearch/index/{engine => shard}/EngineResetLock.java (96%) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e57574c23d264..4baa6b6eb355f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -77,6 +77,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DenseVectorStats; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.EngineResetLock; import 
org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardFieldStats; import org.elasticsearch.index.shard.ShardId; diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 86c79bf99a373..6137aed83ec7b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java b/server/src/main/java/org/elasticsearch/index/shard/EngineResetLock.java similarity index 96% rename from server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java rename to server/src/main/java/org/elasticsearch/index/shard/EngineResetLock.java index 3db12e28521b1..bf65e08095f90 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineResetLock.java +++ b/server/src/main/java/org/elasticsearch/index/shard/EngineResetLock.java @@ -7,7 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". 
*/ -package org.elasticsearch.index.engine; +package org.elasticsearch.index.shard; import org.elasticsearch.core.Assertions; @@ -84,7 +84,8 @@ public int getReadLockCount() { /** * See {@link ReentrantReadWriteLock#getQueuedWriterThreads()} */ - public Collection getQueuedWriterThreads() { + // package-private for tests + Collection getQueuedWriterThreads() { if (lock instanceof QueuedWriterThreadsReentrantReadWriteLock queuedLock) { return queuedLock.queuedWriterThreads(); } else { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ca535b1292fde..43a3179f04e50 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -91,7 +91,6 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; -import org.elasticsearch.index.engine.EngineResetLock; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index a9aae887d7d3b..6367a33318abc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineResetLock; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import 
org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 5a1784bf531c5..7ff65f0dc0c51 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -91,6 +91,7 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.shard.SearcherHelper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 61f6833c1f04a..db4e0d20cdbfd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -32,7 +32,7 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineResetLock; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; From e79c19a8b88af6c9bdb0234abab8717f45cf5343 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 7 Apr 2025 12:40:16 +0000 Subject: [PATCH 11/21] [CI] Auto commit changes from spotless --- 
.../java/org/elasticsearch/index/shard/IndexShard.java | 8 ++++---- .../xpack/ccr/index/engine/FollowingEngineTests.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 43a3179f04e50..9369ca2877c68 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4697,12 +4697,12 @@ EngineResetLock getEngineResetLock() { private boolean assertNoEngineResetLock() { assert engineResetLock.isReadLockedByCurrentThread() : "Expected current thread [" - + Thread.currentThread() - + "] to not hold an engine read lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; + + Thread.currentThread() + + "] to not hold an engine read lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; assert engineResetLock.isWriteLockedByCurrentThread() : "Expected current thread [" - + Thread.currentThread() - + "] to not hold the engine write lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; + + Thread.currentThread() + + "] to not hold the engine write lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; return true; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index db4e0d20cdbfd..b6ced318c1699 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import 
org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; @@ -44,6 +43,7 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; From 24796c2de4c3a43e32277b03f3c40fecc03b8ecb Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 16:14:23 +0200 Subject: [PATCH 12/21] == false --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9369ca2877c68..6c2b53b506112 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4695,11 +4695,11 @@ EngineResetLock getEngineResetLock() { } private boolean assertNoEngineResetLock() { - assert engineResetLock.isReadLockedByCurrentThread() + assert engineResetLock.isReadLockedByCurrentThread() == false : "Expected current thread [" + Thread.currentThread() + "] to not hold an engine read lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; - assert engineResetLock.isWriteLockedByCurrentThread() + assert engineResetLock.isWriteLockedByCurrentThread() == false : "Expected current thread [" + Thread.currentThread() + "] to not hold the engine write lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; From ea5768dc493ce323e91401d028271674124d87a8 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 16:54:23 +0200 
Subject: [PATCH 13/21] remove more readlocks --- .../elasticsearch/index/shard/IndexShard.java | 30 +++++-------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6c2b53b506112..6aaee41552e7d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2216,15 +2216,10 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t synchronized (engineMutex) { assert getEngineOrNull() == null : "engine is running"; verifyNotClosed(); - engineResetLock.readLock().lock(); // prevent engine resets while closing - try { - // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). - final Engine newEngine = createEngine(config); - onNewEngine(newEngine); - getAndSetCurrentEngine(newEngine); - } finally { - engineResetLock.readLock().unlock(); - } + // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). + final Engine newEngine = createEngine(config); + onNewEngine(newEngine); + getAndSetCurrentEngine(newEngine); // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. 
active.set(true); @@ -2306,12 +2301,7 @@ public void performRecoveryRestart() throws IOException { assert assertNoEngineResetLock(); synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - engineResetLock.readLock().lock(); // prevent engine resets while closing - try { - IOUtils.close(getAndSetCurrentEngine(null)); - } finally { - engineResetLock.readLock().unlock(); - } + IOUtils.close(getAndSetCurrentEngine(null)); resetRecoveryStage(); } } @@ -3380,7 +3370,6 @@ private Engine getCurrentEngine(boolean allowNoEngine) { private Engine getAndSetCurrentEngine(Engine newEngine) { assert Thread.holdsLock(engineMutex); - assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */; return currentEngine.getAndSet(newEngine); } @@ -4545,7 +4534,7 @@ public void close() throws IOException { assert assertNoEngineResetLock(); synchronized (engineMutex) { newEngine = newEngineReference.get(); - if (newEngine == getCurrentEngine(true)) { + if (newEngine == getEngineOrNull()) { // we successfully installed the new engine so do not close it. newEngine = null; } @@ -4569,12 +4558,7 @@ public void close() throws IOException { newEngineReference.get().refresh("reset_engine"); synchronized (engineMutex) { verifyNotClosed(); - engineResetLock.readLock().lock(); - try { - IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); - } finally { - engineResetLock.readLock().unlock(); - } + IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. 
active.set(true); From 22f846d1932a4e008d48e3b2777a2fd1eeb91b28 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 17:32:55 +0200 Subject: [PATCH 14/21] remove some assertNoEngineResetLock --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6aaee41552e7d..d83ec55b843f8 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -4507,7 +4507,6 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED ) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - assert assertNoEngineResetLock(); synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); @@ -4519,7 +4518,6 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { - assert assertNoEngineResetLock(); synchronized (engineMutex) { if (newEngineReference.get() == null) { throw new AlreadyClosedException("engine was closed"); @@ -4531,7 +4529,6 @@ public IndexCommitRef acquireSafeIndexCommit() { @Override public void close() throws IOException { Engine newEngine; - assert assertNoEngineResetLock(); synchronized (engineMutex) { newEngine = newEngineReference.get(); if (newEngine == getEngineOrNull()) { From 07a855870391bc056e9594969b84870270f2e916 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 7 Apr 2025 18:20:50 +0200 Subject: [PATCH 15/21] also read lock on flushes --- .../index/engine/InternalEngine.java | 153 ++++++++++-------- 1 file changed, 82 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 
2040c6a04df23..3bd404a3e38a0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2209,83 +2209,94 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList throw new IllegalArgumentException(message); } final long generation; - if (flushLock.tryLock() == false) { - // if we can't get the lock right away we block if needed otherwise barf - if (waitIfOngoing == false) { - logger.trace("detected an in-flight flush, not blocking to wait for it's completion"); - listener.onResponse(FlushResult.NO_FLUSH); - return; - } - logger.trace("waiting for in-flight flush to finish"); - flushLock.lock(); - logger.trace("acquired flush lock after blocking"); - } else { - logger.trace("acquired flush lock immediately"); - } - final long startTime = System.nanoTime(); + // Acquire an engine read lock before the flush lock. If we were not acquiring a read lock here, a concurrent engine reset could + // hold the engine write lock and later be blocked waiting for the flush lock (still holding the write lock), while the current + // thread could be blocked waiting for the write lock to be released (and therefore never release the flush lock). + final var engineReadLock = engineConfig.getEngineResetLock().readLock(); + engineReadLock.lock(); try { - // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the - // newly created commit points to a different translog generation (can free translog), - // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. 
- boolean hasUncommittedChanges = hasUncommittedChanges(); - if (hasUncommittedChanges - || force - || shouldPeriodicallyFlush() - || getProcessedLocalCheckpoint() > Long.parseLong( - lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) - )) { - ensureCanFlush(); - Translog.Location commitLocation = getTranslogLastWriteLocation(); - try { - translog.rollGeneration(); - logger.trace("starting commit for flush; commitTranslog=true"); - long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); - // Pre-emptively recording the upcoming segment generation so that the live version map archive records - // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise, - // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive, - // we clear them from the archive once we see that segment generation on the search shards, but those changes - // were not included in the commit since they happened right after it. - preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1); - commitIndexWriter(indexWriter, translog); - logger.trace("finished commit for flush"); - // we need to refresh in order to clear older version values - refresh("version_table_flush", SearcherScope.INTERNAL, true); - translog.trimUnreferencedReaders(); - // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and - // (2) indexWriter has committed all the changes (checks must be done in this order). - // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended. 
- final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation(); - if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) { - assert writeLocationAfterFlush.compareTo(commitLocation) > 0 : writeLocationAfterFlush + " <= " + commitLocation; - commitLocation = writeLocationAfterFlush; - } - // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in - // the above lines would not lead the engine to think that it recently flushed, when it did not. - this.lastFlushTimestamp = lastFlushTimestamp; - } catch (AlreadyClosedException e) { - failOnTragicEvent(e); - throw e; - } catch (Exception e) { - throw new FlushFailedEngineException(shardId, e); + if (flushLock.tryLock() == false) { + // if we can't get the lock right away we block if needed otherwise barf + if (waitIfOngoing == false) { + logger.trace("detected an in-flight flush, not blocking to wait for it's completion"); + listener.onResponse(FlushResult.NO_FLUSH); + return; } - refreshLastCommittedSegmentInfos(); - generation = lastCommittedSegmentInfos.getGeneration(); - flushListener.afterFlush(generation, commitLocation); + logger.trace("waiting for in-flight flush to finish"); + flushLock.lock(); + logger.trace("acquired flush lock after blocking"); } else { - generation = lastCommittedSegmentInfos.getGeneration(); + logger.trace("acquired flush lock immediately"); + } + + final long startTime = System.nanoTime(); + try { + // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the + // newly created commit points to a different translog generation (can free translog), + // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. 
+ boolean hasUncommittedChanges = hasUncommittedChanges(); + if (hasUncommittedChanges + || force + || shouldPeriodicallyFlush() + || getProcessedLocalCheckpoint() > Long.parseLong( + lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) + )) { + ensureCanFlush(); + Translog.Location commitLocation = getTranslogLastWriteLocation(); + try { + translog.rollGeneration(); + logger.trace("starting commit for flush; commitTranslog=true"); + long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); + // Pre-emptively recording the upcoming segment generation so that the live version map archive records + // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise, + // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive, + // we clear them from the archive once we see that segment generation on the search shards, but those changes + // were not included in the commit since they happened right after it. + preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1); + commitIndexWriter(indexWriter, translog); + logger.trace("finished commit for flush"); + // we need to refresh in order to clear older version values + refresh("version_table_flush", SearcherScope.INTERNAL, true); + translog.trimUnreferencedReaders(); + // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and + // (2) indexWriter has committed all the changes (checks must be done in this order). + // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended. 
+ final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation(); + if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) { + assert writeLocationAfterFlush.compareTo(commitLocation) > 0 + : writeLocationAfterFlush + " <= " + commitLocation; + commitLocation = writeLocationAfterFlush; + } + // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in + // the above lines would not lead the engine to think that it recently flushed, when it did not. + this.lastFlushTimestamp = lastFlushTimestamp; + } catch (AlreadyClosedException e) { + failOnTragicEvent(e); + throw e; + } catch (Exception e) { + throw new FlushFailedEngineException(shardId, e); + } + refreshLastCommittedSegmentInfos(); + generation = lastCommittedSegmentInfos.getGeneration(); + flushListener.afterFlush(generation, commitLocation); + } else { + generation = lastCommittedSegmentInfos.getGeneration(); + } + } catch (FlushFailedEngineException ex) { + maybeFailEngine("flush", ex); + listener.onFailure(ex); + return; + } catch (Exception e) { + listener.onFailure(e); + return; + } finally { + totalFlushTimeExcludingWaitingOnLock.inc(System.nanoTime() - startTime); + flushLock.unlock(); + logger.trace("released flush lock"); } - } catch (FlushFailedEngineException ex) { - maybeFailEngine("flush", ex); - listener.onFailure(ex); - return; - } catch (Exception e) { - listener.onFailure(e); - return; } finally { - totalFlushTimeExcludingWaitingOnLock.inc(System.nanoTime() - startTime); - flushLock.unlock(); - logger.trace("released flush lock"); + engineReadLock.unlock(); } afterFlush(generation); From 94517c74fb20c35f0efdf765165732003aac7fca Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 8 Apr 2025 16:37:06 +0200 Subject: [PATCH 16/21] feedback --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 2f6730d1ec265..6da4b83dd45d6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5157,6 +5157,9 @@ public void testCloseShardWhileRetainingEngine() throws Exception { assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); assertThat(primary.getEngineOrNull(), nullValue()); + expectThrows(AlreadyClosedException.class, () -> primary.withEngine(engine -> { + throw new AssertionError("should have thrown"); + })); primary.withEngineOrNull(engine -> { assertThat(engine, nullValue()); return null; From 4901c6ad14d5357059e001fcb04bf4c5de0f3fe9 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 8 Apr 2025 14:45:37 +0000 Subject: [PATCH 17/21] [CI] Auto commit changes from spotless --- .../org/elasticsearch/index/shard/IndexShardTests.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 6da4b83dd45d6..bfb24b9c487fe 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5157,9 +5157,10 @@ public void testCloseShardWhileRetainingEngine() throws Exception { assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); assertThat(primary.getEngineOrNull(), nullValue()); - expectThrows(AlreadyClosedException.class, () -> primary.withEngine(engine -> { - throw new AssertionError("should have thrown"); - })); + expectThrows( + AlreadyClosedException.class, + () -> 
primary.withEngine(engine -> { throw new AssertionError("should have thrown"); }) + ); primary.withEngineOrNull(engine -> { assertThat(engine, nullValue()); return null; From 19683eb12a118e02fd4fa641aacde269dcfd7d98 Mon Sep 17 00:00:00 2001 From: tlrx Date: Tue, 8 Apr 2025 16:52:25 +0200 Subject: [PATCH 18/21] assert --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d83ec55b843f8..8dd2def14572e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1901,7 +1901,7 @@ public void prepareForIndexRecovery() { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert getEngineOrNull() == null; + assert currentEngine.get() == null; } /** @@ -2214,7 +2214,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + "] but got " + getRetentionLeases(); synchronized (engineMutex) { - assert getEngineOrNull() == null : "engine is running"; + assert currentEngine.get() == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). 
final Engine newEngine = createEngine(config); @@ -2311,7 +2311,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert getEngineOrNull() == null; + assert currentEngine.get() == null; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } From 2411a9b08bb48089dabe83b9320cc48d9629c9f7 Mon Sep 17 00:00:00 2001 From: tlrx Date: Thu, 10 Apr 2025 11:35:47 +0200 Subject: [PATCH 19/21] onSettingsChanged --- .../org/elasticsearch/index/shard/IndexShard.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8dd2def14572e..e87c81e63631d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2640,15 +2640,12 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - engineResetLock.readLock().lock(); - try { - var engine = getCurrentEngine(true); - if (engine != null) { - engine.onSettingsChanged(); + withEngineOrNull(engineOrNull -> { + if (engineOrNull != null) { + engineOrNull.onSettingsChanged(); } - } finally { - engineResetLock.readLock().unlock(); - } + return null; + }); } /** From 93c25daf97f4925bbe830b9247ca7909d6b0aedb Mon Sep 17 00:00:00 2001 From: tlrx Date: Thu, 10 Apr 2025 11:44:53 +0200 Subject: [PATCH 20/21] sameInstance --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index bfb24b9c487fe..5c3addfce5332 100644 
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5202,9 +5202,11 @@ public void prepareForEngineReset() throws IOException { }); }); holdEngineThread.start(); - safeGet(hold); + var retainedInstance = safeGet(hold); - assertThat(shard.getEngine(), instanceOf(InternalEngine.class)); + var currentInstance = shard.getEngine(); + assertThat(currentInstance, instanceOf(InternalEngine.class)); + assertThat(currentInstance, sameInstance(retainedInstance)); final var reset = new PlainActionFuture(); final var resetEngineThread = new Thread(() -> { From 4cf414646894c41edcccccb29c4ccddb97552f1f Mon Sep 17 00:00:00 2001 From: tlrx Date: Thu, 10 Apr 2025 12:13:01 +0200 Subject: [PATCH 21/21] Revert "onSettingsChanged" This reverts commit 2411a9b08bb48089dabe83b9320cc48d9629c9f7. --- .../org/elasticsearch/index/shard/IndexShard.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e87c81e63631d..8dd2def14572e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2640,12 +2640,15 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - withEngineOrNull(engineOrNull -> { - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + engineResetLock.readLock().lock(); + try { + var engine = getCurrentEngine(true); + if (engine != null) { + engine.onSettingsChanged(); } - return null; - }); + } finally { + engineResetLock.readLock().unlock(); + } } /**