Skip to content

Commit 9d2d29d

Browse files
Closing not quite there....
1 parent 5f9a293 commit 9d2d29d

File tree

3 files changed

+142
-62
lines changed

3 files changed

+142
-62
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeQueue.java

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
1717
import org.elasticsearch.threadpool.ThreadPool;
1818

19+
import java.io.Closeable;
20+
import java.io.IOException;
1921
import java.util.Set;
2022
import java.util.concurrent.ExecutorService;
2123
import java.util.concurrent.PriorityBlockingQueue;
24+
import java.util.concurrent.RejectedExecutionException;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2226
import java.util.concurrent.atomic.AtomicInteger;
2327
import java.util.concurrent.atomic.AtomicLong;
24-
import java.util.concurrent.atomic.AtomicReference;
2528

26-
public class ThreadPoolMergeQueue {
29+
public class ThreadPoolMergeQueue implements Closeable {
2730
/**
2831
* Floor for IO write rate limit of individual merge tasks (we will never go any lower than this)
2932
*/
@@ -48,6 +51,7 @@ public class ThreadPoolMergeQueue {
4851
private final AtomicLong targetIORateBytesPerSec = new AtomicLong(START_IO_RATE.getBytes());
4952
private final ExecutorService executorService;
5053
private final int maxConcurrentMerges;
54+
private final AtomicBoolean closed = new AtomicBoolean();
5155

5256
public static @Nullable ThreadPoolMergeQueue maybeCreateThreadPoolMergeQueue(ThreadPool threadPool, Settings settings) {
5357
if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) {
@@ -63,51 +67,66 @@ private ThreadPoolMergeQueue(ThreadPool threadPool) {
6367
}
6468

6569
void submitMergeTask(MergeTask mergeTask) {
66-
enqueueMergeTask(mergeTask);
67-
if (mergeTask.supportsIOThrottling()) {
68-
// count submitted merge tasks that support IO auto throttling, and maybe adjust IO rate for all
69-
maybeUpdateIORateBytesPerSec(submittedIOThrottledMergeTasksCount.incrementAndGet());
70+
assert mergeTask.isRunning() == false;
71+
assert mergeTask.isOnGoingMergeAborted() == false;
72+
if (closed.get() || enqueueMergeTaskExecution() == false) {
73+
mergeTask.abortOnGoingMerge();
74+
} else {
75+
if (mergeTask.supportsIOThrottling()) {
76+
// count submitted merge tasks that support IO auto throttling, and maybe adjust IO rate for all
77+
maybeUpdateIORateBytesPerSec(submittedIOThrottledMergeTasksCount.incrementAndGet());
78+
}
79+
enqueueMergeTask(mergeTask);
7080
}
71-
executeSmallestMergeTask();
7281
}
7382

7483
void enqueueMergeTask(MergeTask mergeTask) {
7584
queuedMergeTasks.add(mergeTask);
7685
}
7786

78-
private void executeSmallestMergeTask() {
79-
final AtomicReference<MergeTask> smallestMergeTask = new AtomicReference<>();
87+
private boolean enqueueMergeTaskExecution() {
8088
try {
8189
executorService.execute(() -> {
90+
boolean interrupted = false;
8291
// one such runnable always executes a SINGLE merge task from the queue
8392
// this is important for merge queue statistics, i.e. the executor's queue size equals the merge tasks' queue size
84-
while (true) {
85-
try {
86-
// will block if there are backlogged merges until they're enqueued again
87-
smallestMergeTask.set(queuedMergeTasks.take());
88-
} catch (InterruptedException e) {
89-
Thread.currentThread().interrupt();
90-
return;
93+
try {
94+
while (true) {
95+
try {
96+
// will block if there are backlogged merges until they're enqueued again
97+
MergeTask smallestMergeTask = queuedMergeTasks.take();
98+
// the merge task's scheduler might backlog rather than execute the task
99+
// it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task
100+
if (smallestMergeTask.runNowOrBacklog()) {
101+
runMergeTask(smallestMergeTask);
102+
// one runnable, one merge task
103+
return;
104+
}
105+
// the merge task is backlogged by the merge scheduler, try to get the next smallest one
106+
} catch (InterruptedException e) {
107+
interrupted = true;
108+
// this runnable must always run exactly a single merge task
109+
}
91110
}
92-
// the merge task's scheduler might backlog rather than execute the task
93-
// it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task
94-
if (smallestMergeTask.get().runNowOrBacklog()) {
95-
runMergeTask(smallestMergeTask.get());
96-
// one runnable one merge task
97-
return;
111+
} finally {
112+
if (interrupted) {
113+
Thread.currentThread().interrupt();
98114
}
99-
// the merge task is backlogged by the merge scheduler,
100115
}
101116
});
102-
} catch (Exception e) {
103-
if (smallestMergeTask.get() != null) {
104-
smallestMergeTask.get().onRejection(e);
105-
}
117+
return true;
118+
} catch (RejectedExecutionException e) {
119+
// cannot execute merges because the executor is shutting down
120+
return false;
106121
}
107122
}
108123

109124
private void runMergeTask(MergeTask mergeTask) {
110125
assert mergeTask.isRunning() == false;
126+
if (closed.get()) {
127+
mergeTask.abortOnGoingMerge();
128+
return;
129+
}
111130
boolean added = currentlyRunningMergeTasks.add(mergeTask);
112131
assert added : "starting merge task [" + mergeTask + "] registered as already running";
113132
try {
@@ -183,4 +202,9 @@ public boolean isEmpty() {
183202
assert isEmpty == false || submittedIOThrottledMergeTasksCount.get() == 0L;
184203
return isEmpty;
185204
}
205+
206+
@Override
207+
public void close() throws IOException {
208+
closed.set(true);
209+
}
186210
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 90 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
5656
// how many {@link MergeTask}s have kicked off (this is used to name them).
5757
private final AtomicLong mergeTaskCount = new AtomicLong();
5858
private final AtomicLong mergeTaskDoneCount = new AtomicLong();
59+
private final AtomicBoolean closed = new AtomicBoolean();
5960

6061
public ThreadPoolMergeScheduler(ShardId shardId, IndexSettings indexSettings, ThreadPoolMergeQueue threadPoolMergeQueue) {
6162
this.shardId = shardId;
@@ -88,17 +89,28 @@ public void refreshConfig() {
8889
// if maxMergeCount changed, maybe we need to toggle merge task throttling
8990
checkMergeTaskThrottling();
9091
// if maxThreadCount changed, maybe some backlogged merges are now allowed to run
91-
enqueueBacklogged();
92+
enqueueBackloggedTasks();
9293
}
9394

9495
@Override
9596
public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
97+
if (closed.get()) {
98+
// avoid pulling from the merge source when closing
99+
return;
100+
}
96101
MergePolicy.OneMerge merge = mergeSource.getNextMerge();
97102
if (merge != null) {
98103
submitNewMergeTask(mergeSource, merge, trigger);
99104
}
100105
}
101106

107+
@Override
108+
public MergeScheduler clone() {
109+
// Lucene IW makes a clone internally but since we hold on to this instance
110+
// the clone will just be the identity.
111+
return this;
112+
}
113+
102114
/**
103115
* A callback allowing for custom logic before an actual merge starts.
104116
*/
@@ -109,17 +121,20 @@ protected void beforeMerge(OnGoingMerge merge) {}
109121
*/
110122
protected void afterMerge(OnGoingMerge merge) {}
111123

124+
/**
125+
* A callback that's invoked when indexing should be throttled down in order to let merging catch up.
126+
*/
112127
protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {}
113128

129+
/**
130+
* A callback that's invoked when indexing should un-throttle because merging caught up.
131+
* This is invoked sometime after {@link #enableIndexingThrottling(int, int, int)} was invoked in the first place.
132+
*/
114133
protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {}
115134

116-
@Override
117-
public MergeScheduler clone() {
118-
// Lucene IW makes a clone internally but since we hold on to this instance
119-
// the clone will just be the identity.
120-
return this;
121-
}
122-
135+
/**
136+
* A callback for exceptions thrown while merging.
137+
*/
123138
protected void handleMergeException(Throwable t) {
124139
throw new MergePolicy.MergeException(t);
125140
}
@@ -162,13 +177,15 @@ private MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge mer
162177
// synchronized so that {@code #currentlyRunningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
163178
private synchronized boolean runNowOrBacklog(MergeTask mergeTask) {
164179
assert mergeTask.isRunning() == false;
165-
if (currentlyRunningMergeTasks.size() >= config.getMaxThreadCount()) {
166-
backloggedMergeTasks.add(mergeTask);
167-
return false;
168-
} else {
180+
assert mergeTask.isOnGoingMergeAborted() == false;
181+
// if the merge scheduler is closed it will abort all merges before they start running
182+
if (closed.get() || currentlyRunningMergeTasks.size() < config.getMaxThreadCount()) {
169183
boolean added = currentlyRunningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null;
170184
assert added : "starting merge task [" + mergeTask + "] registered as already running";
171185
return true;
186+
} else {
187+
backloggedMergeTasks.add(mergeTask);
188+
return false;
172189
}
173190
}
174191

@@ -178,15 +195,20 @@ private void mergeDone(MergeTask mergeTask) {
178195
boolean removed = currentlyRunningMergeTasks.remove(mergeTask.onGoingMerge.getMerge()) != null;
179196
assert removed : "completed merge task [" + mergeTask + "] not registered as running";
180197
// when one merge is done, maybe a backlogged one can now execute
181-
enqueueBacklogged();
198+
enqueueBackloggedTasks();
199+
// when closing, we wait for all running merges to finish
200+
if (currentlyRunningMergeTasks.isEmpty()) {
201+
notifyAll();
202+
}
182203
}
183204
mergeTaskDoneCount.incrementAndGet();
184205
checkMergeTaskThrottling();
185206
}
186207

187-
private synchronized void enqueueBacklogged() {
208+
private synchronized void enqueueBackloggedTasks() {
188209
int maxBackloggedTasksToEnqueue = config.getMaxThreadCount() - currentlyRunningMergeTasks.size();
189-
while (maxBackloggedTasksToEnqueue-- > 0) {
210+
// enqueue all backlogged tasks when closing, let the queue deal with them
211+
while (closed.get() || maxBackloggedTasksToEnqueue-- > 0) {
190212
MergeTask backloggedMergeTask = backloggedMergeTasks.poll();
191213
if (backloggedMergeTask == null) {
192214
break;
@@ -271,6 +293,11 @@ public boolean isRunning() {
271293

272294
@Override
273295
public void run() {
296+
assert isOnGoingMergeAborted() == false;
297+
if (ThreadPoolMergeScheduler.this.closed.get()) {
298+
abortOnGoingMerge();
299+
return;
300+
}
274301
if (mergeStartTimeNS.compareAndSet(0L, System.nanoTime()) == false) {
275302
throw new IllegalStateException("Cannot run the same merge task multiple times");
276303
}
@@ -302,6 +329,7 @@ public void run() {
302329
if (t instanceof MergePolicy.MergeAbortedException) {
303330
// OK to ignore. This is what Lucene's ConcurrentMergeScheduler does
304331
} else {
332+
// sometimes this might double-abort a merge, but that's OK
305333
abortOnGoingMerge();
306334
handleMergeException(t);
307335
}
@@ -317,32 +345,27 @@ public void run() {
317345
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
318346
} finally {
319347
mergeDone(this);
320-
// kick-off next merge, if any
321-
MergePolicy.OneMerge nextMerge = null;
322-
try {
323-
nextMerge = mergeSource.getNextMerge();
324-
} catch (IllegalStateException e) {
325-
if (verbose()) {
326-
message("merge task poll failed, likely that index writer is failed");
348+
if (ThreadPoolMergeScheduler.this.closed.get() == false) {
349+
// kick-off next merge, if any
350+
MergePolicy.OneMerge nextMerge = null;
351+
try {
352+
nextMerge = mergeSource.getNextMerge();
353+
} catch (IllegalStateException e) {
354+
if (verbose()) {
355+
message("merge task poll failed, likely that index writer is failed");
356+
}
357+
// ignore exception, we expect the IW failure to be logged elsewhere
358+
}
359+
if (nextMerge != null) {
360+
submitNewMergeTask(mergeSource, nextMerge, MergeTrigger.MERGE_FINISHED);
327361
}
328-
// ignore exception, we expect the IW failure to be logged elsewhere
329-
}
330-
if (nextMerge != null) {
331-
submitNewMergeTask(mergeSource, nextMerge, MergeTrigger.MERGE_FINISHED);
332362
}
333363
}
334364
}
335365
}
336366
}
337367

338-
public void onRejection(Exception e) {
339-
if (verbose()) {
340-
message(String.format(Locale.ROOT, "merge task [%s] rejected by thread pool, aborting", onGoingMerge.getId()));
341-
}
342-
abortOnGoingMerge();
343-
}
344-
345-
private void abortOnGoingMerge() {
368+
void abortOnGoingMerge() {
346369
// This would interrupt an IndexWriter if it were actually performing the merge. We just set this here because it seems
347370
// appropriate as we are not going to move forward with the merge.
348371
onGoingMerge.getMerge().setAborted();
@@ -351,6 +374,10 @@ private void abortOnGoingMerge() {
351374
mergeSource.onMergeFinished(onGoingMerge.getMerge());
352375
}
353376

377+
boolean isOnGoingMergeAborted() {
378+
return onGoingMerge.getMerge().isAborted();
379+
}
380+
354381
@Override
355382
public String toString() {
356383
return name;
@@ -375,6 +402,35 @@ protected void message(String message) {
375402
super.message(message);
376403
}
377404

405+
@Override
406+
public void close() throws IOException {
407+
super.close();
408+
closed.set(true);
409+
boolean interrupted = false;
410+
try {
411+
synchronized (this) {
412+
// enqueue all backlogged merge tasks, the merge queue assumes that backlogged tasks are always re-enqueued
413+
enqueueBackloggedTasks();
414+
// supercharge running merges
415+
currentlyRunningMergeTasks.values().forEach(runningTask -> runningTask.setIORateLimit(Double.POSITIVE_INFINITY));
416+
// wait until all running merges are done
417+
while (currentlyRunningMergeTasks.isEmpty() == false) {
418+
try {
419+
// wait with a timeout, just to cover for something that failed to notify
420+
wait(1000);
421+
} catch (InterruptedException e) {
422+
// ignore interruption, we will retry until all currently running merge tasks are done
423+
interrupted = true;
424+
}
425+
}
426+
}
427+
} finally {
428+
if (interrupted) {
429+
Thread.currentThread().interrupt();
430+
}
431+
}
432+
}
433+
378434
private static double nsToSec(long ns) {
379435
return ns / (double) TimeUnit.SECONDS.toNanos(1);
380436
}

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
149149
if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) {
150150
result.put(
151151
ThreadPool.Names.MERGE,
152-
new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, allocatedProcessors, TimeValue.timeValueMinutes(5), false)
152+
new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, allocatedProcessors, TimeValue.timeValueMinutes(5), true)
153153
);
154154
}
155155
result.put(

0 commit comments

Comments
 (0)