Skip to content

Commit ba4c7ee

Browse files
WIP
1 parent a76744a commit ba4c7ee

File tree

3 files changed

+85
-18
lines changed

3 files changed

+85
-18
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
1717
import org.elasticsearch.threadpool.ThreadPool;
1818

19+
import java.util.Comparator;
1920
import java.util.Set;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.PriorityBlockingQueue;
@@ -45,7 +46,10 @@ public class ThreadPoolMergeExecutorService {
4546
* The merge tasks that are waiting execution. This does NOT include backlogged or currently executing merge tasks.
4647
* For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing.
4748
*/
48-
private final PriorityBlockingQueue<MergeTask> queuedMergeTasks = new PriorityBlockingQueue<>();
49+
private final PriorityBlockingQueue<MergeTask> queuedMergeTasks = new PriorityBlockingQueue<>(
50+
32,
51+
Comparator.comparingLong(MergeTask::estimatedMergeSize)
52+
);
4953
/**
5054
* The set of all merge tasks currently being executed by merge threads from the pool.
5155
* These are tracked notably in order to be able to update their disk IO throttle rate, after they have started, while executing.

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ protected void handleMergeException(Throwable t) {
155155
throw new MergePolicy.MergeException(t);
156156
}
157157

158-
private boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
158+
// package-private for tests
159+
boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
159160
try {
160161
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
161162
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
@@ -164,6 +165,19 @@ private boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge
164165
}
165166
}
166167

168+
// package-private for tests
169+
MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
170+
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
171+
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
172+
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
173+
return new MergeTask(
174+
mergeSource,
175+
merge,
176+
isAutoThrottle && config.isAutoThrottle(),
177+
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
178+
);
179+
}
180+
167181
private void checkMergeTaskThrottling() {
168182
long submittedMergesCount = submittedMergeTaskCount.get();
169183
long doneMergesCount = doneMergeTaskCount.get();
@@ -180,18 +194,6 @@ private void checkMergeTaskThrottling() {
180194
}
181195
}
182196

183-
private MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
184-
// forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled
185-
boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
186-
// IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
187-
return new MergeTask(
188-
mergeSource,
189-
merge,
190-
isAutoThrottle && config.isAutoThrottle(),
191-
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId
192-
);
193-
}
194-
195197
// synchronized so that {@code #closed}, {@code #currentlyRunningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
196198
private synchronized boolean runNowOrBacklog(MergeTask mergeTask) {
197199
assert mergeTask.isRunning() == false;

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,23 @@
1414
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1515
import org.elasticsearch.common.util.concurrent.EsExecutors;
1616
import org.elasticsearch.index.IndexSettings;
17+
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
1718
import org.elasticsearch.test.ESTestCase;
1819
import org.elasticsearch.test.IndexSettingsModule;
1920
import org.elasticsearch.threadpool.TestThreadPool;
2021
import org.elasticsearch.threadpool.ThreadPool;
2122
import org.junit.Before;
2223

2324
import java.util.Collection;
25+
import java.util.HashSet;
26+
import java.util.PriorityQueue;
27+
import java.util.Set;
2428
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.function.Consumer;
2531

2632
import static org.hamcrest.Matchers.equalTo;
33+
import static org.mockito.ArgumentMatchers.any;
2734
import static org.mockito.ArgumentMatchers.anyDouble;
2835
import static org.mockito.Mockito.doAnswer;
2936
import static org.mockito.Mockito.mock;
@@ -55,7 +62,7 @@ public void testMergeTasksAreAbortedWhenThreadPoolIsShutdown() {
5562
assertTrue(threadPoolMergeExecutorService.allDone());
5663
// shutdown the thread pool
5764
testThreadPool.shutdown();
58-
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
65+
MergeTask mergeTask = mock(MergeTask.class);
5966
when(mergeTask.isRunning()).thenReturn(false);
6067
boolean mergeTaskSupportsIOThrottling = randomBoolean();
6168
when(mergeTask.supportsIOThrottling()).thenReturn(mergeTaskSupportsIOThrottling);
@@ -82,10 +89,10 @@ public void testBackloggedMergeTasksAreAllExecutedExactlyOnce() throws Exception
8289
CountDownLatch mergeTasksDoneLatch = new CountDownLatch(mergeTaskCount);
8390
CountDownLatch mergeTasksReadyLatch = new CountDownLatch(mergeTaskCount);
8491
CountDownLatch submitTaskLatch = new CountDownLatch(1);
85-
Collection<ThreadPoolMergeScheduler.MergeTask> generatedMergeTasks = ConcurrentCollections.newConcurrentSet();
92+
Collection<MergeTask> generatedMergeTasks = ConcurrentCollections.newConcurrentSet();
8693
for (int i = 0; i < mergeTaskCount; i++) {
8794
new Thread(() -> {
88-
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
95+
MergeTask mergeTask = mock(MergeTask.class);
8996
when(mergeTask.isRunning()).thenReturn(false);
9097
boolean supportsIOThrottling = randomBoolean();
9198
when(mergeTask.supportsIOThrottling()).thenReturn(supportsIOThrottling);
@@ -115,7 +122,7 @@ public void testBackloggedMergeTasksAreAllExecutedExactlyOnce() throws Exception
115122
submitTaskLatch.countDown();
116123
safeAwait(mergeTasksDoneLatch);
117124
assertBusy(() -> {
118-
for (ThreadPoolMergeScheduler.MergeTask mergeTask : generatedMergeTasks) {
125+
for (MergeTask mergeTask : generatedMergeTasks) {
119126
verify(mergeTask, times(1)).run();
120127
if (mergeTask.supportsIOThrottling()) {
121128
verify(mergeTask).setIORateLimit(anyDouble());
@@ -127,4 +134,58 @@ public void testBackloggedMergeTasksAreAllExecutedExactlyOnce() throws Exception
127134
});
128135
}
129136
}
137+
138+
public void testMergeTasksRunInSizeOrderWithBacklog() {
139+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
140+
.maybeCreateThreadPoolMergeExecutorService(
141+
testThreadPool,
142+
Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build()
143+
);
144+
assertNotNull(threadPoolMergeExecutorService);
145+
threadPoolMergeExecutorService.submitMergeTask()
146+
// int mergeTaskCount = randomIntBetween(5, 50);
147+
int mergeTaskCount = 4;
148+
PriorityQueue<MergeTask> mergeTasksStillToRun = new PriorityQueue<>();
149+
for (int i = 0; i < mergeTaskCount; i++) {
150+
MergeTask mergeTask = mock(MergeTask.class);
151+
when(mergeTask.isRunning()).thenReturn(false);
152+
boolean supportsIOThrottling = randomBoolean();
153+
when(mergeTask.supportsIOThrottling()).thenReturn(supportsIOThrottling);
154+
long mergeSize = randomLongBetween(1, 10);
155+
when(mergeTask.estimatedMergeSize()).thenReturn(mergeSize);
156+
doAnswer(mock -> {
157+
@SuppressWarnings("unchecked")
158+
MergeTask other = (MergeTask) mock.getArguments()[1];
159+
return Long.com
160+
}).when(mergeTask).compareTo(any(MergeTask.class));
161+
doAnswer(mock -> {
162+
mergeTasksStillToRun.remove(mergeTask);
163+
// each individual merge task can either "run" or be "backlogged"
164+
boolean runNowOrBacklog = randomBoolean();
165+
if (runNowOrBacklog == false) {
166+
if (mergeTasksStillToRun.isEmpty()) {
167+
// reenqueue backlogged merge task now, otherwise the task won't finish
168+
threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
169+
mergeTasksStillToRun.add(mergeTask);
170+
} else {
171+
testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
172+
// reenqueue backlogged merge task sometime in the future
173+
threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
174+
mergeTasksStillToRun.add(mergeTask);
175+
});
176+
}
177+
}
178+
return runNowOrBacklog;
179+
}).when(mergeTask).runNowOrBacklog();
180+
doAnswer(mock -> {
181+
if (mergeTasksStillToRun.isEmpty() == false) {
182+
assertTrue(mergeTask.estimatedMergeSize() <= mergeTasksStillToRun.peek().estimatedMergeSize());
183+
}
184+
return null;
185+
}).when(mergeTask).run();
186+
mergeTasksStillToRun.add(mergeTask);
187+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
188+
}
189+
deterministicTaskQueue.runAllTasks();
190+
}
130191
}

0 commit comments

Comments
 (0)