Skip to content

Commit cd39b81

Browse files
testMergeTasksRunConcurrently
1 parent 30e4860 commit cd39b81

File tree

2 files changed

+90
-1
lines changed

2 files changed

+90
-1
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.threadpool.ThreadPool;
1818

1919
import java.util.Comparator;
20+
import java.util.PriorityQueue;
2021
import java.util.Set;
2122
import java.util.concurrent.ExecutorService;
2223
import java.util.concurrent.PriorityBlockingQueue;
@@ -232,6 +233,16 @@ private static long newTargetIORateBytesPerSec(
232233
return newTargetIORateBytesPerSec;
233234
}
234235

236+
// exposed for tests
237+
Set<MergeTask> getCurrentlyRunningMergeTasks() {
238+
return currentlyRunningMergeTasks;
239+
}
240+
241+
// exposed for tests
242+
PriorityBlockingQueue<MergeTask> getQueuedMergeTasks() {
243+
return queuedMergeTasks;
244+
}
245+
235246
// exposed for tests and stats
236247
long getTargetIORateBytesPerSec() {
237248
return targetIORateBytesPerSec.get();

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.PriorityQueue;
2626
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.Semaphore;
2728
import java.util.concurrent.ThreadPoolExecutor;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,7 +41,7 @@
4041

4142
public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {
4243

43-
public void testMergeTaskIsAbortedWhenThreadPoolIsShutdown() {
44+
public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() {
4445
TestThreadPool testThreadPool = new TestThreadPool("test");
4546
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
4647
.maybeCreateThreadPoolMergeExecutorService(
@@ -162,6 +163,83 @@ private void testIORateAdjustedForNewlySubmittedTasks(
162163
assertTrue(threadPoolMergeExecutorService.allDone());
163164
}
164165

166+
public void testMergeTasksRunConcurrently() throws Exception {
167+
// at least 2 merges allowed to run concurrently
168+
int mergeExecutorThreadCount = randomIntBetween(2, 5);
169+
Settings settings = Settings.builder()
170+
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
171+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
172+
.build();
173+
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
174+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
175+
.maybeCreateThreadPoolMergeExecutorService(testThreadPool, settings);
176+
assertNotNull(threadPoolMergeExecutorService);
177+
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
178+
// more merge tasks than max concurrent merges allowed to run concurrently
179+
int totalMergeTasksCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
180+
Semaphore runMergeSemaphore = new Semaphore(0);
181+
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
182+
for (int i = 0; i < totalMergeTasksCount; i++) {
183+
MergeTask mergeTask = mock(MergeTask.class);
184+
when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean());
185+
doAnswer(mock -> {
186+
// each individual merge task can either "run" or be "backlogged"
187+
boolean runNowOrBacklog = randomBoolean();
188+
if (runNowOrBacklog == false) {
189+
testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
190+
// reenqueue backlogged merge task
191+
threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
192+
});
193+
}
194+
return runNowOrBacklog;
195+
}).when(mergeTask).runNowOrBacklog();
196+
doAnswer(mock -> {
197+
// wait to be signalled before completing
198+
runMergeSemaphore.acquire();
199+
return null;
200+
}).when(mergeTask).run();
201+
threadPoolMergeExecutorService.submitMergeTask(mergeTask);
202+
}
203+
for (int completedTasksCount = 0; completedTasksCount < totalMergeTasksCount
204+
- mergeExecutorThreadCount; completedTasksCount++) {
205+
int finalCompletedTasksCount = completedTasksCount;
206+
assertBusy(() -> {
207+
// assert that there are merge tasks running concurrently at the max allowed concurrency rate
208+
assertThat(threadPoolMergeExecutorService.getCurrentlyRunningMergeTasks().size(), is(mergeExecutorThreadCount));
209+
// with the other merge tasks enqueued
210+
assertThat(
211+
threadPoolMergeExecutorService.getQueuedMergeTasks().size(),
212+
is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount)
213+
);
214+
// also check thread-pool stats for the same
215+
assertThat(threadPoolExecutor.getActiveCount(), is(mergeExecutorThreadCount));
216+
assertThat(
217+
threadPoolExecutor.getQueue().size(),
218+
is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount)
219+
);
220+
});
221+
// let one merge task finish running
222+
runMergeSemaphore.release();
223+
}
224+
// merge tasks started drying out
225+
for (int remainingMergeTasksCount = mergeExecutorThreadCount; remainingMergeTasksCount >= 0; remainingMergeTasksCount--) {
226+
int finalRemainingMergeTasksCount = remainingMergeTasksCount;
227+
assertBusy(() -> {
228+
// there are fewer available merges than available threads
229+
assertThat(threadPoolMergeExecutorService.getCurrentlyRunningMergeTasks().size(), is(finalRemainingMergeTasksCount));
230+
// no more merges enqueued
231+
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
232+
// also check thread-pool stats for the same
233+
assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergeTasksCount));
234+
assertThat(threadPoolExecutor.getQueue().size(), is(0));
235+
});
236+
// let one merge task finish running
237+
runMergeSemaphore.release();
238+
}
239+
assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone()));
240+
}
241+
}
242+
165243
public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception {
166244
int mergeExecutorThreadCount = randomIntBetween(1, 3);
167245
Settings settings = Settings.builder()

0 commit comments

Comments
 (0)