
Commit 86fc888

tlrx authored and DonalEvans committed
Fix deadlock in ThreadPoolMergeScheduler when a failing merge closes the IndexWriter (elastic#134656)
This change fixes a bug that causes a deadlock in the thread pool merge scheduler when a merge fails due to a tragic event. The deadlock occurs because Lucene aborts running merges when failing with a tragic event and then waits for them to complete. But those "running" merges might in fact still be waiting in Elasticsearch's thread pool merge scheduler task queue, or in the backlogged merge task queue because the per-shard concurrent merge limit has been reached, or simply waiting for enough disk space to execute. In those cases the failing merge thread waits indefinitely. The fix in this change uses the merge thread that is failing due to a tragic event to abort all other enqueued and backlogged merge tasks of the same shard before proceeding to close the IndexWriter. This way Lucene does not have to wait for any running merges, since they will all have been aborted upfront. Relates ES-12664
1 parent 820ecb4 commit 86fc888
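
To illustrate the shape of the fix independently of the actual Elasticsearch classes, here is a minimal, self-contained sketch of the abort-before-close idea: when a tragic event strikes, the failing merge thread drains every merge task that is still queued or backlogged and aborts it, so that Lucene's subsequent wait-for-running-merges step has nothing left to wait on. All names below (`SimplifiedMergeScheduler`, `MergeTask`) are hypothetical and exist only for illustration; the real change performs the equivalent per-shard steps inside `ThreadPoolMergeScheduler` and `ThreadPoolMergeExecutorService`, as shown in the diffs below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical classes used purely for illustration; not the actual code touched by this commit.
class SimplifiedMergeScheduler {

    static final class MergeTask {
        volatile boolean aborted;

        void abort() {
            aborted = true; // unblocks anything waiting on this merge to finish
        }
    }

    private final BlockingQueue<MergeTask> queued = new LinkedBlockingQueue<>();
    private final List<MergeTask> backlogged = new ArrayList<>();

    void enqueue(MergeTask task) {
        queued.add(task);
    }

    synchronized void backlog(MergeTask task) {
        backlogged.add(task);
    }

    // Called from the merge thread that hit the tragic event, BEFORE the IndexWriter is
    // rolled back. Aborting everything that is still waiting for a thread (or for disk
    // space, or for a per-shard concurrency permit) means Lucene never waits on a merge
    // that can no longer run, which is the deadlock this commit fixes.
    synchronized void onTragicEvent(Throwable tragedy) {
        List<MergeTask> toAbort = new ArrayList<>(backlogged);
        backlogged.clear();
        queued.drainTo(toAbort);
        toAbort.forEach(MergeTask::abort);
    }
}
```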

File tree

7 files changed: +679 additions, −36 deletions


docs/changelog/134656.yaml

Lines changed: 6 additions & 0 deletions
```diff
@@ -0,0 +1,6 @@
+pr: 134656
+summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the
+  `IndexWriter`
+area: Engine
+type: bug
+issues: []
```

server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java

Lines changed: 425 additions & 0 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 42 additions & 14 deletions
```diff
@@ -2756,11 +2756,7 @@ private IndexWriter createWriter() throws IOException {
 
     // protected for testing
     protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
-        if (Assertions.ENABLED) {
-            return new AssertingIndexWriter(directory, iwc);
-        } else {
-            return new IndexWriter(directory, iwc);
-        }
+        return new ElasticsearchIndexWriter(directory, iwc, logger);
     }
 
     // with tests.verbose, lucene sets this up: plumb to align with filesystem stream
@@ -2920,8 +2916,10 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
         return indexWriter.getConfig();
     }
 
-    private void maybeFlushAfterMerge(OnGoingMerge merge) {
-        if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
+    protected void maybeFlushAfterMerge(OnGoingMerge merge) {
+        if (indexWriter.getTragicException() == null
+            && indexWriter.hasPendingMerges() == false
+            && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
             // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the
             // writer
             // we deadlock on engine#close for instance.
@@ -3377,19 +3375,49 @@ private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter
         return commitData;
     }
 
-    private static class AssertingIndexWriter extends IndexWriter {
-        AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
-            super(d, conf);
+    private static class ElasticsearchIndexWriter extends IndexWriter {
+
+        private final Logger logger;
+
+        ElasticsearchIndexWriter(Directory directory, IndexWriterConfig indexWriterConfig, Logger logger) throws IOException {
+            super(directory, indexWriterConfig);
+            this.logger = logger;
+        }
+
+        @Override
+        public void onTragicEvent(Throwable tragedy, String location) {
+            assert tragedy != null;
+            try {
+                if (getConfig().getMergeScheduler() instanceof ThreadPoolMergeScheduler mergeScheduler) {
+                    try {
+                        // Must be executed before calling IndexWriter#onTragicEvent
+                        mergeScheduler.onTragicEvent(tragedy);
+                    } catch (Exception e) {
+                        logger.warn("Exception thrown when notifying the merge scheduler of a tragic event", e);
+                        if (tragedy != e) {
+                            tragedy.addSuppressed(e);
+                        }
+                    }
+                }
+            } finally {
+                super.onTragicEvent(tragedy, location);
+            }
         }
 
         @Override
-        public long deleteDocuments(Term... terms) {
-            throw new AssertionError("must not hard delete documents");
+        public long deleteDocuments(Term... terms) throws IOException {
+            if (Assertions.ENABLED) {
+                throw new AssertionError("must not hard delete documents");
+            }
+            return super.deleteDocuments(terms);
         }
 
         @Override
-        public long tryDeleteDocument(IndexReader readerIn, int docID) {
-            throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
+        public long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
+            if (Assertions.ENABLED) {
+                throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
+            }
+            return super.tryDeleteDocument(readerIn, docID);
         }
     }
```
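
For readers less familiar with the Lucene hook overridden above: `IndexWriter#onTragicEvent` is a public method invoked when the writer hits an unrecoverable error, and the pattern here is "run scheduler cleanup first, then delegate to `super` in a finally block". The standalone sketch below shows that pattern in isolation; the class name `TragicEventAwareWriter` and the logging body are invented for the example and are not the class added by this commit.

```java
import java.io.IOException;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

// Hypothetical stand-in demonstrating the hook pattern used by ElasticsearchIndexWriter:
// do cleanup work before Lucene rolls back the writer and waits for running merges.
class TragicEventAwareWriter extends IndexWriter {

    TragicEventAwareWriter(Directory directory, IndexWriterConfig config) throws IOException {
        super(directory, config);
    }

    @Override
    public void onTragicEvent(Throwable tragedy, String location) {
        try {
            // In the real change this notifies ThreadPoolMergeScheduler so it can abort
            // queued and backlogged merge tasks of the shard before Lucene starts waiting.
            System.err.println("tragic event at " + location + ": " + tragedy);
        } finally {
            super.onTragicEvent(tragedy, location);
        }
    }

    public static void main(String[] args) throws IOException {
        // Open and close an in-memory writer just to show the subclass is usable as-is.
        try (Directory directory = new ByteBuffersDirectory();
             IndexWriter writer = new TragicEventAwareWriter(directory, new IndexWriterConfig(new StandardAnalyzer()))) {
            writer.commit();
        }
    }
}
```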

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 43 additions & 2 deletions
```diff
@@ -31,7 +31,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -48,6 +50,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.LongUnaryOperator;
+import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING;
@@ -372,7 +375,7 @@ private void runMergeTask(MergeTask mergeTask) {
         }
     }
 
-    private void abortMergeTask(MergeTask mergeTask) {
+    void abortMergeTask(MergeTask mergeTask) {
         assert mergeTask.hasStartedRunning() == false;
         assert runningMergeTasks.contains(mergeTask) == false;
         try {
@@ -385,6 +388,25 @@ private void abortMergeTask(MergeTask mergeTask) {
         }
     }
 
+    private void abortMergeTasks(Collection<MergeTask> mergeTasks) {
+        if (mergeTasks != null && mergeTasks.isEmpty() == false) {
+            for (var mergeTask : mergeTasks) {
+                abortMergeTask(mergeTask);
+            }
+        }
+    }
+
+    /**
+     * Removes all {@link MergeTask} that match the predicate and aborts them.
+     * @param predicate the predicate to filter merge tasks to be aborted
+     */
+    void abortQueuedMergeTasks(Predicate<MergeTask> predicate) {
+        final var queuedMergesToAbort = new HashSet<MergeTask>();
+        if (queuedMergeTasks.drainMatchingElementsTo(predicate, queuedMergesToAbort) > 0) {
+            abortMergeTasks(queuedMergesToAbort);
+        }
+    }
+
     /**
      * Start monitoring the available disk space, and update the available budget for running merge tasks
      * Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
@@ -675,6 +697,25 @@ ElementWithReleasableBudget take() throws InterruptedException {
            }
         }
 
+        int drainMatchingElementsTo(Predicate<E> predicate, Collection<? super E> c) {
+            int removed = 0;
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                for (Iterator<Tuple<E, Long>> iterator = enqueuedByBudget.iterator(); iterator.hasNext();) {
+                    E item = iterator.next().v1();
+                    if (predicate.test(item)) {
+                        iterator.remove();
+                        c.add(item);
+                        removed++;
+                    }
+                }
+                return removed;
+            } finally {
+                lock.unlock();
+            }
+        }
+
         /**
          * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
          * that are still in use. The elements budget is also updated by re-applying the budget function.
@@ -704,7 +745,7 @@ void updateBudget(long availableBudget) {
 
         void postBudgetUpdate() {
             assert lock.isHeldByCurrentThread();
-        };
+        }
 
         private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
            assert this.lock.isHeldByCurrentThread();
```