Skip to content

Commit 1fa7ba8

Browse files
testMergesRunConcurrently
1 parent 3586ee1 commit 1fa7ba8

File tree

2 files changed

+100
-15
lines changed

2 files changed

+100
-15
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,21 +84,6 @@ public ThreadPoolMergeScheduler(
8484
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
8585
}
8686

87-
// used for tests
88-
ThreadPoolMergeScheduler(
89-
ShardId shardId,
90-
MergeSchedulerConfig mergeSchedulerConfig,
91-
Logger logger,
92-
MergeTracking mergeTracking,
93-
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
94-
) {
95-
this.shardId = shardId;
96-
this.config = mergeSchedulerConfig;
97-
this.logger = logger;
98-
this.mergeTracking = mergeTracking;
99-
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
100-
}
101-
10287
@Override
10388
public Set<OnGoingMerge> onGoingMerges() {
10489
return mergeTracking.onGoingMerges();
@@ -470,6 +455,16 @@ public void close() throws IOException {
470455
}
471456
}
472457

458+
// exposed for tests
459+
PriorityQueue<MergeTask> getBackloggedMergeTasks() {
460+
return backloggedMergeTasks;
461+
}
462+
463+
// exposed for tests
464+
Map<MergePolicy.OneMerge, MergeTask> getCurrentlyRunningMergeTasks() {
465+
return currentlyRunningMergeTasks;
466+
}
467+
473468
private static double nsToSec(long ns) {
474469
return ns / (double) TimeUnit.SECONDS.toNanos(1);
475470
}

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import java.io.IOException;
3131
import java.util.ArrayList;
3232
import java.util.List;
33+
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.Semaphore;
35+
import java.util.concurrent.ThreadPoolExecutor;
3436
import java.util.concurrent.atomic.AtomicBoolean;
3537
import java.util.concurrent.atomic.AtomicInteger;
3638

@@ -172,6 +174,94 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception
172174
}
173175
}
174176

177+
public void testMergesRunConcurrently() throws Exception {
178+
// min 2 allowed concurrent merges, per scheduler
179+
int mergeSchedulerMaxThreadCount = randomIntBetween(2, 4);
180+
// the merge executor has at least 1 extra thread available
181+
int mergeExecutorThreadCount = mergeSchedulerMaxThreadCount + randomIntBetween(1, 3);
182+
Settings settings = Settings.builder()
183+
.put(settingsWithMergeScheduler)
184+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
185+
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount)
186+
.build();
187+
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
188+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
189+
.maybeCreateThreadPoolMergeExecutorService(testThreadPool, settings);
190+
assertNotNull(threadPoolMergeExecutorService);
191+
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
192+
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
193+
try (
194+
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
195+
new ShardId("index", "_na_", 1),
196+
IndexSettingsModule.newIndexSettings("index", settings),
197+
threadPoolMergeExecutorService
198+
)
199+
) {
200+
// at least 1 extra merge than there are concurrently allowed
201+
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 10);
202+
Semaphore runMergeSemaphore = new Semaphore(0);
203+
for (int i = 0; i < mergeCount; i++) {
204+
MergeSource mergeSource = mock(MergeSource.class);
205+
OneMerge oneMerge = mock(OneMerge.class);
206+
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
207+
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
208+
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
209+
doAnswer(invocation -> {
210+
OneMerge merge = (OneMerge) invocation.getArguments()[0];
211+
assertFalse(merge.isAborted());
212+
// wait to be signalled before completing
213+
runMergeSemaphore.acquire();
214+
return null;
215+
}).when(mergeSource).merge(any(OneMerge.class));
216+
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
217+
}
218+
for (int completedMergesCount = 0; completedMergesCount < mergeCount - mergeSchedulerMaxThreadCount; completedMergesCount++) {
219+
int finalCompletedMergesCount = completedMergesCount;
220+
assertBusy(() -> {
221+
// assert that there are merges running concurrently at the max allowed concurrency rate
222+
assertThat(threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size(), is(mergeSchedulerMaxThreadCount));
223+
// with the other merges backlogged
224+
assertThat(
225+
threadPoolMergeScheduler.getBackloggedMergeTasks().size(),
226+
is(mergeCount - mergeSchedulerMaxThreadCount - finalCompletedMergesCount)
227+
);
228+
// also check the same for the thread-pool executor
229+
assertThat(threadPoolMergeExecutorService.getCurrentlyRunningMergeTasks().size(), is(mergeSchedulerMaxThreadCount));
230+
// queued merge tasks do not include backlogged merges
231+
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
232+
// also check thread-pool stats for the same
233+
// there are active thread-pool threads waiting for the backlogged merge tasks to be re-enqueued
234+
int activeMergeThreads = Math.min(mergeCount - finalCompletedMergesCount, mergeExecutorThreadCount);
235+
assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeThreads));
236+
assertThat(threadPoolExecutor.getQueue().size(), is(mergeCount - finalCompletedMergesCount - activeMergeThreads));
237+
});
238+
// let one merge task finish running
239+
runMergeSemaphore.release();
240+
}
241+
// there are now fewer merges still running than available threads
242+
for (int remainingMergesCount = mergeSchedulerMaxThreadCount; remainingMergesCount >= 0; remainingMergesCount--) {
243+
int finalRemainingMergesCount = remainingMergesCount;
244+
assertBusy(() -> {
245+
// there are fewer available merges than available threads
246+
assertThat(threadPoolMergeScheduler.getCurrentlyRunningMergeTasks().size(), is(finalRemainingMergesCount));
247+
// no more backlogged merges
248+
assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(0));
249+
// also check thread-pool executor for the same
250+
assertThat(threadPoolMergeExecutorService.getCurrentlyRunningMergeTasks().size(), is(finalRemainingMergesCount));
251+
// no more backlogged merges
252+
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
253+
// also check thread-pool stats for the same
254+
assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergesCount));
255+
assertThat(threadPoolExecutor.getQueue().size(), is(0));
256+
});
257+
// let one merge task finish running
258+
runMergeSemaphore.release();
259+
}
260+
assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone()));
261+
}
262+
}
263+
}
264+
175265
public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exception {
176266
// merge scheduler configured with auto IO throttle disabled
177267
Settings settings = Settings.builder()

0 commit comments

Comments
 (0)