Skip to content

Commit 6c21654

Browse files
Implemented max thread setting
1 parent 3c203cb commit 6c21654

File tree

1 file changed

+34
-0
lines changed

1 file changed

+34
-0
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,11 @@
3232
import org.elasticsearch.threadpool.ThreadPool;
3333

3434
import java.io.IOException;
35+
import java.util.ArrayList;
3536
import java.util.HashSet;
37+
import java.util.List;
3638
import java.util.Locale;
39+
import java.util.PriorityQueue;
3740
import java.util.Set;
3841
import java.util.concurrent.ExecutorService;
3942
import java.util.concurrent.TimeUnit;
@@ -67,6 +70,8 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
6770
private final ExecutorService executorService;
6871
private final int maxThreadPoolSize;
6972
private final ThreadLocal<MergeRateLimiter> onGoingMergeRateLimiter = new ThreadLocal<>();
73+
private final PriorityQueue<MergeTask> activeMergeTasksLocalSchedulerQueue = new PriorityQueue<>();
74+
private final List<MergeTask> activeMergeTasksExecutingOnLocalSchedulerList = new ArrayList<>();
7075

7176
public ThreadPoolMergeScheduler(ShardId shardId, IndexSettings indexSettings, ThreadPool threadPool) {
7277
this.config = indexSettings.getMergeSchedulerConfig();
@@ -135,6 +140,32 @@ private void submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge me
135140
if (mergeTask.isAutoThrottle) {
136141
trackNewActiveThrottledMergeTask(mergeTask, maxThreadPoolSize);
137142
}
143+
synchronized (this) {
144+
activeMergeTasksLocalSchedulerQueue.add(mergeTask);
145+
}
146+
maybeExecuteNextMerge();
147+
}
148+
149+
private void mergeDone(MergeTask mergeTask) {
150+
synchronized (this) {
151+
activeMergeTasksExecutingOnLocalSchedulerList.remove(mergeTask);
152+
}
153+
maybeExecuteNextMerge();
154+
}
155+
156+
private void maybeExecuteNextMerge() {
157+
MergeTask mergeTask;
158+
synchronized (this) {
159+
if (activeMergeTasksExecutingOnLocalSchedulerList.size() >= config.getMaxThreadCount()) {
160+
return;
161+
}
162+
mergeTask = activeMergeTasksLocalSchedulerQueue.poll();
163+
if (mergeTask == null) {
164+
// no more merges to execute
165+
return;
166+
}
167+
activeMergeTasksExecutingOnLocalSchedulerList.add(mergeTask);
168+
}
138169
executorService.execute(mergeTask);
139170
}
140171

@@ -293,6 +324,7 @@ public void onAfter() {
293324
if (isAutoThrottle) {
294325
removeFromActiveThrottledMergeTasks(this);
295326
}
327+
mergeDone(this);
296328
// kick-off next merge, if any
297329
MergePolicy.OneMerge nextMerge = null;
298330
try {
@@ -319,6 +351,7 @@ public void onFailure(Exception e) {
319351
// plus the engine is probably going to be failed when any merge fails,
320352
// but keep this in case something believes calling `MergeTask#onFailure` is a sane way to abort a merge
321353
abortOnGoingMerge();
354+
mergeDone(this);
322355
handleMergeException(e);
323356
}
324357

@@ -333,6 +366,7 @@ public void onRejection(Exception e) {
333366
message(String.format(Locale.ROOT, "merge task [%s] rejected by thread pool, aborting", onGoingMerge.getId()));
334367
}
335368
abortOnGoingMerge();
369+
mergeDone(this);
336370
}
337371

338372
private void abortOnGoingMerge() {

0 commit comments

Comments
 (0)