diff --git a/docs/changelog/125856.yaml b/docs/changelog/125856.yaml new file mode 100644 index 0000000000000..22dc79ae9cacb --- /dev/null +++ b/docs/changelog/125856.yaml @@ -0,0 +1,5 @@ +pr: 125856 +summary: Hold engine read lock during reader refresh +area: Engine +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 0ac8c4d0b6fd4..31c16231ec20e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); } diff --git a/server/src/main/java/org/elasticsearch/index/IndexWarmer.java b/server/src/main/java/org/elasticsearch/index/IndexWarmer.java index 4bd63f877acb6..ed1cc9eef9fe0 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexWarmer.java +++ b/server/src/main/java/org/elasticsearch/index/IndexWarmer.java @@ -47,7 +47,7 @@ public final class IndexWarmer { } void warm(ElasticsearchDirectoryReader reader, IndexShard shard, IndexSettings settings) { - if (shard.state() == IndexShardState.CLOSED) { + if (shard.state() == IndexShardState.CLOSED || shard.isClosing()) { return; } if (settings.isWarmerEnabled() == false) { @@ -117,6 +117,9 @@ public TerminationHandle warmReader(final IndexShard indexShard, final Elasticse for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) { executor.execute(() -> { try { + if (indexShard.isClosing()) { + return; + } final long start = System.nanoTime(); IndexFieldData.Global ifd = 
indexFieldDataService.getForField( fieldType, diff --git a/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 33a8487bb33a3..08df807b43330 100644 --- a/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/server/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -272,7 +272,7 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin return TerminationHandle.NO_WAIT; } - if (loadRandomAccessFiltersEagerly == false) { + if (loadRandomAccessFiltersEagerly == false || indexShard.isClosing()) { return TerminationHandle.NO_WAIT; } @@ -291,7 +291,9 @@ public IndexWarmer.TerminationHandle warmReader(final IndexShard indexShard, fin executor.execute(() -> { try { final long start = System.nanoTime(); - getAndLoadIfNotPresent(filterToWarm, ctx); + if (indexShard.isClosing() == false) { + getAndLoadIfNotPresent(filterToWarm, ctx); + } if (indexShard.warmerService().logger().isTraceEnabled()) { indexShard.warmerService() .logger() diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 1ef42cdb922c3..a67e0961b6860 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -40,6 +40,7 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.concurrent.locks.Lock; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -146,6 +147,11 @@ public Supplier retentionLeasesSupplier() { private final boolean promotableToPrimary; + /** + * Lock to acquire before refreshing a reader instance. 
+ */ + private final Lock maybeRefreshLock; + /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ @@ -177,7 +183,8 @@ public EngineConfig( LongSupplier relativeTimeInNanosSupplier, Engine.IndexCommitListener indexCommitListener, boolean promotableToPrimary, - MapperService mapperService + MapperService mapperService, + Lock maybeRefreshLock ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -224,6 +231,7 @@ public EngineConfig( this.promotableToPrimary = promotableToPrimary; // always use compound on flush - reduces # of file-handles on refresh this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); + this.maybeRefreshLock = maybeRefreshLock; } /** @@ -468,4 +476,8 @@ public boolean getUseCompoundFile() { public MapperService getMapperService() { return mapperService; } + + public Lock getMaybeRefreshLock() { + return maybeRefreshLock; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index acd9cec8b064d..677871048a23c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2051,12 +2051,22 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea // the second refresh will only do the extra work we have to do for warming caches etc. 
ReferenceManager referenceManager = getReferenceManager(scope); long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration(); - // it is intentional that we never refresh both internal / external together - if (block) { - referenceManager.maybeRefreshBlocking(); - refreshed = true; - } else { - refreshed = referenceManager.maybeRefresh(); + + // acquire the engine read lock before trying to acquire the reader reference manager's refresh lock, so that the + // refresh is guaranteed to complete if it tries accessing the engine for refreshing while there is a thread waiting + // for the engine write lock. + final var maybeRefreshLock = engineConfig.getMaybeRefreshLock(); + maybeRefreshLock.lock(); + try { + // it is intentional that we never refresh both internal / external together + if (block) { + referenceManager.maybeRefreshBlocking(); + refreshed = true; + } else { + refreshed = referenceManager.maybeRefresh(); + } + } finally { + maybeRefreshLock.unlock(); } if (refreshed) { final ElasticsearchDirectoryReader current = referenceManager.acquire(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 28fd80cf5eafa..3244ad772c38f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -315,6 +315,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // the translog keeps track of the GCP, but unpromotable shards have no translog so we need to track the GCP here instead private volatile long globalCheckPointIfUnpromotable; + /** + * Indicates that the {@link #close(String, boolean, Executor, ActionListener)} has been called + */ + private final AtomicBoolean isClosing = new AtomicBoolean(); + @SuppressWarnings("this-escape") public IndexShard( final ShardRouting shardRouting, @@ -1819,8 +1824,13 @@ public CacheHelper 
getReaderCacheHelper() { } + public boolean isClosing() { + return isClosing.get(); + } + public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener closeListener) throws IOException { synchronized (closeMutex) { + isClosing.set(true); Engine engineOrNull = null; try { // engine reference and shard state are changed under the engine write lock @@ -1852,8 +1862,8 @@ public void close(String reason, boolean flushEngine, Executor closeExecutor, Ac @Override public void run() throws Exception { try { + assert engineLock.isWriteLockedByCurrentThread() == false : "do not close engine while holding write lock"; if (engine != null && flushEngine) { - assert engineLock.isWriteLockedByCurrentThread() == false : "do not flush under engine write lock"; engine.flushAndClose(); } } finally { @@ -3735,7 +3745,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { relativeTimeInNanosSupplier, indexCommitListener, routingEntry().isPromotableToPrimary(), - mapperService() + mapperService(), + engineLock.readLock() ); } @@ -4470,41 +4481,26 @@ public void resetEngine(Consumer postResetNewEngineConsumer) { assert Thread.holdsLock(mutex) == false : "resetting engine under mutex"; assert waitForEngineOrClosedShardListeners.isDone(); try { - engineLock.readLock().lock(); - var release = true; + Engine previousEngine = null; + engineLock.writeLock().lock(); try { verifyNotClosed(); - // Preparing the engine for reset flushes and blocks for refresh: it should not be executed under the write lock because - // another thread might already be refreshing, and the refresh listeners in that thread will need to access to the engine - // using the read lock. If we were using the write lock here, it would deadlock. 
currentEngine.prepareForEngineReset(); - engineLock.readLock().unlock(); - release = false; - - // Promote to write lock in order to swap engines - engineLock.writeLock().lock(); - Engine previousEngine = null; + var newEngine = createEngine(newEngineConfig(replicationTracker)); + previousEngine = getAndSetCurrentEngine(newEngine); + postResetNewEngineConsumer.accept(newEngine); + onNewEngine(newEngine); + } finally { + // Downgrade to read lock for closing the engine + engineLock.readLock().lock(); try { - - // How do we ensure that no indexing operations have been processed since prepareForEngineReset() here? We're not - // blocking all operations when resetting the engine nor we are blocking flushes or force-merges. - - assert state != IndexShardState.CLOSED : "state has changed without holding engine write lock!"; - var newEngine = createEngine(newEngineConfig(replicationTracker)); - previousEngine = getAndSetCurrentEngine(newEngine); - postResetNewEngineConsumer.accept(newEngine); - onNewEngine(newEngine); + engineLock.writeLock().unlock(); + // Some engine implementations use a reference counting mechanism to avoid closing the engine until all operations + // requiring the engine to be open to run are completed (and in order to ensure this open state, the operations + // acquire a reference before running). In case an operation requires access to the engine read lock during + // execution, it is important that we don't hold the engine write lock here otherwise it might deadlock. 
+ IOUtils.close(previousEngine); } finally { - engineLock.readLock().lock(); - try { - engineLock.writeLock().unlock(); - IOUtils.close(previousEngine); - } finally { - engineLock.readLock().unlock(); - } - } - } finally { - if (release) { engineLock.readLock().unlock(); } } @@ -4515,6 +4511,11 @@ public void resetEngine(Consumer postResetNewEngineConsumer) { } } + // Some engine implementations use a reference counting mechanism to avoid closing the engine until all operations + // requiring the engine to be open to run are completed (and in order to ensure this open state, the operations + // acquire a reference before running). In case an operation requires access to the engine read lock during + // execution, it is important that we don't hold the engine write lock here otherwise it might deadlock. + /** * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e1128a329023a..24b8162750b3e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3612,7 +3612,8 @@ public void testRecoverFromForeignTranslog() throws IOException { config.getRelativeTimeInNanosSupplier(), null, true, - config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7175,7 +7176,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); try (InternalEngine engine = createEngine(configWithWarmer)) { 
assertThat(warmedUpReaders, empty()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9e203f179e6fc..15fd79856a227 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -18,6 +18,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; @@ -67,6 +68,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -188,6 +190,7 @@ import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.containsStringIgnoringCase; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -5023,6 +5026,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { closeShards(readonlyShard); } + @AwaitsFix(bugUrl = "Adjust this test") public void testCloseShardWhileEngineIsWarming() throws Exception { CountDownLatch warmerStarted = new CountDownLatch(1); CountDownLatch warmerBlocking = new CountDownLatch(1); @@ -5060,7 +5064,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - 
config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); return new InternalEngine(configWithWarmer); }); @@ -5156,6 +5161,151 @@ public void testCloseShardWhileRetainingEngine() throws Exception { } } + public void testRefreshListenersExecutedWhileHoldingEngineReadLock() throws Exception { + final var recovered = new AtomicBoolean(); + final var refreshLocked = new CountDownLatch(1); + final var afterRefreshLocked = new CountDownLatch(1); + final var tryGetEngineFromRefreshListener = new AtomicReference>(); + final var getFromTranslog = new CountDownLatch(1); + + final var refreshListener = new ReferenceManager.RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + if (recovered.get()) { + refreshLocked.countDown(); + assertThat(Thread.currentThread().toString(), containsStringIgnoringCase(getTestClass().getSimpleName())); + + safeAwait(getFromTranslog); + safeAwait(afterRefreshLocked, TimeValue.THIRTY_SECONDS); + + var runnable = tryGetEngineFromRefreshListener.get(); + assertThat(runnable, notNullValue()); + // Try access the engine from a refresh listener, while holding the engine read lock, blocks because another thread + // (closing thread) is waiting for the engine write lock + runnable.run(); + } + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException {} + }; + + final var shard = newShard(true, Settings.EMPTY, new InternalEngineFactory() { + @Override + public Engine newReadWriteEngine(EngineConfig config) { + var internalRefreshListeners = new ArrayList(); + internalRefreshListeners.add(refreshListener); + internalRefreshListeners.addAll(config.getInternalRefreshListener()); + + return new InternalEngine( + new EngineConfig( + config.getShardId(), + config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), + config.getIndexSettings(), + config.getWarmer(), + config.getStore(), + config.getMergePolicy(), + config.getAnalyzer(), + config.getSimilarity(), + 
new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE), + config.getEventListener(), + config.getQueryCache(), + config.getQueryCachingPolicy(), + config.getTranslogConfig(), + config.getFlushMergesAfter(), + config.getExternalRefreshListener(), + internalRefreshListeners, + config.getIndexSort(), + config.getCircuitBreakerService(), + config.getGlobalCheckpointSupplier(), + config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER, + config.getLeafSorter(), + config.getRelativeTimeInNanosSupplier(), + config.getIndexCommitListener(), + config.isPromotableToPrimary(), + config.getMapperService(), + config.getMaybeRefreshLock() + ) + ); + } + }); + try { + recoverShardFromStore(shard); + + var index = indexDoc(shard, "_doc", null /* auto-generated id */); + assertThat(index.isCreated(), equalTo(true)); + recovered.set(true); + + // Trigger a refresh + var refreshThread = new Thread(() -> shard.refresh("test")); + refreshThread.start(); + + // Wait for the refresh listener to hold the refresh lock + safeAwait(refreshLocked, TimeValue.THIRTY_SECONDS); + + // While refresh is blocked, trigger a getFromTranslog() that refreshes the internal reader + var getThread = new Thread(() -> { + var docExists = shard.withEngine(engine -> { + getFromTranslog.countDown(); + try ( + var getResult = engine.get( + new Engine.Get(true, false, index.getId()), + shard.mapperService().mappingLookup(), + shard.mapperService().documentParser(), + searcher -> searcher + ) + ) { + assertThat(getResult, notNullValue()); + return getResult.exists(); + } + }); + assertThat(docExists, equalTo(true)); + }); + getThread.start(); + + safeAwait(getFromTranslog); + + // Sleep a bit to allow the getThread to block on the refresh lock + safeSleep(randomLongBetween(50L, 500L)); + + final var closeShardNoCheck = new CountDownLatch(1); + // Now close the shard to acquire the engine write lock + var closingThread = new Thread(() -> { + try { + 
closeShardNoCheck.countDown(); + closeShardNoCheck(shard); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + closingThread.start(); + + tryGetEngineFromRefreshListener.set(() -> { + safeAwait(closeShardNoCheck); + var engine = shard.getEngine(); + assertThat(engine, notNullValue()); + }); + + safeAwait(closeShardNoCheck); + + // Sleep a bit to allow the closingThread to block on the engine write lock + safeSleep(randomLongBetween(50L, 500L)); + + afterRefreshLocked.countDown(); + + logger.info("--> deadlock"); + + closingThread.join(); + refreshThread.join(); + getThread.join(); + } finally { + IOUtils.close(shard.store()); + } + } + public void testShardExposesWriteLoadStats() throws Exception { final IndexShard primary = newStartedShard( true, diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 4e280f5443787..5351509bce7ff 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -75,6 +75,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -166,7 +167,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { System::nanoTime, null, true, - EngineTestCase.createMapperService() + EngineTestCase.createMapperService(), + new ReentrantReadWriteLock().readLock() ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 
d4554df1617ee..3a7303fe26ba0 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -130,6 +130,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -304,7 +305,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); } @@ -337,7 +339,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); } @@ -370,7 +373,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); } @@ -875,7 +879,8 @@ public EngineConfig config( this::relativeTimeInNanos, indexCommitListener, true, - mapperService + mapperService, + new ReentrantReadWriteLock().readLock() ); } @@ -916,7 +921,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.getRelativeTimeInNanosSupplier(), config.getIndexCommitListener(), config.isPromotableToPrimary(), - config.getMapperService() + config.getMapperService(), + config.getMaybeRefreshLock() ); } diff --git 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 957570918cde3..2867822bcdf3a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -70,6 +70,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; @@ -273,7 +274,8 @@ public void onFailedEngine(String reason, Exception e) { System::nanoTime, null, true, - mapperService + mapperService, + new ReentrantReadWriteLock().readLock() ); }