Skip to content

Commit 9b09dba

Browse files
Run merges when aborted too
1 parent 8bb890c commit 9b09dba

File tree

1 file changed

+17
-14
lines changed

1 file changed

+17
-14
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,15 @@ private synchronized void enqueueBackloggedTasks() {
244244
/**
245245
* Does the actual merge, by calling {@link org.apache.lucene.index.MergeScheduler.MergeSource#merge}
246246
*/
247-
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
248-
mergeSource.merge(merge);
247+
void doMerge(MergeSource mergeSource, MergePolicy.OneMerge oneMerge) {
248+
try {
249+
mergeSource.merge(oneMerge);
250+
} catch (Throwable t) {
251+
// OK to ignore MergeAbortedException. This is what Lucene's ConcurrentMergeScheduler does.
252+
if (t instanceof MergePolicy.MergeAbortedException == false) {
253+
handleMergeException(t);
254+
}
255+
}
249256
}
250257

251258
@Override
@@ -254,6 +261,9 @@ public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) {
254261
// Note: the rate limiter is only per thread (per merge). So, if there are multiple merge threads running
255262
// the combined IO rate per node is, roughly, 'thread_pool_size * merge_queue#targetMBPerSec', as
256263
// the per-thread IO rate is updated, best effort, for all running merge threads concomitantly.
264+
if (merge.isAborted()) {
265+
return in;
266+
}
257267
MergeTask mergeTask = currentlyRunningMergeTasks.get(merge);
258268
if (mergeTask == null) {
259269
throw new IllegalStateException("associated merge task for executing merge not found");
@@ -349,12 +359,6 @@ public void run() {
349359
)
350360
);
351361
}
352-
} catch (Throwable t) {
353-
if (t instanceof MergePolicy.MergeAbortedException) {
354-
// OK to ignore. This is what Lucene's ConcurrentMergeScheduler does
355-
} else {
356-
handleMergeException(t);
357-
}
358362
} finally {
359363
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get());
360364
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
@@ -376,13 +380,12 @@ public void run() {
376380
}
377381

378382
void abortOnGoingMerge() {
379-
// This would interrupt an IndexWriter if it were actually performing the merge. We just set this here because it seems
380-
// appropriate as we are not going to move forward with the merge.
383+
// This interrupts the IndexWriter.
381384
onGoingMerge.getMerge().setAborted();
382-
// It is fine to mark this merge as finished. Lucene will eventually produce a new merge including this segment even if
383-
// this merge did not actually execute.
384-
mergeSource.onMergeFinished(onGoingMerge.getMerge());
385-
doneMergeTaskCount.incrementAndGet();
385+
// This ensures {@code OneMerge#close} gets invoked.
386+
// {@code IndexWriter} considers a merge as "running" once it has been pulled from the {@code MergeSource#getNextMerge},
387+
// so in theory it's not enough to just call {@code MergeSource#onMergeFinished} on it (as for "pending" ones).
388+
doMerge(mergeSource, onGoingMerge.getMerge());
386389
}
387390

388391
@Override

0 commit comments

Comments
 (0)