Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/134656.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 134656
summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the `IndexWriter`
area: Engine
type: bug
issues: []

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2682,11 +2682,7 @@ private IndexWriter createWriter() throws IOException {

// protected for testing
protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
if (Assertions.ENABLED) {
return new AssertingIndexWriter(directory, iwc);
} else {
return new IndexWriter(directory, iwc);
}
return new ElasticsearchIndexWriter(directory, iwc, logger);
}

// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
Expand Down Expand Up @@ -2822,8 +2818,10 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
return indexWriter.getConfig();
}

private void maybeFlushAfterMerge(OnGoingMerge merge) {
if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
protected void maybeFlushAfterMerge(OnGoingMerge merge) {
if (indexWriter.getTragicException() == null
&& indexWriter.hasPendingMerges() == false
&& System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
// NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the
// writer
// we deadlock on engine#close for instance.
Expand Down Expand Up @@ -3280,19 +3278,49 @@ private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter
return commitData;
}

private static class AssertingIndexWriter extends IndexWriter {
AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
super(d, conf);
private static class ElasticsearchIndexWriter extends IndexWriter {

private final Logger logger;

ElasticsearchIndexWriter(Directory directory, IndexWriterConfig indexWriterConfig, Logger logger) throws IOException {
super(directory, indexWriterConfig);
this.logger = logger;
}

@Override
public long deleteDocuments(Term... terms) {
throw new AssertionError("must not hard delete documents");
public void onTragicEvent(Throwable tragedy, String location) {
assert tragedy != null;
try {
if (getConfig().getMergeScheduler() instanceof ThreadPoolMergeScheduler mergeScheduler) {
try {
// Must be executed before calling IndexWriter#onTragicEvent
mergeScheduler.onTragicEvent(tragedy);
} catch (Exception e) {
logger.warn("Exception thrown when notifying the merge scheduler of a tragic event", e);
if (tragedy != e) {
tragedy.addSuppressed(e);
}
}
}
} finally {
super.onTragicEvent(tragedy, location);
}
}

@Override
public long tryDeleteDocument(IndexReader readerIn, int docID) {
throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
public long deleteDocuments(Term... terms) throws IOException {
if (Assertions.ENABLED) {
throw new AssertionError("must not hard delete documents");
}
return super.deleteDocuments(terms);
}

@Override
public long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
if (Assertions.ENABLED) {
throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
}
return super.tryDeleteDocument(readerIn, docID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -48,6 +50,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING;
Expand Down Expand Up @@ -372,7 +375,7 @@ private void runMergeTask(MergeTask mergeTask) {
}
}

private void abortMergeTask(MergeTask mergeTask) {
void abortMergeTask(MergeTask mergeTask) {
assert mergeTask.hasStartedRunning() == false;
assert runningMergeTasks.contains(mergeTask) == false;
try {
Expand All @@ -385,6 +388,25 @@ private void abortMergeTask(MergeTask mergeTask) {
}
}

/**
 * Aborts every merge task in the given collection by delegating to {@code abortMergeTask}.
 * Accepts a {@code null} collection as a convenience and treats it as "nothing to abort".
 *
 * @param mergeTasks the merge tasks to abort, may be {@code null} or empty
 */
private void abortMergeTasks(Collection<MergeTask> mergeTasks) {
    // Iterating an empty collection is already a no-op, so only the null guard is needed
    if (mergeTasks != null) {
        for (MergeTask mergeTask : mergeTasks) {
            abortMergeTask(mergeTask);
        }
    }
}

/**
 * Drains every queued {@link MergeTask} accepted by the given predicate and aborts each one.
 *
 * @param predicate selects which queued merge tasks are to be removed and aborted
 */
void abortQueuedMergeTasks(Predicate<MergeTask> predicate) {
    final var drainedTasks = new HashSet<MergeTask>();
    final int drainedCount = queuedMergeTasks.drainMatchingElementsTo(predicate, drainedTasks);
    if (drainedCount > 0) {
        abortMergeTasks(drainedTasks);
    }
}

/**
* Start monitoring the available disk space, and update the available budget for running merge tasks
* Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
Expand Down Expand Up @@ -675,6 +697,25 @@ ElementWithReleasableBudget take() throws InterruptedException {
}
}

/**
 * Removes, while holding the queue lock, every enqueued element matching the predicate
 * and transfers it to the supplied destination collection.
 *
 * @param predicate selects the elements to drain from the queue
 * @param c the destination collection that receives the drained elements
 * @return the number of elements removed from the queue
 */
int drainMatchingElementsTo(Predicate<E> predicate, Collection<? super E> c) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int drained = 0;
        final Iterator<Tuple<E, Long>> it = enqueuedByBudget.iterator();
        while (it.hasNext()) {
            final E element = it.next().v1();
            if (predicate.test(element)) {
                // Iterator#remove is the only safe way to delete during iteration
                it.remove();
                c.add(element);
                drained++;
            }
        }
        return drained;
    } finally {
        lock.unlock();
    }
}

/**
* Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
* that are still in use. The elements budget is also updated by re-applying the budget function.
Expand Down Expand Up @@ -704,7 +745,7 @@ void updateBudget(long availableBudget) {

void postBudgetUpdate() {
assert lock.isHeldByCurrentThread();
};
}

private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
assert this.lock.isHeldByCurrentThread();
Expand Down
Loading