Skip to content

Commit 6304852

Browse files
make MergeTask a Runnable rather than an AbstractRunnable
1 parent 779eace commit 6304852

File tree

2 files changed

+46
-57
lines changed

2 files changed

+46
-57
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeQueue.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private ThreadPoolMergeQueue(ThreadPool threadPool) {
6161

6262
void submitMergeTask(MergeTask mergeTask) {
6363
enqueueMergeTask(mergeTask);
64-
executeNextMergeTask();
64+
executeSmallestMergeTask();
6565
}
6666

6767
void enqueueMergeTask(MergeTask mergeTask) {
@@ -72,25 +72,31 @@ void enqueueMergeTask(MergeTask mergeTask) {
7272
}
7373
}
7474

75-
private void executeNextMergeTask() {
76-
executorService.execute(() -> {
77-
// one such task always executes a SINGLE merge task; this is important for merge queue statistics
78-
while (true) {
79-
MergeTask smallestMergeTask;
80-
try {
81-
// will block if there are backlogged merges until they're enqueued again
82-
smallestMergeTask = queuedMergeTasks.take();
83-
} catch (InterruptedException e) {
84-
Thread.currentThread().interrupt();
85-
return;
86-
}
87-
if (smallestMergeTask.runNowOrBacklog()) {
88-
runMergeTask(smallestMergeTask);
89-
// one runnable one merge task
90-
return;
75+
private void executeSmallestMergeTask() {
76+
final AtomicReference<MergeTask> smallestMergeTask = new AtomicReference<>();
77+
try {
78+
executorService.execute(() -> {
79+
// one such task always executes a SINGLE merge task; this is important for merge queue statistics
80+
while (true) {
81+
try {
82+
// will block if there are backlogged merges until they're enqueued again
83+
smallestMergeTask.set(queuedMergeTasks.take());
84+
} catch (InterruptedException e) {
85+
Thread.currentThread().interrupt();
86+
return;
87+
}
88+
if (smallestMergeTask.get().runNowOrBacklog()) {
89+
runMergeTask(smallestMergeTask.get());
90+
// one runnable one merge task
91+
return;
92+
}
9193
}
94+
});
95+
} catch (Exception e) {
96+
if (smallestMergeTask.get() != null) {
97+
smallestMergeTask.get().onRejection(e);
9298
}
93-
});
99+
}
94100
}
95101

96102
private void runMergeTask(MergeTask mergeTask) {

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.lucene.store.RateLimitedIndexOutput;
2222
import org.elasticsearch.common.logging.Loggers;
2323
import org.elasticsearch.common.settings.Setting;
24-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2524
import org.elasticsearch.core.TimeValue;
2625
import org.elasticsearch.index.IndexSettings;
2726
import org.elasticsearch.index.MergeSchedulerConfig;
@@ -228,7 +227,7 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
228227
};
229228
}
230229

231-
final class MergeTask extends AbstractRunnable implements Comparable<MergeTask> {
230+
final class MergeTask implements Runnable, Comparable<MergeTask> {
232231
private final String name;
233232
private final AtomicLong mergeStartTimeNS;
234233
private final MergeSource mergeSource;
@@ -271,7 +270,7 @@ public boolean isRunning() {
271270
}
272271

273272
@Override
274-
public void doRun() throws Exception {
273+
public void run() {
275274
if (mergeStartTimeNS.compareAndSet(0L, System.nanoTime()) == false) {
276275
throw new IllegalStateException("Cannot run the same merge task multiple times");
277276
}
@@ -306,57 +305,41 @@ public void doRun() throws Exception {
306305
abortOnGoingMerge();
307306
handleMergeException(t);
308307
}
309-
}
310-
}
311-
312-
@Override
313-
public void onAfter() {
314-
if (isRunning() == false) {
315-
throw new IllegalStateException("onAfter must only be invoked after doRun");
316-
}
317-
try {
318-
if (verbose()) {
319-
message(String.format(Locale.ROOT, "merge task %s end", this));
320-
}
321-
afterMerge(onGoingMerge);
322308
} finally {
323-
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get());
324309
try {
325-
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
310+
if (verbose()) {
311+
message(String.format(Locale.ROOT, "merge task %s end", this));
312+
}
313+
afterMerge(onGoingMerge);
326314
} finally {
327-
mergeDone(this);
328-
// kick-off next merge, if any
329-
MergePolicy.OneMerge nextMerge = null;
315+
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get());
330316
try {
331-
nextMerge = mergeSource.getNextMerge();
332-
} catch (IllegalStateException e) {
333-
if (verbose()) {
334-
message("merge task poll failed, likely that index writer is failed");
317+
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
318+
} finally {
319+
mergeDone(this);
320+
// kick-off next merge, if any
321+
MergePolicy.OneMerge nextMerge = null;
322+
try {
323+
nextMerge = mergeSource.getNextMerge();
324+
} catch (IllegalStateException e) {
325+
if (verbose()) {
326+
message("merge task poll failed, likely that index writer is failed");
327+
}
328+
// ignore exception, we expect the IW failure to be logged elsewhere
329+
}
330+
if (nextMerge != null) {
331+
submitNewMergeTask(mergeSource, nextMerge, MergeTrigger.MERGE_FINISHED);
335332
}
336-
// ignore exception, we expect the IW failure to be logged elsewhere
337-
}
338-
if (nextMerge != null) {
339-
submitNewMergeTask(mergeSource, nextMerge, MergeTrigger.MERGE_FINISHED);
340333
}
341334
}
342335
}
343336
}
344337

345-
@Override
346-
public void onFailure(Exception e) {
347-
// no-op, should not be called
348-
}
349-
350-
@Override
351338
public void onRejection(Exception e) {
352-
if (isRunning()) {
353-
throw new IllegalStateException("A running merge cannot be rejected for running");
354-
}
355339
if (verbose()) {
356340
message(String.format(Locale.ROOT, "merge task [%s] rejected by thread pool, aborting", onGoingMerge.getId()));
357341
}
358342
abortOnGoingMerge();
359-
mergeDone(this);
360343
}
361344

362345
private void abortOnGoingMerge() {

0 commit comments

Comments
 (0)