diff --git a/docs/changelog/134656.yaml b/docs/changelog/134656.yaml new file mode 100644 index 0000000000000..e4f442d629f50 --- /dev/null +++ b/docs/changelog/134656.yaml @@ -0,0 +1,6 @@ +pr: 134656 +summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the + `IndexWriter` +area: Engine +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java new file mode 100644 index 0000000000000..e72b9a2fec2e9 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java @@ -0,0 +1,425 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilterCodecReader; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OneMergeWrappingMergePolicy; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLockObtainFailedException; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergeSchedulerConfig; +import org.elasticsearch.index.codec.FilterDocValuesProducer; +import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; 
+import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; + +public class MergeWithFailureIT extends ESIntegTestCase { + + private static final String FAILING_MERGE_ON_PURPOSE = "Failing merge on purpose"; + + public static class TestPlugin extends Plugin implements EnginePlugin { + + // Number of queued merges in the thread pool. Lucene considers those as "running" and blocks waiting on them to complete in case + // of a merge failure. + private final Set pendingMerges = ConcurrentCollections.newConcurrentSet(); + + // Number of running merges in the thread pool + private final AtomicInteger runningMergesCount = new AtomicInteger(); + + // Latch to unblock merges + private final CountDownLatch runMerges = new CountDownLatch(1); + + // Reference to the ThreadPoolMergeExecutorService + private final AtomicReference threadPoolMergeExecutorServiceReference = new AtomicReference<>(); + + // This future is completed once the shard that is expected to fail has its store closed + private final PlainActionFuture shardStoreClosedListener = new PlainActionFuture<>(); + + private final boolean isDataNode; + + public TestPlugin(Settings settings) { + this.isDataNode = DiscoveryNode.hasDataRole(settings); + } + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (isDataNode == false) { + return Optional.of(InternalEngine::new); + } + return Optional.of( + config -> new TestEngine( + EngineTestCase.copy( + config, + new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap -> new MergePolicy.OneMerge(toWrap) { + @Override + public CodecReader wrapForMerge(CodecReader reader) { + return new FilterCodecReader(reader) { + final AtomicBoolean failOnce = new AtomicBoolean(false); + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + @Override + public DocValuesProducer getDocValuesReader() { + return new FilterDocValuesProducer(super.getDocValuesReader()) { + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + safeAwait(runMerges, TimeValue.ONE_MINUTE); + if (failOnce.compareAndSet(false, true)) { + throw new IOException(FAILING_MERGE_ON_PURPOSE); + } + return super.getNumeric(field); + } + }; + } + }; + } + }) + ) + ) + ); + } + + private class TestEngine extends InternalEngine { + + TestEngine(EngineConfig engineConfig) { + super(engineConfig); + } + + @Override + protected ElasticsearchMergeScheduler createMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + ThreadPoolMergeExecutorService executor, + MergeMetrics metrics + ) { + threadPoolMergeExecutorServiceReference.set(Objects.requireNonNull(executor)); + return new ThreadPoolMergeScheduler(shardId, indexSettings, executor, merge -> 0L, metrics) { + + @Override + public void merge(MergeSource mergeSource, MergeTrigger trigger) { + var wrapped = wrapMergeSource(mergeSource); + super.merge(wrapped, trigger); + } + + private MergeSource wrapMergeSource(MergeSource delegate) { + // Wraps the merge source 
to know which merges were pulled from Lucene by the IndexWriter + return new MergeSource() { + @Override + public MergePolicy.OneMerge getNextMerge() { + var merge = delegate.getNextMerge(); + if (merge != null) { + if (pendingMerges.add(merge) == false) { + throw new AssertionError("Merge already pending " + merge); + } + } + return merge; + } + + @Override + public void onMergeFinished(MergePolicy.OneMerge merge) { + delegate.onMergeFinished(merge); + } + + @Override + public boolean hasPendingMerges() { + return delegate.hasPendingMerges(); + } + + @Override + public void merge(MergePolicy.OneMerge merge) throws IOException { + runningMergesCount.incrementAndGet(); + if (pendingMerges.remove(merge) == false) { + throw new AssertionError("Pending merge not found " + merge); + } + delegate.merge(merge); + } + }; + } + + @Override + protected void handleMergeException(final Throwable exc) { + mergeException(exc); + } + }; + } + } + + @Override + public void onIndexModule(IndexModule indexModule) { + if (isDataNode) { + indexModule.addIndexEventListener(new IndexEventListener() { + @Override + public void onStoreClosed(ShardId shardId) { + shardStoreClosedListener.onResponse(null); + } + }); + } + } + + public void registerMergeEventListener(MergeEventListener listener) { + var threadPoolMergeExecutorService = Objects.requireNonNull(threadPoolMergeExecutorServiceReference.get()); + threadPoolMergeExecutorService.registerMergeEventListener(listener); + } + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class); + } + + public void testFailedMergeDeadlock() throws Exception { + internalCluster().startMasterOnlyNode(); + final int maxMergeThreads = randomIntBetween(1, 3); + final int indexMaxThreadCount = randomBoolean() ? randomIntBetween(1, 10) : Integer.MAX_VALUE; + + final var dataNode = internalCluster().startDataOnlyNode( + Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + .put("thread_pool." 
+ ThreadPool.Names.MERGE + ".max", maxMergeThreads)
+                .build()
+        );
+
+        final var plugin = getTestPlugin(dataNode);
+        assertThat(plugin, notNullValue());
+
+        final var indexName = randomIdentifier();
+        createIndex(
+            indexName,
+            indexSettings(1, 0).put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".name", dataNode)
+                .put(MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.getKey(), 1)
+                // when indexMaxThreadCount is small, merge tasks might be backlogged
+                .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), indexMaxThreadCount)
+                // no merge throttling
+                .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), Integer.MAX_VALUE)
+                .build()
+        );
+
+        final var mergesListener = new AssertingMergeEventListener();
+        plugin.registerMergeEventListener(mergesListener);
+
+        // Kick off enough merges to block the thread pool
+        var maxRunningThreads = Math.min(maxMergeThreads, indexMaxThreadCount);
+        indexDocsInManySegmentsUntil(indexName, () -> plugin.runningMergesCount.get() == maxRunningThreads);
+        assertThat(plugin.runningMergesCount.get(), equalTo(maxRunningThreads));
+
+        // Now pull more merges so they are queued in the merge thread pool, but Lucene thinks they are running
+        final int pendingMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5);
+        indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > pendingMerges);
+
+        var mergeThreadPool = asInstanceOf(
+            ThreadPoolExecutor.class,
+            internalCluster().clusterService(dataNode).threadPool().executor(ThreadPool.Names.MERGE)
+        );
+        assertThat(mergeThreadPool.getActiveCount(), greaterThanOrEqualTo(maxRunningThreads));
+
+        // Index more docs in the hope of getting backlogged merges
+        if (indexMaxThreadCount != Integer.MAX_VALUE) {
+            final int backloggedMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5);
+            indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > backloggedMerges);
+        }
+
+        // Sometimes close the shard concurrently with the tragic failure
+        Thread closingThread = null;
+        if (rarely()) {
+            closingThread = new Thread(() -> {
+                safeAwait(plugin.runMerges, TimeValue.ONE_MINUTE);
+                client().admin().indices().prepareClose(indexName).get();
+            });
+            closingThread.start();
+        }
+
+        // Unblock merges; one merge will fail the IndexWriter
+        plugin.runMerges.countDown();
+
+        // Deadlock sample:
+        //
+        // "elasticsearch[node_s5][merge][T#1]@16690" tid=0x8e nid=NA waiting
+        //   java.lang.Thread.State: WAITING
+        //       at java.lang.Object.wait0(Object.java:-1)
+        //       at java.lang.Object.wait(Object.java:389)
+        //       at org.apache.lucene.index.IndexWriter.doWait(IndexWriter.java:5531)
+        //       at org.apache.lucene.index.IndexWriter.abortMerges(IndexWriter.java:2733)
+        //       at org.apache.lucene.index.IndexWriter.rollbackInternalNoCommit(IndexWriter.java:2488)
+        //       at org.apache.lucene.index.IndexWriter.rollbackInternal(IndexWriter.java:2457)
+        //       - locked <0x429a> (a java.lang.Object)
+        //       at org.apache.lucene.index.IndexWriter.maybeCloseOnTragicEvent(IndexWriter.java:5765)
+        //       at org.apache.lucene.index.IndexWriter.tragicEvent(IndexWriter.java:5755)
+        //       at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4780)
+        //       at org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6567)
+        //       at org.elasticsearch.index.engine.MergeWithFailureIT$TestPlugin$TestEngine$1$1.merge(MergeWithFailureIT.java:178)
+        //       at org.elasticsearch.index.engine.ThreadPoolMergeScheduler.doMerge(ThreadPoolMergeScheduler.java:347)
+        //       at
org.elasticsearch.index.engine.ThreadPoolMergeScheduler$MergeTask.run(ThreadPoolMergeScheduler.java:459) + // at org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.runMergeTask(ThreadPoolMergeExecutorService.java:364) + + ensureRed(indexName); + + // verify that the shard store is effectively closed + assertTrue(plugin.shardStoreClosedListener.isDone()); + + if (closingThread != null) { + closingThread.join(); + } + + final var shardId = new ShardId(resolveIndex(indexName), 0); + var nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, dataNode); + try { + var shardLock = nodeEnvironment.shardLock(shardId, getTestName(), 10_000L); + shardLock.close(); + } catch (ShardLockObtainFailedException ex) { + throw new AssertionError("Shard " + shardId + " is still locked after 10 seconds", ex); + } + + // check the state of the shard + var routingTable = internalCluster().clusterService(dataNode).state().routingTable(ProjectId.DEFAULT); + var indexRoutingTable = routingTable.index(shardId.getIndex()); + var primary = asInstanceOf(IndexShardRoutingTable.class, indexRoutingTable.shard(shardId.id())).primaryShard(); + assertThat(primary.state(), equalTo(ShardRoutingState.UNASSIGNED)); + assertThat(primary.unassignedInfo(), notNullValue()); + assertThat(primary.unassignedInfo().reason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + var failure = ExceptionsHelper.unwrap(primary.unassignedInfo().failure(), IOException.class); + assertThat(failure, notNullValue()); + assertThat(failure.getMessage(), containsString(FAILING_MERGE_ON_PURPOSE)); + + // verify the number of queued, completed and aborted merges + mergesListener.verify(); + + assertAcked(indicesAdmin().prepareDelete(indexName)); + } + + private void indexDocsInManySegmentsUntil(String indexName, Supplier stopCondition) { + indexDocsInManySegmentsUntil(indexName, stopCondition, TimeValue.THIRTY_SECONDS); + } + + private void indexDocsInManySegmentsUntil(String indexName, Supplier stopCondition, TimeValue timeout) { + long millisWaited = 0L; + do { + if (millisWaited >= timeout.millis()) { + logger.warn(format("timed out after waiting for [%d]", millisWaited)); + return; + } + var client = client(); + for (int request = 0; request < 10; request++) { + var bulkRequest = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int doc = 0; doc < 10; doc++) { + bulkRequest.add(client.prepareIndex(indexName).setCreate(true).setSource("value", randomIntBetween(0, 1024))); + } + bulkRequest.get(); + } + // Sleep a bit to wait for merges to kick in + long sleepInMillis = randomLongBetween(50L, 200L); + safeSleep(sleepInMillis); + millisWaited += sleepInMillis; + } while (stopCondition.get() == false); + } + + private static TestPlugin getTestPlugin(String dataNode) { + return internalCluster().getInstance(PluginsService.class, dataNode).filterPlugins(TestPlugin.class).findFirst().get(); + } + + private static void ensureRed(String indexName) throws Exception { + assertBusy(() -> { + var healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT, indexName) + .setWaitForStatus(ClusterHealthStatus.RED) + .setWaitForEvents(Priority.LANGUID) + .get(); + assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED)); + }); + } + + private static class AssertingMergeEventListener implements MergeEventListener { + + private final AtomicInteger mergesQueued = new AtomicInteger(); + private final AtomicInteger mergesCompleted = new AtomicInteger(); + private final AtomicInteger 
mergesAborted = new AtomicInteger(); + + @Override + public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) { + mergesQueued.incrementAndGet(); + } + + @Override + public void onMergeCompleted(OnGoingMerge merge) { + mergesCompleted.incrementAndGet(); + } + + @Override + public void onMergeAborted(OnGoingMerge merge) { + mergesAborted.incrementAndGet(); + } + + private void verify() { + int queued = mergesQueued.get(); + int completed = mergesCompleted.get(); + int aborted = mergesAborted.get(); + var error = format("Queued merges mismatch (queued=%d, completed=%d, aborted=%d)", queued, completed, aborted); + assertThat(error, queued, equalTo(completed + aborted)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 170ccc2cc7bbb..d18a475b5757e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2728,11 +2728,7 @@ private IndexWriter createWriter() throws IOException { // protected for testing protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - if (Assertions.ENABLED) { - return new AssertingIndexWriter(directory, iwc); - } else { - return new IndexWriter(directory, iwc); - } + return new ElasticsearchIndexWriter(directory, iwc, logger); } // with tests.verbose, lucene sets this up: plumb to align with filesystem stream @@ -2876,8 +2872,10 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() { return indexWriter.getConfig(); } - private void maybeFlushAfterMerge(OnGoingMerge merge) { - if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { + protected void maybeFlushAfterMerge(OnGoingMerge merge) { + if (indexWriter.getTragicException() == null + && indexWriter.hasPendingMerges() == false + && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the // writer // we deadlock on engine#close for instance. @@ -3333,19 +3331,49 @@ private static Map commitDataAsMap(final IndexWriter indexWriter return commitData; } - private static class AssertingIndexWriter extends IndexWriter { - AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { - super(d, conf); + private static class ElasticsearchIndexWriter extends IndexWriter { + + private final Logger logger; + + ElasticsearchIndexWriter(Directory directory, IndexWriterConfig indexWriterConfig, Logger logger) throws IOException { + super(directory, indexWriterConfig); + this.logger = logger; + } + + @Override + public void onTragicEvent(Throwable tragedy, String location) { + assert tragedy != null; + try { + if (getConfig().getMergeScheduler() instanceof ThreadPoolMergeScheduler mergeScheduler) { + try { + // Must be executed before calling IndexWriter#onTragicEvent + mergeScheduler.onTragicEvent(tragedy); + } catch (Exception e) { + logger.warn("Exception thrown when notifying the merge scheduler of a tragic event", e); + if (tragedy != e) { + tragedy.addSuppressed(e); + } + } + } + } finally { + super.onTragicEvent(tragedy, location); + } } @Override - public long deleteDocuments(Term... terms) { - throw new AssertionError("must not hard delete documents"); + public long deleteDocuments(Term... 
terms) throws IOException { + if (Assertions.ENABLED) { + throw new AssertionError("must not hard delete documents"); + } + return super.deleteDocuments(terms); } @Override - public long tryDeleteDocument(IndexReader readerIn, int docID) { - throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs"); + public long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException { + if (Assertions.ENABLED) { + throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs"); + } + return super.tryDeleteDocument(readerIn, docID); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java index 9efb582007aaf..d12db5dbafcc4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -31,7 +31,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; @@ -48,6 +50,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.LongUnaryOperator; +import java.util.function.Predicate; import java.util.function.ToLongFunction; import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING; @@ -372,7 +375,7 @@ private void runMergeTask(MergeTask mergeTask) { } } - private void abortMergeTask(MergeTask mergeTask) { + void abortMergeTask(MergeTask mergeTask) { assert mergeTask.hasStartedRunning() == false; assert runningMergeTasks.contains(mergeTask) == false; try { @@ -385,6 +388,25 @@ private void abortMergeTask(MergeTask mergeTask) { } } + private void abortMergeTasks(Collection mergeTasks) { + if (mergeTasks != null && mergeTasks.isEmpty() == false) { + for (var mergeTask : mergeTasks) { + abortMergeTask(mergeTask); + } + } + } + + /** + * Removes all {@link MergeTask} that match the predicate and aborts them. + * @param predicate the predicate to filter merge tasks to be aborted + */ + void abortQueuedMergeTasks(Predicate predicate) { + final var queuedMergesToAbort = new HashSet(); + if (queuedMergeTasks.drainMatchingElementsTo(predicate, queuedMergesToAbort) > 0) { + abortMergeTasks(queuedMergesToAbort); + } + } + /** * Start monitoring the available disk space, and update the available budget for running merge tasks * Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST @@ -675,6 +697,25 @@ ElementWithReleasableBudget take() throws InterruptedException { } } + int drainMatchingElementsTo(Predicate predicate, Collection c) { + int removed = 0; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + for (Iterator> iterator = enqueuedByBudget.iterator(); iterator.hasNext();) { + E item = iterator.next().v1(); + if (predicate.test(item)) { + iterator.remove(); + c.add(item); + removed++; + } + } + return removed; + } finally { + lock.unlock(); + } + } + /** * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements * that are still in use. 
The elements budget is also updated by re-applying the budget function. @@ -704,7 +745,7 @@ void updateBudget(long availableBudget) { void postBudgetUpdate() { assert lock.isHeldByCurrentThread(); - }; + } private void updateBudgetOfEnqueuedElementsAndReorderQueue() { assert this.lock.isHeldByCurrentThread(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 7f1ad5adf3181..26459c6376393 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergeSchedulerConfig; @@ -76,9 +77,18 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics // how many {@link MergeTask}s have kicked off (this is used to name them). private final AtomicLong submittedMergeTaskCount = new AtomicLong(); private final AtomicLong doneMergeTaskCount = new AtomicLong(); + private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; + + // Merge pulled from Lucene that is not yet submitted to the merge thread pool tasks queue + record PendingMerge(MergeSource source, MergePolicy.OneMerge merge, MergeTrigger trigger) {} + private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1); private volatile boolean closed = false; - private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; + + // Tragic event that causes the IndexWriter and ThreadPoolMergeScheduler to be closed + private record TragicEvent(Throwable throwable, CountDownLatch latch) {} + + private volatile TragicEvent tragedy = null; /** * Creates a thread-pool-based merge scheduler that runs merges in a thread pool. @@ -135,21 +145,26 @@ public void refreshConfig() { @Override public void merge(MergeSource mergeSource, MergeTrigger trigger) { - if (closed) { + if (closed || tragedy != null) { // avoid pulling from the merge source when closing return; } - MergePolicy.OneMerge merge = null; + PendingMerge pendingMerge = null; try { - merge = mergeSource.getNextMerge(); + // From this point on Lucene considers the OneMerge as "running", + // but it's not yet in the thread pool executor tasks queue! 
+ var merge = mergeSource.getNextMerge(); + if (merge != null) { + pendingMerge = new PendingMerge(mergeSource, merge, trigger); + } } catch (IllegalStateException e) { if (verbose()) { message("merge task poll failed, likely that index writer is failed"); } // ignore exception, we expect the IW failure to be logged elsewhere } - if (merge != null) { - submitNewMergeTask(mergeSource, merge, trigger); + if (pendingMerge != null) { + submitNewMergeTask(pendingMerge); } } @@ -226,14 +241,27 @@ protected void handleMergeException(Throwable t) { throw new MergePolicy.MergeException(t); } - // package-private for tests - boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) { + private void submitNewMergeTask(PendingMerge pendingMerge) { + boolean queued = false; try { - MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger); - mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()); - mergeQueued(mergeTask.onGoingMerge); - return threadPoolMergeExecutorService.submitMergeTask(mergeTask); + // note that estimating the size of the merge might open a searcher + final var mergeTask = newMergeTask(pendingMerge.source(), pendingMerge.merge(), pendingMerge.trigger()); + if (tragedy == null) { + mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()); + mergeQueued(mergeTask.onGoingMerge); + + queued = threadPoolMergeExecutorService.submitMergeTask(mergeTask); // may abort the merge immediately + // TODO Enable the following assertions once unit tests are fixed to not use Mockito + // assert queued || pendingMerge.merge().isAborted(); + } else { + // merge scheduler is failing due to a tragic event + mergeTask.abort(); + } } finally { + if (queued && tragedy != null) { + // ensure that if `onTragicEvent` races with this, we still abort what we just submitted. 
+ abortQueuedMergesAfterTragedy(null); + } checkMergeTaskThrottling(); } } @@ -244,12 +272,15 @@ MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, Merg boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1; // IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge); + // used for reference equality in case the task must be aborted after a tragic event + var owner = this; return new MergeTask( mergeSource, merge, isAutoThrottle && isAutoThrottle(), "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId, - estimateMergeMemoryBytes + estimateMergeMemoryBytes, + owner ); } @@ -284,7 +315,7 @@ private void checkMergeTaskThrottling() { // synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically synchronized Schedule schedule(MergeTask mergeTask) { assert mergeTask.hasStartedRunning() == false; - if (closed) { + if (closed || tragedy != null) { // do not run or backlog tasks when closing the merge scheduler, instead abort them return Schedule.ABORT; } else if (shouldSkipMerge()) { @@ -297,7 +328,6 @@ synchronized Schedule schedule(MergeTask mergeTask) { assert added : "starting merge task [" + mergeTask + "] registered as already running"; return Schedule.RUN; } else { - assert mergeTask.hasStartedRunning() == false; backloggedMergeTasks.add(mergeTask); return Schedule.BACKLOG; } @@ -320,10 +350,112 @@ private void mergeTaskDone(OnGoingMerge merge) { checkMergeTaskThrottling(); } - private synchronized void maybeSignalAllMergesDoneAfterClose() { - if (closed && runningMergeTasks.isEmpty()) { + private void maybeSignalAllMergesDoneAfterClose() { + assert Thread.holdsLock(this); + if ((closed || tragedy != null) && runningMergeTasks.isEmpty()) { + closedWithNoRunningMerges.countDown(); + } + } + + public void onTragicEvent(Throwable tragedy) { + assert tragedy != null; + assert tragedy instanceof MergePolicy.MergeAbortedException == false; + + TragicEvent tragicEvent; + boolean shouldAbort = false; + // Sets the tragic event if not already set + synchronized (this) { + tragicEvent = this.tragedy; + if (tragicEvent == null) { + tragicEvent = new TragicEvent(tragedy, new CountDownLatch(1)); + this.tragedy = tragicEvent; + shouldAbort = true; + } + } + if (shouldAbort) { + abortQueuedMergesAfterTragedy(tragedy); closedWithNoRunningMerges.countDown(); + tragicEvent.latch().countDown(); + return; } + try { + // the merge scheduler is being failed by another thread, wait for non-executed merges to be aborted + tragicEvent.latch().await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + tragedy.addSuppressed(e); + } + } + + private void abortQueuedMergesAfterTragedy(@Nullable Throwable throwable) { + assert this.tragedy != null; + try { + // Merges that have been pulled from Lucene using MergePolicy#getNextMerge before the tragic exception was set require special + // handling, because Lucene considers them as "running" and will wait for those to complete in IndexWriter#abortMerges when + // failing the IndexWriter with IndexWriter#maybeCloseOnTragicEvent. 
+            // If at some point those merges are executed by a different thread (than the current thread we're in here) then it is
+            // OK: the merges will be aborted or failed almost immediately and there will be no running merges to wait for.
+            //
+            // But the thread pool executor offers no guarantee that those merges will be executed by another thread because:
+            // - the thread pool may have only 1 core thread,
+            // - or all other threads may be busy failing merges for different shards too, and can also be blocked waiting for their own
+            //   queued merges to complete,
+            // - or there is not enough budget to execute the merge(s).
+            //
+            // In order to avoid waiting indefinitely in IndexWriter#abortMerges for merges that won't be executed, the current thread is
+            // used to abort all remaining non-executed merges:
+            // - the merge tasks in backloggedMergeTasks that are waiting to be re-enqueued,
+            // - the merge tasks in the thread pool executor task queue that are waiting to be executed.
+            //
+            // Note that only merges pulled from the current merge scheduler instance are aborted. These abortions are all executed in a
+            // synchronized block to ensure that no other concurrent merge thread can also fail due to a tragic event and set the
+            // IndexWriter#tragedy before we abort merges here. This is important because if the IndexWriter#tragedy is set, any upcoming
+            // merge execution/abortion would re-enter this method in order to fail the IndexWriter again (and ultimately also deadlock in
+            // IndexWriter#maybeCloseOnTragicEvent).
+            synchronized (this) {
+                // Abort backlogged merges
+                abortBackloggedMergeTasks();
+                // Abort all queued tasks that have been created by this merge scheduler
+                threadPoolMergeExecutorService.abortQueuedMergeTasks(mergeTask -> mergeTask.owner == this);
+            }
+        } catch (Exception e) {
+            logger.warn("exception when aborting non-running merge tasks", e);
+            if (throwable != null) {
+                throwable.addSuppressed(e);
+            }
+        }
+    }
+
+    private int abortBackloggedMergeTasks() throws Exception {
+        assert tragedy != null;
+        assert Thread.holdsLock(this);
+
+        int count = 0;
+        int maxExceptions = 10;
+        Exception firstException = null;
+        MergeTask backlogged;
+        while ((backlogged = backloggedMergeTasks.poll()) != null) {
+            try {
+                abortMergeTask(backlogged);
+                count += 1;
+            } catch (Exception e) {
+                assert false : e;
+                if (firstException != null && maxExceptions-- >= 0) {
+                    firstException.addSuppressed(e);
+                } else {
+                    firstException = e;
+                }
+            }
+        }
+        if (firstException != null) {
+            throw firstException;
+        }
+        return count;
+    }
+
+    private void abortMergeTask(MergeTask mergeTask) {
+        // abort immediately using the thread pool executor to handle throttling
+        threadPoolMergeExecutorService.abortMergeTask(mergeTask);
+    }
     private synchronized void enqueueBackloggedTasks() {
@@ -348,6 +480,10 @@ void doMerge(MergeSource mergeSource, MergePolicy.OneMerge oneMerge) {
         } catch (Throwable t) {
             // OK to ignore MergeAbortedException. This is what Lucene's ConcurrentMergeScheduler does.
             if (t instanceof MergePolicy.MergeAbortedException == false) {
+                // A merge thread that throws a tragic exception closing the IndexWriter causes other merge threads to be aborted, but
+                // it is not itself aborted: instead, the current merge is just completed and the thrown exception is set in the package
+                // private OneMerge#error field. Here we mark such a merge as aborted too so that it is not considered successful later.
+ oneMerge.setAborted(); handleMergeException(t); } } @@ -390,13 +526,15 @@ class MergeTask implements Runnable { private final MergeRateLimiter rateLimiter; private final boolean supportsIOThrottling; private final long mergeMemoryEstimateBytes; + private final Object owner; MergeTask( MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name, - long mergeMemoryEstimateBytes + long mergeMemoryEstimateBytes, + Object owner ) { this.name = name; this.mergeStartTimeNS = new AtomicLong(); @@ -405,6 +543,7 @@ class MergeTask implements Runnable { this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); this.supportsIOThrottling = supportsIOThrottling; this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes; + this.owner = owner; } Schedule schedule() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8dc7440c5dccf..c15db48d9c9a3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1351,7 +1351,11 @@ public long getWritingBytes() { if (engine == null) { return 0; } - return engine.getWritingBytes(); + try { + return engine.getWritingBytes(); + } catch (AlreadyClosedException ex) { + return 0L; + } } public RefreshStats refreshStats() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2327ac06b9e81..3297084b7afb7 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -355,7 +355,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { ); } - public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { + public static EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { return new EngineConfig( config.getShardId(), config.getThreadPool(),
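A minimal, self-contained sketch of the drain-and-abort idea used by this change, not the actual ThreadPoolMergeExecutorService implementation: it assumes a hypothetical generic task type and a plain lock-protected queue instead of the real budget-aware queue. Merges already pulled from Lucene count as "running", so on a tragic event the failing scheduler must drain its own queued merge tasks and abort them itself rather than wait for worker threads that may never run them.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Illustrative stand-in for the executor's queue of merge tasks; names are hypothetical.
final class MergeQueueSketch<T> {
    private final Queue<T> queued = new LinkedList<>();

    synchronized void enqueue(T task) {
        queued.add(task);
    }

    // Drains every queued task matching the predicate and hands it back to the caller, so the
    // failing scheduler can abort its own tasks instead of waiting for worker threads that may
    // never pick them up (the deadlock described in the comments above).
    synchronized List<T> drainMatching(Predicate<T> predicate) {
        List<T> drained = new ArrayList<>();
        for (Iterator<T> it = queued.iterator(); it.hasNext();) {
            T task = it.next();
            if (predicate.test(task)) {
                it.remove();
                drained.add(task);
            }
        }
        return drained;
    }
}

On a tragic event the failing scheduler would call drainMatching(task -> task.owner == this) and abort each returned task before the IndexWriter rollback starts waiting for "running" merges, mirroring the intent of abortQueuedMergeTasks and drainMatchingElementsTo in the patch.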