From 06e92f8545fe669a8c67f09767548da37b9ffaca Mon Sep 17 00:00:00 2001
From: Tanguy Leroux
Date: Mon, 22 Sep 2025 09:31:32 +0200
Subject: [PATCH 1/3] Fix deadlock in ThreadPoolMergeScheduler when a failing
 merge closes the IndexWriter (#134656)

This change fixes a bug that causes a deadlock in the thread pool merge
scheduler when a merge fails due to a tragic event. The deadlock occurs
because Lucene aborts running merges when failing with a tragic event and
then waits for them to complete. But those "running" merges might in fact be
waiting in Elasticsearch's thread pool merge scheduler task queue, or they
might be waiting in the backlogged merge tasks queue because the per-shard
concurrent merge count limit has been reached, or they might simply be
waiting for enough disk space to be executed. In those cases, the failing
merge thread waits indefinitely.

The fix in this change uses the merge thread that is failing due to a tragic
event to abort all other enqueued and backlogged merge tasks of the same
shard, before proceeding with closing the IndexWriter. This way Lucene won't
have to wait for any running merges, as they will all have been aborted
upfront.

Relates ES-12664
---
 docs/changelog/134656.yaml                    |   6 +
 .../index/engine/MergeWithFailureIT.java      | 425 ++++++++++++++++++
 .../index/engine/InternalEngine.java          |  56 ++-
 .../ThreadPoolMergeExecutorService.java       |  45 +-
 .../engine/ThreadPoolMergeScheduler.java      | 171 ++++-
 .../elasticsearch/index/shard/IndexShard.java |   6 +-
 .../index/engine/EngineTestCase.java          |   2 +-
 7 files changed, 677 insertions(+), 34 deletions(-)
 create mode 100644 docs/changelog/134656.yaml
 create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java

diff --git a/docs/changelog/134656.yaml b/docs/changelog/134656.yaml
new file mode 100644
index 0000000000000..e4f442d629f50
--- /dev/null
+++ b/docs/changelog/134656.yaml
@@ -0,0 +1,6 @@
+pr: 134656
+summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the
+  `IndexWriter`
+area: Engine
+type: bug
+issues: []
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java
new file mode 100644
index 0000000000000..e72b9a2fec2e9
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java
@@ -0,0 +1,425 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilterCodecReader; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OneMergeWrappingMergePolicy; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLockObtainFailedException; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergeSchedulerConfig; +import org.elasticsearch.index.codec.FilterDocValuesProducer; +import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; + +public class MergeWithFailureIT extends ESIntegTestCase { + + private static final String FAILING_MERGE_ON_PURPOSE = "Failing merge on purpose"; + + public static class TestPlugin extends Plugin implements EnginePlugin { + + // Number of queued merges in the thread pool. Lucene considers those as "running" and blocks waiting on them to complete in case + // of a merge failure. 
+ private final Set pendingMerges = ConcurrentCollections.newConcurrentSet(); + + // Number of running merges in the thread pool + private final AtomicInteger runningMergesCount = new AtomicInteger(); + + // Latch to unblock merges + private final CountDownLatch runMerges = new CountDownLatch(1); + + // Reference to the ThreadPoolMergeExecutorService + private final AtomicReference threadPoolMergeExecutorServiceReference = new AtomicReference<>(); + + // This future is completed once the shard that is expected to fail has its store closed + private final PlainActionFuture shardStoreClosedListener = new PlainActionFuture<>(); + + private final boolean isDataNode; + + public TestPlugin(Settings settings) { + this.isDataNode = DiscoveryNode.hasDataRole(settings); + } + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (isDataNode == false) { + return Optional.of(InternalEngine::new); + } + return Optional.of( + config -> new TestEngine( + EngineTestCase.copy( + config, + new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap -> new MergePolicy.OneMerge(toWrap) { + @Override + public CodecReader wrapForMerge(CodecReader reader) { + return new FilterCodecReader(reader) { + final AtomicBoolean failOnce = new AtomicBoolean(false); + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + @Override + public DocValuesProducer getDocValuesReader() { + return new FilterDocValuesProducer(super.getDocValuesReader()) { + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + safeAwait(runMerges, TimeValue.ONE_MINUTE); + if (failOnce.compareAndSet(false, true)) { + throw new IOException(FAILING_MERGE_ON_PURPOSE); + } + return super.getNumeric(field); + } + }; + } + }; + } + }) + ) + ) + ); + } + + private class TestEngine extends InternalEngine { + + TestEngine(EngineConfig engineConfig) { + super(engineConfig); + } + + @Override + protected ElasticsearchMergeScheduler createMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + ThreadPoolMergeExecutorService executor, + MergeMetrics metrics + ) { + threadPoolMergeExecutorServiceReference.set(Objects.requireNonNull(executor)); + return new ThreadPoolMergeScheduler(shardId, indexSettings, executor, merge -> 0L, metrics) { + + @Override + public void merge(MergeSource mergeSource, MergeTrigger trigger) { + var wrapped = wrapMergeSource(mergeSource); + super.merge(wrapped, trigger); + } + + private MergeSource wrapMergeSource(MergeSource delegate) { + // Wraps the merge source to know which merges were pulled from Lucene by the IndexWriter + return new MergeSource() { + @Override + public MergePolicy.OneMerge getNextMerge() { + var merge = delegate.getNextMerge(); + if (merge != null) { + if (pendingMerges.add(merge) == false) { + throw new AssertionError("Merge already pending " + merge); + } + } + return merge; + } + + @Override + public void onMergeFinished(MergePolicy.OneMerge merge) { + delegate.onMergeFinished(merge); + } + + @Override + public boolean hasPendingMerges() { + return delegate.hasPendingMerges(); + } + + @Override + public void merge(MergePolicy.OneMerge merge) throws IOException { + runningMergesCount.incrementAndGet(); + if (pendingMerges.remove(merge) == false) { + throw new AssertionError("Pending merge not found " + merge); + } + delegate.merge(merge); + } + }; + } + + @Override + protected void 
handleMergeException(final Throwable exc) { + mergeException(exc); + } + }; + } + } + + @Override + public void onIndexModule(IndexModule indexModule) { + if (isDataNode) { + indexModule.addIndexEventListener(new IndexEventListener() { + @Override + public void onStoreClosed(ShardId shardId) { + shardStoreClosedListener.onResponse(null); + } + }); + } + } + + public void registerMergeEventListener(MergeEventListener listener) { + var threadPoolMergeExecutorService = Objects.requireNonNull(threadPoolMergeExecutorServiceReference.get()); + threadPoolMergeExecutorService.registerMergeEventListener(listener); + } + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class); + } + + public void testFailedMergeDeadlock() throws Exception { + internalCluster().startMasterOnlyNode(); + final int maxMergeThreads = randomIntBetween(1, 3); + final int indexMaxThreadCount = randomBoolean() ? randomIntBetween(1, 10) : Integer.MAX_VALUE; + + final var dataNode = internalCluster().startDataOnlyNode( + Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + .put("thread_pool." + ThreadPool.Names.MERGE + ".max", maxMergeThreads) + .build() + ); + + final var plugin = getTestPlugin(dataNode); + assertThat(plugin, notNullValue()); + + final var indexName = randomIdentifier(); + createIndex( + indexName, + indexSettings(1, 0).put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".name", dataNode) + .put(MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.getKey(), 1) + // when indexMaxThreadCount is small so merge tasks might be backlogged + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), indexMaxThreadCount) + // no merge throttling + .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), Integer.MAX_VALUE) + .build() + ); + + final var mergesListener = new AssertingMergeEventListener(); + plugin.registerMergeEventListener(mergesListener); + + // Kick off enough merges to block the thread pool + var maxRunningThreads = Math.min(maxMergeThreads, indexMaxThreadCount); + indexDocsInManySegmentsUntil(indexName, () -> plugin.runningMergesCount.get() == maxRunningThreads); + assertThat(plugin.runningMergesCount.get(), equalTo(maxRunningThreads)); + + // Now pull more merges so they are queued in the merge thread pool, but Lucene thinks they are running + final int pendingMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5); + indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > pendingMerges); + + var mergeThreadPool = asInstanceOf( + ThreadPoolExecutor.class, + internalCluster().clusterService(dataNode).threadPool().executor(ThreadPool.Names.MERGE) + ); + assertThat(mergeThreadPool.getActiveCount(), greaterThanOrEqualTo(maxRunningThreads)); + + // More merges in the hope to have backlogged merges + if (indexMaxThreadCount != Integer.MAX_VALUE) { + final int backloggedMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5); + indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > backloggedMerges); + } + + // Sometime closes the shard concurrently with the tragic failure + Thread closingThread = null; + if (rarely()) { + closingThread = new Thread(() -> { + safeAwait(plugin.runMerges, TimeValue.ONE_MINUTE); + client().admin().indices().prepareClose(indexName).get(); + }); + closingThread.start(); + } + + // unblock merges, one 
merge will fail the IndexWriter + plugin.runMerges.countDown(); + + // Deadlock sample: + // + // "elasticsearch[node_s5][merge][T#1]@16690" tid=0x8e nid=NA waiting + // java.lang.Thread.State: WAITING + // at java.lang.Object.wait0(Object.java:-1) + // at java.lang.Object.wait(Object.java:389) + // at org.apache.lucene.index.IndexWriter.doWait(IndexWriter.java:5531) + // at org.apache.lucene.index.IndexWriter.abortMerges(IndexWriter.java:2733) + // at org.apache.lucene.index.IndexWriter.rollbackInternalNoCommi(IndexWriter.java:2488) + // at org.apache.lucene.index.IndexWriter.rollbackInternal(IndexWriter.java:2457) + // - locked <0x429a> (a java.lang.Object) + // at org.apache.lucene.index.IndexWriter.maybeCloseOnTragicEvent(IndexWriter.java:5765) + // at org.apache.lucene.index.IndexWriter.tragicEvent(IndexWriter.java:5755) + // at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4780) + // at org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6567) + // at org.elasticsearch.index.engine.MergeWithFailureIT$TestPlugin$TestEngine$1$1.merge(MergeWithFailureIT.java:178) + // at org.elasticsearch.index.engine.ThreadPoolMergeScheduler.doMerge(ThreadPoolMergeScheduler.java:347) + // at org.elasticsearch.index.engine.ThreadPoolMergeScheduler$MergeTask.run(ThreadPoolMergeScheduler.java:459) + // at org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.runMergeTask(ThreadPoolMergeExecutorService.java:364) + + ensureRed(indexName); + + // verify that the shard store is effectively closed + assertTrue(plugin.shardStoreClosedListener.isDone()); + + if (closingThread != null) { + closingThread.join(); + } + + final var shardId = new ShardId(resolveIndex(indexName), 0); + var nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, dataNode); + try { + var shardLock = nodeEnvironment.shardLock(shardId, getTestName(), 10_000L); + shardLock.close(); + } catch (ShardLockObtainFailedException ex) { + throw new AssertionError("Shard " + shardId + " is still locked after 10 seconds", ex); + } + + // check the state of the shard + var routingTable = internalCluster().clusterService(dataNode).state().routingTable(ProjectId.DEFAULT); + var indexRoutingTable = routingTable.index(shardId.getIndex()); + var primary = asInstanceOf(IndexShardRoutingTable.class, indexRoutingTable.shard(shardId.id())).primaryShard(); + assertThat(primary.state(), equalTo(ShardRoutingState.UNASSIGNED)); + assertThat(primary.unassignedInfo(), notNullValue()); + assertThat(primary.unassignedInfo().reason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + var failure = ExceptionsHelper.unwrap(primary.unassignedInfo().failure(), IOException.class); + assertThat(failure, notNullValue()); + assertThat(failure.getMessage(), containsString(FAILING_MERGE_ON_PURPOSE)); + + // verify the number of queued, completed and aborted merges + mergesListener.verify(); + + assertAcked(indicesAdmin().prepareDelete(indexName)); + } + + private void indexDocsInManySegmentsUntil(String indexName, Supplier stopCondition) { + indexDocsInManySegmentsUntil(indexName, stopCondition, TimeValue.THIRTY_SECONDS); + } + + private void indexDocsInManySegmentsUntil(String indexName, Supplier stopCondition, TimeValue timeout) { + long millisWaited = 0L; + do { + if (millisWaited >= timeout.millis()) { + logger.warn(format("timed out after waiting for [%d]", millisWaited)); + return; + } + var client = client(); + for (int request = 0; request < 10; request++) { + var bulkRequest = 
client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < 10; doc++) { + bulkRequest.add(client.prepareIndex(indexName).setCreate(true).setSource("value", randomIntBetween(0, 1024))); + } + bulkRequest.get(); + } + // Sleep a bit to wait for merges to kick in + long sleepInMillis = randomLongBetween(50L, 200L); + safeSleep(sleepInMillis); + millisWaited += sleepInMillis; + } while (stopCondition.get() == false); + } + + private static TestPlugin getTestPlugin(String dataNode) { + return internalCluster().getInstance(PluginsService.class, dataNode).filterPlugins(TestPlugin.class).findFirst().get(); + } + + private static void ensureRed(String indexName) throws Exception { + assertBusy(() -> { + var healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT, indexName) + .setWaitForStatus(ClusterHealthStatus.RED) + .setWaitForEvents(Priority.LANGUID) + .get(); + assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED)); + }); + } + + private static class AssertingMergeEventListener implements MergeEventListener { + + private final AtomicInteger mergesQueued = new AtomicInteger(); + private final AtomicInteger mergesCompleted = new AtomicInteger(); + private final AtomicInteger mergesAborted = new AtomicInteger(); + + @Override + public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) { + mergesQueued.incrementAndGet(); + } + + @Override + public void onMergeCompleted(OnGoingMerge merge) { + mergesCompleted.incrementAndGet(); + } + + @Override + public void onMergeAborted(OnGoingMerge merge) { + mergesAborted.incrementAndGet(); + } + + private void verify() { + int queued = mergesQueued.get(); + int completed = mergesCompleted.get(); + int aborted = mergesAborted.get(); + var error = format("Queued merges mismatch (queued=%d, completed=%d, aborted=%d)", queued, completed, aborted); + assertThat(error, queued, equalTo(completed + aborted)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 325d552bcc474..cc5dd4cda4565 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2682,11 +2682,7 @@ private IndexWriter createWriter() throws IOException { // protected for testing protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - if (Assertions.ENABLED) { - return new AssertingIndexWriter(directory, iwc); - } else { - return new IndexWriter(directory, iwc); - } + return new ElasticsearchIndexWriter(directory, iwc, logger); } // with tests.verbose, lucene sets this up: plumb to align with filesystem stream @@ -2822,8 +2818,10 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() { return indexWriter.getConfig(); } - private void maybeFlushAfterMerge(OnGoingMerge merge) { - if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { + protected void maybeFlushAfterMerge(OnGoingMerge merge) { + if (indexWriter.getTragicException() == null + && indexWriter.hasPendingMerges() == false + && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the // writer // we deadlock on engine#close for instance. 
@@ -3280,19 +3278,49 @@ private static Map commitDataAsMap(final IndexWriter indexWriter return commitData; } - private static class AssertingIndexWriter extends IndexWriter { - AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { - super(d, conf); + private static class ElasticsearchIndexWriter extends IndexWriter { + + private final Logger logger; + + ElasticsearchIndexWriter(Directory directory, IndexWriterConfig indexWriterConfig, Logger logger) throws IOException { + super(directory, indexWriterConfig); + this.logger = logger; } @Override - public long deleteDocuments(Term... terms) { - throw new AssertionError("must not hard delete documents"); + public void onTragicEvent(Throwable tragedy, String location) { + assert tragedy != null; + try { + if (getConfig().getMergeScheduler() instanceof ThreadPoolMergeScheduler mergeScheduler) { + try { + // Must be executed before calling IndexWriter#onTragicEvent + mergeScheduler.onTragicEvent(tragedy); + } catch (Exception e) { + logger.warn("Exception thrown when notifying the merge scheduler of a tragic event", e); + if (tragedy != e) { + tragedy.addSuppressed(e); + } + } + } + } finally { + super.onTragicEvent(tragedy, location); + } } @Override - public long tryDeleteDocument(IndexReader readerIn, int docID) { - throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs"); + public long deleteDocuments(Term... terms) throws IOException { + if (Assertions.ENABLED) { + throw new AssertionError("must not hard delete documents"); + } + return super.deleteDocuments(terms); + } + + @Override + public long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException { + if (Assertions.ENABLED) { + throw new AssertionError("tryDeleteDocument is not supported. 
See Lucene#DirectoryReaderWithAllLiveDocs"); + } + return super.tryDeleteDocument(readerIn, docID); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 9efb582007aaf..d12db5dbafcc4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -31,7 +31,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; @@ -48,6 +50,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.LongUnaryOperator; +import java.util.function.Predicate; import java.util.function.ToLongFunction; import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING; @@ -372,7 +375,7 @@ private void runMergeTask(MergeTask mergeTask) { } } - private void abortMergeTask(MergeTask mergeTask) { + void abortMergeTask(MergeTask mergeTask) { assert mergeTask.hasStartedRunning() == false; assert runningMergeTasks.contains(mergeTask) == false; try { @@ -385,6 +388,25 @@ private void abortMergeTask(MergeTask mergeTask) { } } + private void abortMergeTasks(Collection mergeTasks) { + if (mergeTasks != null && mergeTasks.isEmpty() == false) { + for (var mergeTask : mergeTasks) { + abortMergeTask(mergeTask); + } + } + } + + /** + * Removes all {@link MergeTask} that match the predicate and aborts them. + * @param predicate the predicate to filter merge tasks to be aborted + */ + void abortQueuedMergeTasks(Predicate predicate) { + final var queuedMergesToAbort = new HashSet(); + if (queuedMergeTasks.drainMatchingElementsTo(predicate, queuedMergesToAbort) > 0) { + abortMergeTasks(queuedMergesToAbort); + } + } + /** * Start monitoring the available disk space, and update the available budget for running merge tasks * Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST @@ -675,6 +697,25 @@ ElementWithReleasableBudget take() throws InterruptedException { } } + int drainMatchingElementsTo(Predicate predicate, Collection c) { + int removed = 0; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + for (Iterator> iterator = enqueuedByBudget.iterator(); iterator.hasNext();) { + E item = iterator.next().v1(); + if (predicate.test(item)) { + iterator.remove(); + c.add(item); + removed++; + } + } + return removed; + } finally { + lock.unlock(); + } + } + /** * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements * that are still in use. The elements budget is also updated by re-applying the budget function. 
@@ -704,7 +745,7 @@ void updateBudget(long availableBudget) { void postBudgetUpdate() { assert lock.isHeldByCurrentThread(); - }; + } private void updateBudgetOfEnqueuedElementsAndReorderQueue() { assert this.lock.isHeldByCurrentThread(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index b21172c7007e8..91e52627a9087 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergeSchedulerConfig; @@ -75,9 +76,17 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics // how many {@link MergeTask}s have kicked off (this is used to name them). private final AtomicLong submittedMergeTaskCount = new AtomicLong(); private final AtomicLong doneMergeTaskCount = new AtomicLong(); + private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; + + // Merge pulled from Lucene that is not yet submitted to the merge thread pool tasks queue + record PendingMerge(MergeSource source, MergePolicy.OneMerge merge, MergeTrigger trigger) {} + private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1); private volatile boolean closed = false; - private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; + + // Tragic event that causes the IndexWriter and ThreadPoolMergeScheduler to be closed + private record TragicEvent(Throwable throwable, CountDownLatch latch) {} + private volatile TragicEvent tragedy = null; /** * Creates a thread-pool-based merge scheduler that runs merges in a thread pool. @@ -131,21 +140,26 @@ public void refreshConfig() { @Override public void merge(MergeSource mergeSource, MergeTrigger trigger) { - if (closed) { + if (closed || tragedy != null) { // avoid pulling from the merge source when closing return; } - MergePolicy.OneMerge merge = null; + PendingMerge pendingMerge = null; try { - merge = mergeSource.getNextMerge(); + // From this point on Lucene considers the OneMerge as "running", + // but it's not yet in the thread pool executor tasks queue! 
+ var merge = mergeSource.getNextMerge(); + if (merge != null) { + pendingMerge = new PendingMerge(mergeSource, merge, trigger); + } } catch (IllegalStateException e) { if (verbose()) { message("merge task poll failed, likely that index writer is failed"); } // ignore exception, we expect the IW failure to be logged elsewhere } - if (merge != null) { - submitNewMergeTask(mergeSource, merge, trigger); + if (pendingMerge != null) { + submitNewMergeTask(pendingMerge); } } @@ -222,13 +236,26 @@ protected void handleMergeException(Throwable t) { throw new MergePolicy.MergeException(t); } - // package-private for tests - boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) { + private void submitNewMergeTask(PendingMerge pendingMerge) { + boolean queued = false; try { - MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger); - mergeQueued(mergeTask.onGoingMerge); - return threadPoolMergeExecutorService.submitMergeTask(mergeTask); + // note that estimating the size of the merge might open a searcher + final var mergeTask = newMergeTask(pendingMerge.source(), pendingMerge.merge(), pendingMerge.trigger()); + if (tragedy == null) { + mergeQueued(mergeTask.onGoingMerge); + + queued = threadPoolMergeExecutorService.submitMergeTask(mergeTask); // may abort the merge immediately + // TODO Enable the following assertions once unit tests are fixed to not use Mockito + // assert queued || pendingMerge.merge().isAborted(); + } else { + // merge scheduler is failing due to a tragic event + mergeTask.abort(); + } } finally { + if (queued && tragedy != null) { + // ensure that if `onTragicEvent` races with this, we still abort what we just submitted. + abortQueuedMergesAfterTragedy(null); + } checkMergeTaskThrottling(); } } @@ -239,12 +266,15 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1; // IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge); + // used for reference equality in case the task must be aborted after a tragic event + var owner = this; return new MergeTask( mergeSource, merge, isAutoThrottle && isAutoThrottle(), "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId, - estimateMergeMemoryBytes + estimateMergeMemoryBytes, + owner ); } @@ -279,7 +309,7 @@ private void checkMergeTaskThrottling() { // synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically synchronized Schedule schedule(MergeTask mergeTask) { assert mergeTask.hasStartedRunning() == false; - if (closed) { + if (closed || tragedy != null) { // do not run or backlog tasks when closing the merge scheduler, instead abort them return Schedule.ABORT; } else if (shouldSkipMerge()) { @@ -314,12 +344,114 @@ private void mergeTaskDone(OnGoingMerge merge) { checkMergeTaskThrottling(); } - private synchronized void maybeSignalAllMergesDoneAfterClose() { - if (closed && runningMergeTasks.isEmpty()) { + private void maybeSignalAllMergesDoneAfterClose() { + assert Thread.holdsLock(this); + if ((closed || tragedy != null) && runningMergeTasks.isEmpty()) { closedWithNoRunningMerges.countDown(); } } + public void onTragicEvent(Throwable tragedy) { + assert tragedy != null; 
+ assert tragedy instanceof MergePolicy.MergeAbortedException == false; + + TragicEvent tragicEvent; + boolean shouldAbort = false; + // Sets the tragic event if not already set + synchronized (this) { + tragicEvent = this.tragedy; + if (tragicEvent == null) { + tragicEvent = new TragicEvent(tragedy, new CountDownLatch(1)); + this.tragedy = tragicEvent; + shouldAbort = true; + } + } + if (shouldAbort) { + abortQueuedMergesAfterTragedy(tragedy); + closedWithNoRunningMerges.countDown(); + tragicEvent.latch().countDown(); + return; + } + try { + // the merge scheduler is being failed by another thread, wait for non-executed merges to be aborted + tragicEvent.latch().await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + tragedy.addSuppressed(e); + } + } + + private void abortQueuedMergesAfterTragedy(@Nullable Throwable throwable) { + assert this.tragedy != null; + try { + // Merges that have been pulled from Lucene using MergePolicy#getNextMerge before the tragic exception was set require special + // handling, because Lucene considers them as "running" and will wait for those to complete in IndexWriter#abortMerges when + // failing the IndexWriter with IndexWriter#maybeCloseOnTragicEvent. If at some point those merges are executed by a different + // thread (than the current thread we're in here) then it is OK, the merges will be aborted or failed almost immediately and + // there will be no running merges to wait for. + // + // But the thread pool executor offer no guarantee the those merges will be executed by another thread because: + // - the thread pool may have only 1 core thread, + // - or all other threads may be busy failing merges for different shards too, and can also be blocked waiting for their own + // queued merges to complete, + // - or there is not enough budget to execute the merge(s). + // + // In order to avoid waiting indefinitely in IndexWriter#abortMerges for merges that won't be executed, the current thread is + // used to abort all remaining non-executed merges: + // - the merge tasks in backloggedMergeTasks that are waiting to be re-enqueued, + // - the merge tasks in the thread pool executor task queue that are waiting to be executed. + // + // Note that only merges pulled from the current merge scheduler instance are aborted. These abortions are all executed in a + // synchronized block to ensure that no other concurrent merge thread can also fail due to a tragic event and set the + // IndexWriter#tragedy before we abort merges here. This is important because if the IndexWriter#tragedy is set, any upcoming + // merge execution/abortion would re-enter this method in order to fail the IndexWriter again (and ultimately also deadlock in + // IndexWriter#maybeCloseOnTragicEvent). 
+ synchronized (this) { + // Abort backlogged merges + abortBackloggedMergeTasks(); + // Abort all queued tasks that have been created by this merge scheduler + threadPoolMergeExecutorService.abortQueuedMergeTasks(mergeTask -> mergeTask.owner == this); + } + } catch (Exception e) { + logger.warn("exception when aborting non-running merge tasks", e); + if (throwable != null) { + throwable.addSuppressed(e); + } + } + } + + private int abortBackloggedMergeTasks() throws Exception { + assert tragedy != null; + assert Thread.holdsLock(this); + + int count = 0; + int maxExceptions = 10; + Exception firstException = null; + MergeTask backlogged; + while ((backlogged = backloggedMergeTasks.poll()) != null) { + try { + abortMergeTask(backlogged); + count += 1; + } catch (Exception e) { + assert false : e; + if (firstException != null && maxExceptions-- >= 0) { + firstException.addSuppressed(e); + } else { + firstException = e; + } + } + } + if (firstException != null) { + throw firstException; + } + return count; + } + + private void abortMergeTask(MergeTask mergeTask) { + // abort immediately using the thread pool executor to handle throttling + threadPoolMergeExecutorService.abortMergeTask(mergeTask); + } + private synchronized void enqueueBackloggedTasks() { int maxBackloggedTasksToEnqueue = getMaxThreadCount() - runningMergeTasks.size(); // enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back @@ -342,6 +474,10 @@ void doMerge(MergeSource mergeSource, MergePolicy.OneMerge oneMerge) { } catch (Throwable t) { // OK to ignore MergeAbortedException. This is what Lucene's ConcurrentMergeScheduler does. if (t instanceof MergePolicy.MergeAbortedException == false) { + // A merge thread that thrown a tragic exception that closed the IndexWriter causes other merge threads to be aborted, but + // it is not itself aborted: instead the current merge is just completed and the thrown exception is set in the package + // private OneMerge#error field. Here we set such merge as aborted too so that it is not considered as successful later. 
+ oneMerge.setAborted(); handleMergeException(t); } } @@ -384,13 +520,15 @@ class MergeTask implements Runnable { private final MergeRateLimiter rateLimiter; private final boolean supportsIOThrottling; private final long mergeMemoryEstimateBytes; + private final Object owner; MergeTask( MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name, - long mergeMemoryEstimateBytes + long mergeMemoryEstimateBytes, + Object owner ) { this.name = name; this.mergeStartTimeNS = new AtomicLong(); @@ -399,6 +537,7 @@ class MergeTask implements Runnable { this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); this.supportsIOThrottling = supportsIOThrottling; this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes; + this.owner = owner; } Schedule schedule() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 611284b21660a..e59df138bdeaa 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1329,7 +1329,11 @@ public long getWritingBytes() { if (engine == null) { return 0; } - return engine.getWritingBytes(); + try { + return engine.getWritingBytes(); + } catch (AlreadyClosedException ex) { + return 0L; + } } public RefreshStats refreshStats() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 574d3ac47daa9..be021a9619c33 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -346,7 +346,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { ); } - public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { + public static EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { return new EngineConfig( config.getShardId(), config.getThreadPool(), From 5cbf960fc3018e52b17add3b152a70243ad725ff Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 22 Sep 2025 08:40:01 +0000 Subject: [PATCH 2/3] [CI] Auto commit changes from spotless --- .../org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 91e52627a9087..c33a8816e4932 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -86,6 +86,7 @@ record PendingMerge(MergeSource source, MergePolicy.OneMerge merge, MergeTrigger // Tragic event that causes the IndexWriter and ThreadPoolMergeScheduler to be closed private record TragicEvent(Throwable throwable, CountDownLatch latch) {} + private volatile TragicEvent tragedy = null; /** From e43d50927eb1da74fd5f539a87592068ffeee374 Mon Sep 17 00:00:00 2001 From: tlrx Date: Mon, 22 Sep 2025 10:47:44 +0200 Subject: [PATCH 3/3] Adjust for 9.0 --- .../index/engine/MergeWithFailureIT.java | 54 ++++++++++++++++--- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java 
b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java index e72b9a2fec2e9..a64e3344e11fc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java @@ -10,19 +10,23 @@ package org.elasticsearch.index.engine; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.DocValuesSkipper; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FilterCodecReader; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergeTrigger; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.OneMergeWrappingMergePolicy; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -38,7 +42,6 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergeSchedulerConfig; -import org.elasticsearch.index.codec.FilterDocValuesProducer; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.ShardId; @@ -122,14 +125,50 @@ public CacheHelper getReaderCacheHelper() { @Override public DocValuesProducer getDocValuesReader() { - return new FilterDocValuesProducer(super.getDocValuesReader()) { + final var in = super.getDocValuesReader(); + return new DocValuesProducer() { @Override public NumericDocValues getNumeric(FieldInfo field) throws IOException { safeAwait(runMerges, TimeValue.ONE_MINUTE); if (failOnce.compareAndSet(false, true)) { throw new IOException(FAILING_MERGE_ON_PURPOSE); } - return super.getNumeric(field); + return in.getNumeric(field); + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + return in.getBinary(field); + } + + @Override + public SortedDocValues getSorted(FieldInfo fieldInfo) throws IOException { + return in.getSorted(fieldInfo); + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo fieldInfo) throws IOException { + return in.getSortedNumeric(fieldInfo); + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + return in.getSortedSet(field); + } + + @Override + public DocValuesSkipper getSkipper(FieldInfo fieldInfo) throws IOException { + return in.getSkipper(fieldInfo); + } + + @Override + public void checkIntegrity() throws IOException { + in.checkIntegrity(); + } + + @Override + public void close() throws IOException { + in.close(); } }; } @@ -151,11 +190,10 @@ private class TestEngine extends InternalEngine { protected ElasticsearchMergeScheduler createMergeScheduler( ShardId shardId, IndexSettings indexSettings, - ThreadPoolMergeExecutorService executor, - MergeMetrics metrics + 
ThreadPoolMergeExecutorService executor ) { threadPoolMergeExecutorServiceReference.set(Objects.requireNonNull(executor)); - return new ThreadPoolMergeScheduler(shardId, indexSettings, executor, merge -> 0L, metrics) { + return new ThreadPoolMergeScheduler(shardId, indexSettings, executor, merge -> 0L) { @Override public void merge(MergeSource mergeSource, MergeTrigger trigger) { @@ -337,7 +375,7 @@ public void testFailedMergeDeadlock() throws Exception { } // check the state of the shard - var routingTable = internalCluster().clusterService(dataNode).state().routingTable(ProjectId.DEFAULT); + var routingTable = internalCluster().clusterService(dataNode).state().routingTable(); var indexRoutingTable = routingTable.index(shardId.getIndex()); var primary = asInstanceOf(IndexShardRoutingTable.class, indexRoutingTable.shard(shardId.id())).primaryShard(); assertThat(primary.state(), equalTo(ShardRoutingState.UNASSIGNED));