Skip to content

Commit b2e65a2

Browse files
Handle aborted merge tasks at any point
1 parent 5ae44c5 commit b2e65a2

File tree

2 files changed

+45
-55
lines changed

2 files changed

+45
-55
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,21 @@ private ThreadPoolMergeExecutorService(ThreadPool threadPool) {
6262
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
6363
}
6464

65-
void submitMergeTask(MergeTask mergeTask) {
65+
boolean submitMergeTask(MergeTask mergeTask) {
6666
assert mergeTask.isRunning() == false;
67-
assert mergeTask.isOnGoingMergeAborted() == false;
6867
// first enqueue the runnable that runs exactly one merge task (the smallest it can find)
6968
if (enqueueMergeTaskExecution() == false) {
70-
// if the threadpool cannot run the merge, just abort it
69+
// if the thread pool cannot run the merge, just abort it
7170
mergeTask.abortOnGoingMerge();
71+
return false;
7272
} else {
7373
if (mergeTask.supportsIOThrottling()) {
7474
// count enqueued merge tasks that support IO auto throttling, and maybe adjust IO rate for all
7575
maybeUpdateIORateBytesPerSec(submittedIOThrottledMergeTasksCount.incrementAndGet());
7676
}
7777
// then enqueue the merge task proper
7878
enqueueMergeTask(mergeTask);
79+
return true;
7980
}
8081
}
8182

@@ -92,17 +93,19 @@ private boolean enqueueMergeTaskExecution() {
9293
executorService.execute(() -> {
9394
boolean interrupted = false;
9495
// one such runnable always executes a SINGLE merge task from the queue
95-
// this is important for merge queue statistics, i.e. the executor's queue size equals the merge tasks' queue size
96+
// this is important for merge queue statistics, i.e. the executor's queue size represents the current amount of merges
97+
MergeTask smallestMergeTask = null;
9698
try {
9799
while (true) {
98100
try {
99101
// will block if there are backlogged merges until they're enqueued again
100-
MergeTask smallestMergeTask = queuedMergeTasks.take();
102+
smallestMergeTask = queuedMergeTasks.take();
101103
// let the task's scheduler decide if it can actually run the merge task now
102104
if (smallestMergeTask.runNowOrBacklog()) {
103105
runMergeTask(smallestMergeTask);
104106
break;
105107
}
108+
smallestMergeTask = null;
106109
// the merge task is backlogged by the merge scheduler, try to get the next smallest one
107110
// it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run
108111
} catch (InterruptedException e) {
@@ -111,6 +114,9 @@ private boolean enqueueMergeTaskExecution() {
111114
}
112115
}
113116
} finally {
117+
if (smallestMergeTask != null && smallestMergeTask.supportsIOThrottling()) {
118+
submittedIOThrottledMergeTasksCount.decrementAndGet();
119+
}
114120
if (interrupted) {
115121
Thread.currentThread().interrupt();
116122
}
@@ -126,25 +132,16 @@ private boolean enqueueMergeTaskExecution() {
126132

127133
private void runMergeTask(MergeTask mergeTask) {
128134
assert mergeTask.isRunning() == false;
135+
boolean added = currentlyRunningMergeTasks.add(mergeTask);
136+
assert added : "starting merge task [" + mergeTask + "] registered as already running";
129137
try {
130-
if (mergeTask.isOnGoingMergeAborted()) {
131-
return;
132-
}
133-
boolean added = currentlyRunningMergeTasks.add(mergeTask);
134-
assert added : "starting merge task [" + mergeTask + "] registered as already running";
135-
try {
136-
if (mergeTask.supportsIOThrottling()) {
137-
mergeTask.setIORateLimit(ByteSizeValue.ofBytes(targetIORateBytesPerSec.get()).getMbFrac());
138-
}
139-
mergeTask.run();
140-
} finally {
141-
boolean removed = currentlyRunningMergeTasks.remove(mergeTask);
142-
assert removed : "completed merge task [" + mergeTask + "] not registered as running";
143-
}
144-
} finally {
145138
if (mergeTask.supportsIOThrottling()) {
146-
submittedIOThrottledMergeTasksCount.decrementAndGet();
139+
mergeTask.setIORateLimit(ByteSizeValue.ofBytes(targetIORateBytesPerSec.get()).getMbFrac());
147140
}
141+
mergeTask.run();
142+
} finally {
143+
boolean removed = currentlyRunningMergeTasks.remove(mergeTask);
144+
assert removed : "completed merge task [" + mergeTask + "] not registered as running";
148145
}
149146
}
150147

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.lucene.store.IOContext;
2020
import org.apache.lucene.store.IndexOutput;
2121
import org.apache.lucene.store.RateLimitedIndexOutput;
22+
import org.elasticsearch.ElasticsearchException;
2223
import org.elasticsearch.common.logging.Loggers;
2324
import org.elasticsearch.common.settings.Setting;
2425
import org.elasticsearch.core.TimeValue;
@@ -99,12 +100,20 @@ public void refreshConfig() {
99100
}
100101

101102
@Override
102-
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
103+
public void merge(MergeSource mergeSource, MergeTrigger trigger) {
103104
if (closed) {
104105
// avoid pulling from the merge source when closing
105106
return;
106107
}
107-
MergePolicy.OneMerge merge = mergeSource.getNextMerge();
108+
MergePolicy.OneMerge merge = null;
109+
try {
110+
merge = mergeSource.getNextMerge();
111+
} catch (IllegalStateException e) {
112+
if (verbose()) {
113+
message("merge task poll failed, likely that index writer is failed");
114+
}
115+
// ignore exception, we expect the IW failure to be logged elsewhere
116+
}
108117
if (merge != null) {
109118
submitNewMergeTask(mergeSource, merge, trigger);
110119
}
@@ -145,10 +154,13 @@ protected void handleMergeException(Throwable t) {
145154
throw new MergePolicy.MergeException(t);
146155
}
147156

148-
private void submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
149-
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
150-
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
151-
checkMergeTaskThrottling();
157+
private boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
158+
try {
159+
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
160+
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
161+
} finally {
162+
checkMergeTaskThrottling();
163+
}
152164
}
153165

154166
private void checkMergeTaskThrottling() {
@@ -179,16 +191,15 @@ private MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge mer
179191
);
180192
}
181193

182-
// synchronized so that {@code #currentlyRunningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
194+
// synchronized so that {@code #closed}, {@code #currentlyRunningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
183195
private synchronized boolean runNowOrBacklog(MergeTask mergeTask) {
184196
assert mergeTask.isRunning() == false;
185-
assert mergeTask.isOnGoingMergeAborted() == false;
186197
if (closed) {
187-
// Do not backlog tasks when closing the merge scheduler, instead abort them.
188-
// Aborted task are not actually executed.
198+
// Do not backlog or execute tasks when closing the merge scheduler, instead abort them.
189199
mergeTask.abortOnGoingMerge();
190-
return true;
191-
} else if (currentlyRunningMergeTasks.size() < config.getMaxThreadCount()) {
200+
throw new ElasticsearchException("merge task aborted because scheduler is shutting down");
201+
}
202+
if (currentlyRunningMergeTasks.size() < config.getMaxThreadCount()) {
192203
boolean added = currentlyRunningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null;
193204
assert added : "starting merge task [" + mergeTask + "] registered as already running";
194205
return true;
@@ -225,7 +236,7 @@ private synchronized void enqueueBackloggedTasks() {
225236
if (backloggedMergeTask == null) {
226237
break;
227238
}
228-
// no need to abort merge tasks now, they will be aborted when the scheduler tries to run them
239+
// no need to abort merge tasks now, they will be aborted on the spot when the scheduler gets to run them
229240
threadPoolMergeExecutorService.enqueueMergeTask(backloggedMergeTask);
230241
}
231242
}
@@ -307,7 +318,6 @@ public boolean isRunning() {
307318
@Override
308319
public void run() {
309320
assert isRunning() == false;
310-
assert isOnGoingMergeAborted() == false;
311321
assert ThreadPoolMergeScheduler.this.currentlyRunningMergeTasks.containsKey(onGoingMerge.getMerge())
312322
: "runNowOrBacklog must be invoked before actually running the merge task";
313323
try {
@@ -360,21 +370,8 @@ public void run() {
360370
message(String.format(Locale.ROOT, "merge task %s end", this));
361371
}
362372
mergeDone(this);
363-
if (ThreadPoolMergeScheduler.this.closed == false) {
364-
// kick-off next merge, if any
365-
MergePolicy.OneMerge nextMerge = null;
366-
try {
367-
nextMerge = mergeSource.getNextMerge();
368-
} catch (IllegalStateException e) {
369-
if (verbose()) {
370-
message("merge task poll failed, likely that index writer is failed");
371-
}
372-
// ignore exception, we expect the IW failure to be logged elsewhere
373-
}
374-
if (nextMerge != null) {
375-
submitNewMergeTask(mergeSource, nextMerge, MergeTrigger.MERGE_FINISHED);
376-
}
377-
}
373+
// kick-off next merge, if any
374+
merge(mergeSource, MergeTrigger.MERGE_FINISHED);
378375
}
379376
}
380377

@@ -388,13 +385,9 @@ void abortOnGoingMerge() {
388385
doneMergeTaskCount.incrementAndGet();
389386
}
390387

391-
boolean isOnGoingMergeAborted() {
392-
return onGoingMerge.getMerge().isAborted();
393-
}
394-
395388
@Override
396389
public String toString() {
397-
return name + (isOnGoingMergeAborted() ? " (aborted)" : "");
390+
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");
398391
}
399392
}
400393

0 commit comments

Comments
 (0)