diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 41307b92a61a4..507724bbaeb3c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -676,7 +676,7 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti final CountDownLatch engineResetLatch = new CountDownLatch(1); shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> { try { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { r.close(); engineResetLatch.countDown(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 0ac8c4d0b6fd4..7f654c712d055 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8b0e84105055c..4baa6b6eb355f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; @@ -76,6 +77,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DenseVectorStats; import org.elasticsearch.index.shard.DocsStats; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardFieldStats; import org.elasticsearch.index.shard.ShardId; @@ -1288,7 +1290,7 @@ public void externalRefresh(String source, ActionListener /** * Asynchronously refreshes the engine for new search operations to reflect the latest - * changes unless another thread is already refreshing the engine concurrently. + * changes unless another thread is already refreshing or resetting the engine concurrently. */ @Nullable public abstract void maybeRefresh(String source, ActionListener<RefreshResult> listener) throws EngineException; @@ -2371,7 +2373,7 @@ public record FlushResult(boolean flushPerformed, long generation) { } /** - * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}. + * Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(Consumer)}. * * In general, resetting the engine should be done with care, to consider any * in-progress operations and listeners (e.g., primary term and generation listeners).
@@ -2384,4 +2386,36 @@ public void prepareForEngineReset() throws IOException { public long getLastUnsafeSegmentGenerationForGets() { throw new UnsupportedOperationException("Doesn't support getting the latest segment generation"); } + + protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wrapForAssertions( + R referenceManager, + EngineConfig engineConfig + ) { + if (Assertions.ENABLED) { + referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(engineConfig.getEngineResetLock())); + } + return referenceManager; + } + + /** + * RefreshListener that asserts that the engine read lock is held by the thread refreshing the reference. + */ + private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener { + + private final EngineResetLock engineLock; + + private AssertRefreshListenerHoldsEngineReadLock(EngineResetLock engineLock) { + this.engineLock = Objects.requireNonNull(engineLock); + } + + @Override + public void beforeRefresh() throws IOException { + assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread(); + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 1ef42cdb922c3..6137aed83ec7b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; @@ -146,6 +147,8 @@ public Supplier retentionLeasesSupplier() { private final boolean promotableToPrimary; + private final EngineResetLock engineResetLock; + /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ @@ -177,7 +180,8 @@ public EngineConfig( LongSupplier relativeTimeInNanosSupplier, Engine.IndexCommitListener indexCommitListener, boolean promotableToPrimary, - MapperService mapperService + MapperService mapperService, + EngineResetLock engineResetLock ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -224,6 +228,7 @@ public EngineConfig( this.promotableToPrimary = promotableToPrimary; // always use compound on flush - reduces # of file-handles on refresh this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); + this.engineResetLock = engineResetLock; } /** @@ -468,4 +473,8 @@ public boolean getUseCompoundFile() { public MapperService getMapperService() { return mapperService; } + + public EngineResetLock getEngineResetLock() { + return engineResetLock; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index acd9cec8b064d..3bd404a3e38a0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -301,8 +301,8 @@ public InternalEngine(EngineConfig engineConfig) { } externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed,
engineConfig)); internalReaderManager = externalReaderManager.internalReaderManager; - this.internalReaderManager = internalReaderManager; - this.externalReaderManager = externalReaderManager; + this.internalReaderManager = wrapForAssertions(internalReaderManager, engineConfig); + this.externalReaderManager = wrapForAssertions(externalReaderManager, engineConfig); internalReaderManager.addListener(versionMap); this.lastUnsafeSegmentGenerationForGets = new AtomicLong(lastCommittedSegmentInfos.getGeneration()); assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -2040,7 +2040,7 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); - boolean refreshed; + boolean refreshed = false; long segmentGeneration = RefreshResult.UNKNOWN_GENERATION; try { // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way. @@ -2051,12 +2051,30 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea // the second refresh will only do the extra work we have to do for warming caches etc. ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope); long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration(); + + // The shard uses a reentrant read/write lock to guard against engine changes, a type of lock that prioritizes the threads + // waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners + // sometimes access the engine read lock, we need to ensure that they won't block if another thread is waiting for the + // engine write lock, so we acquire the read lock upfront before the refresh lock. + final var engineReadLock = engineConfig.getEngineResetLock().readLock(); + // it is intentional that we never refresh both internal / external together if (block) { - referenceManager.maybeRefreshBlocking(); - refreshed = true; + engineReadLock.lock(); + try { + referenceManager.maybeRefreshBlocking(); + refreshed = true; + } finally { + engineReadLock.unlock(); + } } else { - refreshed = referenceManager.maybeRefresh(); + if (engineReadLock.tryLock()) { + try { + refreshed = referenceManager.maybeRefresh(); + } finally { + engineReadLock.unlock(); + } + } } if (refreshed) { final ElasticsearchDirectoryReader current = referenceManager.acquire(); @@ -2191,83 +2209,94 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList throw new IllegalArgumentException(message); } final long generation; - if (flushLock.tryLock() == false) { - // if we can't get the lock right away we block if needed otherwise barf - if (waitIfOngoing == false) { - logger.trace("detected an in-flight flush, not blocking to wait for it's completion"); - listener.onResponse(FlushResult.NO_FLUSH); - return; - } - logger.trace("waiting for in-flight flush to finish"); - flushLock.lock(); - logger.trace("acquired flush lock after blocking"); - } else { - logger.trace("acquired flush lock immediately"); - } - final long startTime = System.nanoTime(); + // Acquire an engine read lock before the flush lock.
If we were not acquiring a read lock here, a concurrent engine reset could + // hold the engine write lock and later be blocked waiting for the flush lock (still holding the write lock), while the current + // thread could be blocked waiting for the write lock to be released (and therefore never release the flush lock). + final var engineReadLock = engineConfig.getEngineResetLock().readLock(); + engineReadLock.lock(); try { - // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the - // newly created commit points to a different translog generation (can free translog), - // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. - boolean hasUncommittedChanges = hasUncommittedChanges(); - if (hasUncommittedChanges - || force - || shouldPeriodicallyFlush() - || getProcessedLocalCheckpoint() > Long.parseLong( - lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) - )) { - ensureCanFlush(); - Translog.Location commitLocation = getTranslogLastWriteLocation(); - try { - translog.rollGeneration(); - logger.trace("starting commit for flush; commitTranslog=true"); - long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); - // Pre-emptively recording the upcoming segment generation so that the live version map archive records - // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise, - // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive, - // we clear them from the archive once we see that segment generation on the search shards, but those changes - // were not included in the commit since they happened right after it. - preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1); - commitIndexWriter(indexWriter, translog); - logger.trace("finished commit for flush"); - // we need to refresh in order to clear older version values - refresh("version_table_flush", SearcherScope.INTERNAL, true); - translog.trimUnreferencedReaders(); - // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and - // (2) indexWriter has committed all the changes (checks must be done in this order). - // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended. - final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation(); - if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) { - assert writeLocationAfterFlush.compareTo(commitLocation) > 0 : writeLocationAfterFlush + " <= " + commitLocation; - commitLocation = writeLocationAfterFlush; - } - // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in - // the above lines would not lead the engine to think that it recently flushed, when it did not. 
- this.lastFlushTimestamp = lastFlushTimestamp; - } catch (AlreadyClosedException e) { - failOnTragicEvent(e); - throw e; - } catch (Exception e) { - throw new FlushFailedEngineException(shardId, e); + if (flushLock.tryLock() == false) { + // if we can't get the lock right away we block if needed otherwise barf + if (waitIfOngoing == false) { + logger.trace("detected an in-flight flush, not blocking to wait for its completion"); + listener.onResponse(FlushResult.NO_FLUSH); + return; } - refreshLastCommittedSegmentInfos(); - generation = lastCommittedSegmentInfos.getGeneration(); - flushListener.afterFlush(generation, commitLocation); + logger.trace("waiting for in-flight flush to finish"); + flushLock.lock(); + logger.trace("acquired flush lock after blocking"); } else { - generation = lastCommittedSegmentInfos.getGeneration(); + logger.trace("acquired flush lock immediately"); + } + + final long startTime = System.nanoTime(); + try { + // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the + // newly created commit points to a different translog generation (can free translog), + // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. + boolean hasUncommittedChanges = hasUncommittedChanges(); + if (hasUncommittedChanges + || force + || shouldPeriodicallyFlush() + || getProcessedLocalCheckpoint() > Long.parseLong( + lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) + )) { + ensureCanFlush(); + Translog.Location commitLocation = getTranslogLastWriteLocation(); + try { + translog.rollGeneration(); + logger.trace("starting commit for flush; commitTranslog=true"); + long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); + // Pre-emptively recording the upcoming segment generation so that the live version map archive records + // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise, + // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive, + // we clear them from the archive once we see that segment generation on the search shards, but those changes + // were not included in the commit since they happened right after it. + preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1); + commitIndexWriter(indexWriter, translog); + logger.trace("finished commit for flush"); + // we need to refresh in order to clear older version values + refresh("version_table_flush", SearcherScope.INTERNAL, true); + translog.trimUnreferencedReaders(); + // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and + // (2) indexWriter has committed all the changes (checks must be done in this order). + // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended. + final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation(); + if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) { + assert writeLocationAfterFlush.compareTo(commitLocation) > 0 + : writeLocationAfterFlush + " <= " + commitLocation; + commitLocation = writeLocationAfterFlush; + } + // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in + // the above lines would not lead the engine to think that it recently flushed, when it did not.
+ this.lastFlushTimestamp = lastFlushTimestamp; + } catch (AlreadyClosedException e) { + failOnTragicEvent(e); + throw e; + } catch (Exception e) { + throw new FlushFailedEngineException(shardId, e); + } + refreshLastCommittedSegmentInfos(); + generation = lastCommittedSegmentInfos.getGeneration(); + flushListener.afterFlush(generation, commitLocation); + } else { + generation = lastCommittedSegmentInfos.getGeneration(); + } + } catch (FlushFailedEngineException ex) { + maybeFailEngine("flush", ex); + listener.onFailure(ex); + return; + } catch (Exception e) { + listener.onFailure(e); + return; + } finally { + totalFlushTimeExcludingWaitingOnLock.inc(System.nanoTime() - startTime); + flushLock.unlock(); + logger.trace("released flush lock"); } - } catch (FlushFailedEngineException ex) { - maybeFailEngine("flush", ex); - listener.onFailure(ex); - return; - } catch (Exception e) { - listener.onFailure(e); - return; } finally { - totalFlushTimeExcludingWaitingOnLock.inc(System.nanoTime() - startTime); - flushLock.unlock(); - logger.trace("released flush lock"); + engineReadLock.unlock(); } afterFlush(generation); diff --git a/server/src/main/java/org/elasticsearch/index/shard/EngineResetLock.java b/server/src/main/java/org/elasticsearch/index/shard/EngineResetLock.java new file mode 100644 index 0000000000000..bf65e08095f90 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/EngineResetLock.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.core.Assertions; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Reentrant read/write lock used to control accesses to a shard's engine that can be reset. + * + * Implemented as a simple wrapper around a {@link ReentrantReadWriteLock} to make it easier to add/override methods. + */ +public final class EngineResetLock implements ReadWriteLock { + + private final ReentrantReadWriteLock lock; + + public EngineResetLock() { + this.lock = Assertions.ENABLED ? new QueuedWriterThreadsReentrantReadWriteLock() : new ReentrantReadWriteLock(); + } + + @Override + public Lock writeLock() { + return lock.writeLock(); + } + + @Override + public Lock readLock() { + return lock.readLock(); + } + + /** + * See {@link ReentrantReadWriteLock#isWriteLocked()} + */ + public boolean isWriteLocked() { + return lock.isWriteLocked(); + } + + /** + * See {@link ReentrantReadWriteLock#isWriteLockedByCurrentThread()} + */ + public boolean isWriteLockedByCurrentThread() { + return lock.isWriteLockedByCurrentThread(); + } + + /** + * Returns {@code true} if the number of read locks held by any thread is greater than zero. + * This method is designed for use in monitoring system state, not for synchronization control. 
+ * + * @return {@code true} if any thread holds a read lock and {@code false} otherwise + */ + public boolean isReadLocked() { + return lock.getReadLockCount() > 0; + } + + /** + * Returns {@code true} if the number of holds on the read lock by the current thread is greater than zero. + * This method is designed for use in monitoring system state, not for synchronization control. + * + * @return {@code true} if the number of holds on the read lock by the current thread is greater than zero, {@code false} otherwise + */ + public boolean isReadLockedByCurrentThread() { + return lock.getReadHoldCount() > 0; + } + + /** + * See {@link ReentrantReadWriteLock#getReadLockCount()} + */ + public int getReadLockCount() { + return lock.getReadLockCount(); + } + + /** + * See {@link ReentrantReadWriteLock#getQueuedWriterThreads()} + */ + // package-private for tests + Collection getQueuedWriterThreads() { + if (lock instanceof QueuedWriterThreadsReentrantReadWriteLock queuedLock) { + return queuedLock.queuedWriterThreads(); + } else { + return List.of(); + } + } + + /** + * Extends ReentrantReadWriteLock to expose the protected {@link ReentrantReadWriteLock#getQueuedWriterThreads()} method + */ + private static class QueuedWriterThreadsReentrantReadWriteLock extends ReentrantReadWriteLock { + Collection queuedWriterThreads() { + return super.getQueuedWriterThreads(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bf2e20b70b441..8dd2def14572e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,6 +44,8 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lucene.Lucene; @@ -151,6 +153,7 @@ import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -243,8 +246,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // ensure happens-before relation between addRefreshListener() and postRecovery() private volatile SubscribableListener postRecoveryComplete; private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex - private final AtomicReference currentEngineReference = new AtomicReference<>(); + + // mutex for creating/closing the engine + private final Object engineMutex = new Object(); // lock ordering: engineMutex -> engineResetLock -> mutex + // read/write lock for reseting the engine + private final EngineResetLock engineResetLock = new EngineResetLock(); + // reference to the current engine + private final AtomicReference currentEngine = new AtomicReference<>(); // must be accessed holding engineResetLock final EngineFactory engineFactory; private final IndexingOperationListener 
indexingOperationListeners; @@ -1643,13 +1651,11 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { Engine.IndexCommitRef indexCommit = null; store.incRef(); try { + assert assertNoEngineResetLock(); synchronized (engineMutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine. - final Engine engine = getEngineOrNull(); - if (engine != null) { - indexCommit = engine.acquireLastIndexCommit(false); - } + indexCommit = withEngineOrNull(engine -> engine != null ? engine.acquireLastIndexCommit(false) : null); if (indexCommit == null) { return store.getMetadata(null, true); } @@ -1806,39 +1812,47 @@ public CacheHelper getReaderCacheHelper() { } public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException { + assert assertNoEngineResetLock(); synchronized (engineMutex) { + // The engineMutex prevents any other engine changes (like resetting the engine) from running concurrently, so we acquire the engine + // read lock here just to respect the lock ordering (engineMutex -> engineResetLock -> mutex). engineResetLock.readLock().lock(); try { - synchronized (mutex) { - changeState(IndexShardState.CLOSED, reason); - } - checkAndCallWaitForEngineOrClosedShardListeners(); - } finally { - final Engine engine = this.currentEngineReference.getAndSet(null); - closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { - @Override - public void run() throws Exception { - try { - if (engine != null && flushEngine) { - engine.flushAndClose(); + try { + synchronized (mutex) { + changeState(IndexShardState.CLOSED, reason); + } + checkAndCallWaitForEngineOrClosedShardListeners(); + } finally { + final Engine engine = getAndSetCurrentEngine(null); + closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() { + @Override + public void run() throws Exception { + try { + if (engine != null && flushEngine) { + engine.flushAndClose(); + } + } finally { + // playing safe here and close the engine even if the above succeeds - close can be called multiple times + // Also closing refreshListeners to prevent us from accumulating any more listeners + IOUtils.close( + engine, + globalCheckpointListeners, + refreshListeners, + pendingReplicationActions, + indexShardOperationPermits + ); } - } finally { - // playing safe here and close the engine even if the above succeeds - close can be called multiple times - // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close( - engine, - globalCheckpointListeners, - refreshListeners, - pendingReplicationActions, - indexShardOperationPermits - ); } - } - @Override - public String toString() { - return "IndexShard#close[" + shardId + "]"; - } - })); + @Override + public String toString() { + return "IndexShard#close[" + shardId + "]"; + } + })); + } + } finally { + engineResetLock.readLock().unlock(); } } } @@ -1887,7 +1901,7 @@ public void prepareForIndexRecovery() { throw new IndexShardNotRecoveringException(shardId, state); } recoveryState.setStage(RecoveryState.Stage.INDEX); - assert currentEngineReference.get() == null; + assert currentEngine.get() == null; } /** @@ -1966,8 +1980,9 @@ private void doLocalRecovery( // First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
.newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; + assert assertNoEngineResetLock(); synchronized (engineMutex) { - IOUtils.close(currentEngineReference.getAndSet(null)); + IOUtils.close(getAndSetCurrentEngine(null)); } }, (recoveryCompleteListener, ignoredRef) -> { assert Thread.holdsLock(mutex) == false : "must not hold the mutex here"; @@ -2183,6 +2198,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException { assert Thread.holdsLock(mutex) == false : "opening engine under mutex"; + assert assertNoEngineResetLock(); if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -2198,12 +2214,12 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t + "] but got " + getRetentionLeases(); synchronized (engineMutex) { - assert currentEngineReference.get() == null : "engine is running"; + assert currentEngine.get() == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = createEngine(config); onNewEngine(newEngine); - currentEngineReference.set(newEngine); + getAndSetCurrentEngine(newEngine); // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); @@ -2282,9 +2298,10 @@ private void onNewEngine(Engine newEngine) { */ public void performRecoveryRestart() throws IOException { assert Thread.holdsLock(mutex) == false : "restart recovery under mutex"; + assert assertNoEngineResetLock(); synchronized (engineMutex) { assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners"; - IOUtils.close(currentEngineReference.getAndSet(null)); + IOUtils.close(getAndSetCurrentEngine(null)); resetRecoveryStage(); } } @@ -2294,7 +2311,7 @@ public void performRecoveryRestart() throws IOException { */ public void resetRecoveryStage() { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - assert currentEngineReference.get() == null; + assert currentEngine.get() == null; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -2623,9 +2640,14 @@ boolean shouldRollTranslogGeneration() { } public void onSettingsChanged() { - Engine engineOrNull = getEngineOrNull(); - if (engineOrNull != null) { - engineOrNull.onSettingsChanged(); + engineResetLock.readLock().lock(); + try { + var engine = getCurrentEngine(true); + if (engine != null) { + engine.onSettingsChanged(); + } + } finally { + engineResetLock.readLock().unlock(); } } @@ -3316,11 +3338,12 @@ private void doCheckIndex() throws IOException { } Engine getEngine() { - Engine engine = getEngineOrNull(); - if (engine == null) { - throw new AlreadyClosedException("engine is closed"); + engineResetLock.readLock().lock(); + try { + return getCurrentEngine(false); + } finally { + engineResetLock.readLock().unlock(); } - return engine; } /** @@ -3328,7 +3351,80 @@ Engine getEngine() { * closed. 
*/ public Engine getEngineOrNull() { - return this.currentEngineReference.get(); + engineResetLock.readLock().lock(); + try { + return getCurrentEngine(true); + } finally { + engineResetLock.readLock().unlock(); + } + } + + private Engine getCurrentEngine(boolean allowNoEngine) { + assert engineResetLock.isReadLockedByCurrentThread() || engineResetLock.isWriteLockedByCurrentThread() /* for resets */; + var engine = currentEngine.get(); + if (engine == null && allowNoEngine == false) { + throw new AlreadyClosedException("engine is closed"); + } + return engine; + } + + private Engine getAndSetCurrentEngine(Engine newEngine) { + assert Thread.holdsLock(engineMutex); + return currentEngine.getAndSet(newEngine); + } + + /** + * Executes an operation while preventing the shard's engine instance from being reset during the execution. + * The operation might be executed with a {@code null} engine instance. The engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param <R> the type of the result + */ + public <R> R withEngineOrNull(Function<Engine, R> operation) { + return withEngine(operation, true); + } + + /** + * Executes an operation while preventing the shard's engine instance from being reset during the execution. + * If the current engine instance is null, this method throws an {@link AlreadyClosedException} and the operation is not executed. The + * engine might be closed while the operation is executed. + * + * @param operation the operation to execute + * @return the result of the operation + * @param <R> the type of the result + * @throws AlreadyClosedException if the current engine instance is {@code null}. + */ + public <R> R withEngine(Function<Engine, R> operation) { + return withEngine(operation, false); + } + + /** + * Executes an operation while preventing the shard's engine instance from being reset during the execution + * (see {@link #resetEngine(Consumer)}). + * NOTE: It does not prevent the engine from being closed by {@link #close(String, boolean, Executor, ActionListener)} though. + * The parameter {@code allowNoEngine} is used to allow the operation to be executed when the current engine instance is {@code null}. + * When {@code allowNoEngine} is set to {@code false} the method will throw an {@link AlreadyClosedException} if the current engine + * instance is {@code null}.
+ * + * @param operation the operation to execute + * @param allowNoEngine if the operation can be executed even if the current engine instance is {@code null} + * @return the result of the operation + * @param <R> the type of the result + */ + private <R> R withEngine(Function<Engine, R> operation, boolean allowNoEngine) { + assert ClusterApplierService.assertNotClusterStateUpdateThread("IndexShard.withEngine() can block"); + assert MasterService.assertNotMasterUpdateThread("IndexShard.withEngine() can block"); + assert Transports.assertNotTransportThread("IndexShard.withEngine() can block"); + assert operation != null; + + engineResetLock.readLock().lock(); + try { + var engine = getCurrentEngine(allowNoEngine); + return operation.apply(engine); + } finally { + engineResetLock.readLock().unlock(); + } } public void startRecovery( @@ -3600,7 +3696,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { relativeTimeInNanosSupplier, indexCommitListener, routingEntry().isPromotableToPrimary(), - mapperService() + mapperService(), + engineResetLock ); } @@ -3879,7 +3976,7 @@ private void innerAcquireReplicaOperationPermit( maxSeqNo ); if (currentGlobalCheckpoint < maxSeqNo) { - resetEngineToGlobalCheckpoint(); + rollbackEngineToGlobalCheckpoint(); } else { getEngine().rollTranslogGeneration(); } @@ -4326,32 +4423,66 @@ public void afterRefresh(boolean didRefresh) { * * In general, resetting the engine should be done with care, to consider any in-progress operations and listeners. * At the moment, this is implemented in serverless for a special case that ensures the engine is prepared for reset. + * Resetting the engine can prevent non-blocking engine refreshes (see {@link Engine#maybeRefresh(String, ActionListener)}) from being + * immediately executed, so it is expected that the new engine instance provides refreshed readers (if supported) after the reset. + * + * @param postResetNewEngineConsumer A consumer that will be called with the newly created engine after the reset + * is complete, allowing for post-reset operations on the new engine instance. + * The provided engine reference should not be retained by the consumer.
*/ - public void resetEngine() { + public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); + assert assertNoEngineResetLock(); + Engine previousEngine = null; try { synchronized (engineMutex) { verifyNotClosed(); - getEngine().prepareForEngineReset(); - var newEngine = createEngine(newEngineConfig(replicationTracker)); - IOUtils.close(currentEngineReference.getAndSet(newEngine)); - onNewEngine(newEngine); + try { + engineResetLock.writeLock().lock(); + try { + var engine = getCurrentEngine(false); + engine.prepareForEngineReset(); + var newEngine = createEngine(newEngineConfig(replicationTracker)); + getAndSetCurrentEngine(newEngine); + onNewEngine(newEngine); + postResetNewEngineConsumer.accept(newEngine); + previousEngine = engine; + } finally { + if (previousEngine != null) { + // Downgrade to read lock for closing the engine + engineResetLock.readLock().lock(); + } + engineResetLock.writeLock().unlock(); + } + } catch (Exception e) { + // we want to fail the shard in case prepareForEngineReset throws + failShard("unable to reset engine", e); + } } onSettingsChanged(); - } catch (Exception e) { - // we want to fail the shard in the case prepareForEngineReset throws - failShard("unable to reset engine", e); + } finally { + if (previousEngine != null) { + assert engineResetLock.isReadLockedByCurrentThread(); + try { + IOUtils.close(previousEngine); + } catch (Exception e) { + failShard("unable to close previous engine after reset", e); + } finally { + engineResetLock.readLock().unlock(); + } + } } } /** * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ - void resetEngineToGlobalCheckpoint() throws IOException { + void rollbackEngineToGlobalCheckpoint() throws IOException { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; + assert assertNoEngineResetLock(); assert getActiveOperationsCount() == OPERATIONS_BLOCKED - : "resetting engine without blocking operations; active operations are [" + getActiveOperationsCount() + ']'; + : "engine rollback without blocking operations; active operations are [" + getActiveOperationsCount() + ']'; sync(); // persist the global checkpoint to disk final SeqNoStats seqNoStats = seqNoStats(); final TranslogStats translogStats = translogStats(); @@ -4400,7 +4531,7 @@ public void close() throws IOException { Engine newEngine; synchronized (engineMutex) { newEngine = newEngineReference.get(); - if (newEngine == currentEngineReference.get()) { + if (newEngine == getEngineOrNull()) { // we successfully installed the new engine so do not close it.
newEngine = null; } @@ -4408,7 +4539,7 @@ public void close() throws IOException { IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + IOUtils.close(getAndSetCurrentEngine(readOnlyEngine)); newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); } @@ -4424,7 +4555,7 @@ public void close() throws IOException { newEngineReference.get().refresh("reset_engine"); synchronized (engineMutex) { verifyNotClosed(); - IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get())); + IOUtils.close(getAndSetCurrentEngine(newEngineReference.get())); // We set active because we are now writing operations to the engine; this way, // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. active.set(true); @@ -4538,4 +4669,21 @@ public void ensureMutable(ActionListener listener, boolean permitAcquired) l.onResponse(null); })); } + + // package-private for tests + EngineResetLock getEngineResetLock() { + return engineResetLock; + } + + private boolean assertNoEngineResetLock() { + assert engineResetLock.isReadLockedByCurrentThread() == false + : "Expected current thread [" + + Thread.currentThread() + + "] to not hold an engine read lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; + assert engineResetLock.isWriteLockedByCurrentThread() == false + : "Expected current thread [" + + Thread.currentThread() + + "] to not hold the engine write lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)"; + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e1128a329023a..bc63ef763ec57 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3612,7 +3612,8 @@ public void testRecoverFromForeignTranslog() throws IOException { config.getRelativeTimeInNanosSupplier(), null, true, - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7175,7 +7176,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5d689804f9cbb..5c3addfce5332 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,6 +18,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; @@ -67,6 +68,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.Assertions; import 
org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -187,11 +189,13 @@ import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.containsStringIgnoringCase; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; @@ -4514,7 +4518,7 @@ public void testSupplyTombstoneDoc() throws Exception { closeShards(shard); } - public void testResetEngineToGlobalCheckpoint() throws Exception { + public void testRollbackEngineToGlobalCheckpoint() throws Exception { IndexShard shard = newStartedShard(false); indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); long maxSeqNoBeforeRollback = shard.seqNoStats().getMaxSeqNo(); @@ -4555,7 +4559,7 @@ public void testResetEngineToGlobalCheckpoint() throws Exception { final CountDownLatch engineResetLatch = new CountDownLatch(1); shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), globalCheckpoint, 0L, ActionListener.wrap(r -> { try { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { r.close(); engineResetLatch.countDown(); @@ -4594,7 +4598,7 @@ public void prepareForEngineReset() throws IOException {} var onAcquired = new PlainActionFuture<Releasable>(); indexShard.acquireAllPrimaryOperationsPermits(onAcquired, TimeValue.timeValueMinutes(1L)); try (var permits = safeGet(onAcquired)) { - indexShard.resetEngine(); + indexShard.resetEngine(newEngine -> {}); } safeAwait(newEngineCreated); safeAwait(newEngineNotification); @@ -4603,9 +4607,9 @@ public void prepareForEngineReset() throws IOException {} /** * This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT. Closing a shard while engine is inside - * resetEngineToGlobalCheckpoint can lead to check index failure in integration tests. + * rollbackEngineToGlobalCheckpoint can lead to check index failure in integration tests. */ - public void testCloseShardWhileResettingEngine() throws Exception { + public void testCloseShardWhileRollbackEngine() throws Exception { CountDownLatch readyToCloseLatch = new CountDownLatch(1); CountDownLatch closeDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @@ -4643,7 +4647,7 @@ public void recoverFromTranslog( 0L, ActionListener.wrap(r -> { try (r) { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { engineResetLatch.countDown(); } @@ -4661,9 +4665,9 @@ public void recoverFromTranslog( /** * This test simulates a scenario seen rarely in ConcurrentSeqNoVersioningIT.
While engine is inside - * resetEngineToGlobalCheckpoint snapshot metadata could fail + * rollbackEngineToGlobalCheckpoint, snapshotting the store metadata could fail */ - public void testSnapshotWhileResettingEngine() throws Exception { + public void testSnapshotWhileRollbackEngine() throws Exception { CountDownLatch readyToSnapshotLatch = new CountDownLatch(1); CountDownLatch snapshotDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @@ -4710,7 +4714,7 @@ public void recoverFromTranslog( 0L, ActionListener.wrap(r -> { try (r) { - shard.resetEngineToGlobalCheckpoint(); + shard.rollbackEngineToGlobalCheckpoint(); } finally { engineResetLatch.countDown(); } @@ -5059,7 +5063,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); return new InternalEngine(configWithWarmer); }); @@ -5094,6 +5099,350 @@ public void testRecordsForceMerges() throws IOException { closeShards(shard); } + public void testCloseShardWhileRetainingEngine() throws Exception { + final var primary = newStartedShard(true); + try { + final var hold = new PlainActionFuture<Engine>(); + final var close = new PlainActionFuture<Void>(); + final var release = new CountDownLatch(1); + + final var holdEngineThread = new Thread(() -> { + primary.withEngine(engine -> { + assertThat(engine, notNullValue()); + EngineTestCase.ensureOpen(engine); + hold.onResponse(engine); + + safeGet(close); + + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); + assertThat(primary.getEngineOrNull(), nullValue()); + + safeAwait(release); + return null; + }); + }); + holdEngineThread.start(); + + final var retainedInstance = asInstanceOf(InternalEngine.class, safeGet(hold)); + assertSame(retainedInstance, primary.getEngine()); + assertSame(retainedInstance, primary.getEngineOrNull()); + assertThat(primary.state(), equalTo(IndexShardState.STARTED)); + primary.withEngineOrNull(engine -> { + assertSame(retainedInstance, engine); + EngineTestCase.ensureOpen(engine); + return null; + }); + + final var closeEngineThread = new Thread(() -> { + try { + safeGet(hold); + + assertThat(primary.getEngineResetLock().isReadLocked(), equalTo(true)); + + closeShardNoCheck(primary); + + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); + assertThat(primary.getEngineOrNull(), nullValue()); + + close.onResponse(null); + } catch (IOException e) { + close.onFailure(e); + } + }); + closeEngineThread.start(); + safeGet(close); + + assertThat(primary.state(), equalTo(IndexShardState.CLOSED)); + expectThrows(AlreadyClosedException.class, () -> primary.getEngine()); + assertThat(primary.getEngineOrNull(), nullValue()); + expectThrows( + AlreadyClosedException.class, + () -> primary.withEngine(engine -> { throw new AssertionError("should have thrown"); }) + ); + primary.withEngineOrNull(engine -> { + assertThat(engine, nullValue()); + return null; + }); + + closeEngineThread.join(); + release.countDown(); + holdEngineThread.join(); + } finally { + IOUtils.close(primary.store()); + } + } + + public void testResetEngineWhileRetainingEngine() throws Exception { + final var preparedForReset = new AtomicBoolean(); + final var shard = newStartedShard(true,
Settings.EMPTY, config -> { + if (preparedForReset.get()) { + return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true); + } else { + return new InternalEngine(config) { + @Override + public void prepareForEngineReset() throws IOException { + assertTrue(preparedForReset.compareAndSet(false, true)); + } + }; + } + }); + final var engineResetLock = shard.getEngine().getEngineConfig().getEngineResetLock(); + + final var release = new CountDownLatch(1); + final var hold = new PlainActionFuture<Engine>(); + final var holdEngineThread = new Thread(() -> { + shard.withEngine(engine -> { + assertThat(engine, notNullValue()); + EngineTestCase.ensureOpen(engine); + hold.onResponse(engine); + safeAwait(release); + return null; + }); + }); + holdEngineThread.start(); + var retainedInstance = safeGet(hold); + + var currentInstance = shard.getEngine(); + assertThat(currentInstance, instanceOf(InternalEngine.class)); + assertThat(currentInstance, sameInstance(retainedInstance)); + + final var reset = new PlainActionFuture<Void>(); + final var resetEngineThread = new Thread(() -> { + try { + safeGet(hold); + shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> { + try (permit) { + shard.resetEngine(newEngine -> { + assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true)); + assertThat(newEngine, instanceOf(ReadOnlyEngine.class)); + }); + assertThat(preparedForReset.get(), equalTo(true)); + l.onResponse(null); + } + }), EsExecutors.DIRECT_EXECUTOR_SERVICE); + } catch (Exception e) { + reset.onFailure(e); + } + }); + resetEngineThread.start(); + + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasItem(resetEngineThread))); + assertThat(engineResetLock.isReadLocked(), equalTo(true)); + + release.countDown(); + safeGet(reset); + + assertThat(preparedForReset.get(), equalTo(true)); + + holdEngineThread.join(); + resetEngineThread.join(); + + closeShards(shard); + } + + public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception { + final var lazyShard = new AtomicReference<IndexShard>(); + final var lazyEngineConfig = new AtomicReference<EngineConfig>(); + + final var refreshStarted = new CountDownLatch(1); + final var blockRefresh = new AtomicBoolean(); + final var unblockRefresh = new CountDownLatch(1); + + final var getFromTranslogStarted = new CountDownLatch(1); + final var getFromTranslogResult = new PlainActionFuture<Boolean>(); + + final var resetStarted = new CountDownLatch(1); + + // Refresh listener that blocks on purpose (so it holds the refresh lock) and acquires the engine read lock in a reentrant manner + final var blockingRefreshListener = new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + if (blockRefresh.get()) { + try { + var shard = lazyShard.get(); + assertThat(shard, notNullValue()); + + // Asserts that the refresh is triggered by the test and not something else + assertThat(Thread.currentThread().toString(), containsStringIgnoringCase(getTestClass().getSimpleName())); + + // Asserts the current thread holds the engine read lock + var engineResetLock = lazyEngineConfig.get().getEngineResetLock(); + assertThat(engineResetLock.isReadLockedByCurrentThread(), equalTo(true)); + + refreshStarted.countDown(); + safeAwait(getFromTranslogStarted); + + // At this stage, getThread is blocked on the refresh held by the current thread + assertBusy(() -> assertThat(engineResetLock.getReadLockCount(), greaterThanOrEqualTo(2))); + assertThat(getFromTranslogResult.isDone(),
equalTo(false)); + + // Waits for the resetThread + safeAwait(resetStarted); + + // The resetThread waits for the engine write lock, blocking new non-reentrant engine read lock acquisitions + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasSize(1))); + + // Ensure that accessing the engine from a refresh listener works, even if another thread (like resetThread) is + // waiting for the engine write lock. If we were not acquiring the engine read lock when refreshing the reader, + // we would deadlock here. + var localCheckpoint = shard.withEngine(engine -> engine.getProcessedLocalCheckpoint()); + assertThat(localCheckpoint, greaterThan(SequenceNumbers.NO_OPS_PERFORMED)); + + // Also test `getEngine` + var internalEngine = asInstanceOf(InternalEngine.class, shard.getEngine()); + assertThat(internalEngine.getTranslogStats().getUncommittedOperations(), equalTo(1)); + + // Don't block refresh again (it will flush and refresh later in prepareForEngineReset) + blockRefresh.set(false); + + safeAwait(unblockRefresh); + } catch (Exception e) { + throw new AssertionError(e); + } + } + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException {} + }; + + final var preparedForReset = new AtomicBoolean(); + final var shard = newShard(true, Settings.EMPTY, config -> { + if (preparedForReset.get()) { + return new ReadOnlyEngine(config, null, new TranslogStats(), false, Function.identity(), true, true); + } else { + var internalRefreshListeners = new ArrayList<ReferenceManager.RefreshListener>(); + internalRefreshListeners.add(blockingRefreshListener); + internalRefreshListeners.addAll(config.getInternalRefreshListener()); + + var engineConfigWithBlockingRefreshListener = new EngineConfig( + config.getShardId(), + config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), + config.getIndexSettings(), + config.getWarmer(), + config.getStore(), + config.getMergePolicy(), + config.getAnalyzer(), + config.getSimilarity(), + new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE), + config.getEventListener(), + config.getQueryCache(), + config.getQueryCachingPolicy(), + config.getTranslogConfig(), + config.getFlushMergesAfter(), + config.getExternalRefreshListener(), + internalRefreshListeners, + config.getIndexSort(), + config.getCircuitBreakerService(), + config.getGlobalCheckpointSupplier(), + config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + config.getLeafSorter(), + config.getRelativeTimeInNanosSupplier(), + config.getIndexCommitListener(), + config.isPromotableToPrimary(), + config.getMapperService(), + config.getEngineResetLock() + ); + lazyEngineConfig.set(engineConfigWithBlockingRefreshListener); + return new InternalEngine(engineConfigWithBlockingRefreshListener) { + @Override + public void prepareForEngineReset() throws IOException { + flush(true, true); + assertTrue(preparedForReset.compareAndSet(false, true)); + } + }; + } + }); + try { + recoverShardFromStore(shard); + blockRefresh.set(true); + lazyShard.set(shard); + + // Indexing a doc with an auto-generated id makes the version map unsafe, and the realtime get will have to refresh + var index = indexDoc(shard, "_doc", null /* auto-generated id */); + assertThat(index.isCreated(), equalTo(true)); + + // Trigger a refresh + var refreshThread = new Thread(() -> shard.refresh("test")); + refreshThread.start(); + + // Wait for the refresh listener to hold the refresh lock and the engine read lock + safeAwait(refreshStarted); + + // While refresh is blocked
holding the locks, trigger a getFromTranslog() that will block on the refresh in another thread + var getThread = new Thread(() -> { + shard.withEngine(engine -> { + getFromTranslogStarted.countDown(); + try ( + // Will block on the refresh lock + var getResult = engine.get( + new Engine.Get(true, false, index.getId()), + shard.mapperService().mappingLookup(), + shard.mapperService().documentParser(), + searcher -> searcher + ) + ) { + assertThat(getResult, notNullValue()); + getFromTranslogResult.onResponse(getResult.exists()); + return null; + } + }); + }); + getThread.start(); + + final var engineResetLock = lazyEngineConfig.get().getEngineResetLock(); + safeAwait(getFromTranslogStarted); + + // Resets the engine to have a thread waiting for the engine write lock (this will block non-reentrant read lock acquisitions) + final var reset = new PlainActionFuture<Void>(); + final var resetEngineThread = new Thread(() -> { + resetStarted.countDown(); + try { + shard.acquirePrimaryOperationPermit(reset.delegateFailure((l, permit) -> { + try (permit) { + shard.resetEngine(newEngine -> { + assertThat(newEngine.getEngineConfig().getEngineResetLock(), sameInstance(engineResetLock)); + assertThat(engineResetLock.isWriteLockedByCurrentThread(), equalTo(true)); + assertThat(newEngine, instanceOf(ReadOnlyEngine.class)); + assertThat(getFromTranslogResult.isDone(), equalTo(true)); + }); + assertThat(preparedForReset.get(), equalTo(true)); + l.onResponse(null); + } + }), EsExecutors.DIRECT_EXECUTOR_SERVICE); + } catch (Exception e) { + reset.onFailure(e); + } + }); + resetEngineThread.start(); + + safeAwait(resetStarted); + + // At this stage, getThread is blocked by refreshThread, and both threads block resetEngineThread + assertThat(getFromTranslogResult.isDone(), equalTo(false)); + + assertBusy(() -> assertThat(engineResetLock.getReadLockCount(), greaterThanOrEqualTo(2))); + assertBusy(() -> assertThat(engineResetLock.getQueuedWriterThreads(), hasItem(resetEngineThread))); + assertThat(engineResetLock.isWriteLocked(), equalTo(false)); + assertThat(engineResetLock.isReadLocked(), equalTo(true)); + + unblockRefresh.countDown(); + + safeGet(reset); + + resetEngineThread.join(); + refreshThread.join(); + getThread.join(); + } finally { + closeShards(shard); + } + } + public void testShardExposesWriteLoadStats() throws Exception { final IndexShard primary = newStartedShard( true, diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 4e280f5443787..6367a33318abc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -166,7 +166,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { System::nanoTime, null, true, - EngineTestCase.createMapperService() + EngineTestCase.createMapperService(), + new EngineResetLock() ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index d4554df1617ee..7ff65f0dc0c51 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -91,6 +91,7 @@ import
org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.shard.SearcherHelper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -304,7 +305,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); } @@ -337,7 +339,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); } @@ -370,7 +373,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); } @@ -875,7 +879,8 @@ public EngineConfig config( this::relativeTimeInNanos, indexCommitListener, true, - mapperService + mapperService, + new EngineResetLock() ); } @@ -916,7 +921,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getEngineResetLock() ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 957570918cde3..b6ced318c1699 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.EngineResetLock; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -273,7 +274,8 @@ public void onFailedEngine(String reason, Exception e) { System::nanoTime, null, true, - mapperService + mapperService, + new EngineResetLock() ); }