Skip to content

Commit 519c48e

Browse files
Simple tests
1 parent dcd2af1 commit 519c48e

File tree

3 files changed

+190
-1
lines changed

3 files changed

+190
-1
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
287287
};
288288
}
289289

290-
final class MergeTask implements Runnable, Comparable<MergeTask> {
290+
class MergeTask implements Runnable, Comparable<MergeTask> {
291291
private final String name;
292292
private final AtomicLong mergeStartTimeNS;
293293
private final MergeSource mergeSource;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
14+
import org.elasticsearch.index.IndexSettings;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.elasticsearch.test.IndexSettingsModule;
17+
import org.elasticsearch.threadpool.TestThreadPool;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
import org.junit.Before;
20+
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
25+
26+
public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {
27+
28+
DeterministicTaskQueue deterministicTaskQueue;
29+
ThreadPool testThreadPool;
30+
IndexSettings indexSettings;
31+
32+
@Before
33+
public void setUpThreadPool() {
34+
deterministicTaskQueue = new DeterministicTaskQueue();
35+
testThreadPool = deterministicTaskQueue.getThreadPool();
36+
indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
37+
}
38+
39+
public void testMergeTasksAreAbortedWhenThreadPoolIsShutdown() {
40+
final TestThreadPool testThreadPool = new TestThreadPool("test");
41+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
42+
.maybeCreateThreadPoolMergeExecutorService(
43+
testThreadPool,
44+
Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build()
45+
);
46+
assertNotNull(threadPoolMergeExecutorService);
47+
assertTrue(threadPoolMergeExecutorService.allDone());
48+
// shutdown the thread pool
49+
testThreadPool.shutdown();
50+
ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
51+
when(mergeTask.isRunning()).thenReturn(false);
52+
boolean mergeTaskSupportsIOThrottling = randomBoolean();
53+
when(mergeTask.supportsIOThrottling()).thenReturn(mergeTaskSupportsIOThrottling);
54+
assertFalse(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
55+
verify(mergeTask).abortOnGoingMerge();
56+
verify(mergeTask, times(0)).runNowOrBacklog();
57+
verify(mergeTask, times(0)).run();
58+
assertTrue(threadPoolMergeExecutorService.allDone());
59+
}
60+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.MergePolicy;
13+
import org.apache.lucene.index.MergeScheduler;
14+
import org.apache.lucene.index.MergeTrigger;
15+
import org.apache.lucene.store.MergeInfo;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
18+
import org.elasticsearch.index.IndexSettings;
19+
import org.elasticsearch.index.MergeSchedulerConfig;
20+
import org.elasticsearch.index.shard.ShardId;
21+
import org.elasticsearch.test.ESTestCase;
22+
import org.elasticsearch.test.IndexSettingsModule;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.junit.Before;
25+
import org.mockito.ArgumentCaptor;
26+
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.when;
30+
31+
public class ThreadPoolMergeSchedulerTests extends ESTestCase {
32+
33+
DeterministicTaskQueue deterministicTaskQueue;
34+
ThreadPool testThreadPool;
35+
Settings settingsWithMergeScheduler;
36+
IndexSettings indexSettings;
37+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
38+
39+
@Before
40+
public void setUpThreadPool() {
41+
deterministicTaskQueue = new DeterministicTaskQueue();
42+
testThreadPool = deterministicTaskQueue.getThreadPool();
43+
settingsWithMergeScheduler = Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build();
44+
indexSettings = IndexSettingsModule.newIndexSettings("index", settingsWithMergeScheduler);
45+
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(testThreadPool, settingsWithMergeScheduler);
46+
}
47+
48+
public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exception {
49+
// merge scheduler configured with auto IO throttle disabled
50+
Settings settings = Settings.builder()
51+
.put(settingsWithMergeScheduler)
52+
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), false)
53+
.build();
54+
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", settings);
55+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
56+
MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress();
57+
MergePolicy.OneMerge oneMerge = mock(MergePolicy.OneMerge.class);
58+
when(oneMerge.getStoreMergeInfo()).thenReturn(new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(),
59+
randomFrom(-1, randomNonNegativeInt())));
60+
when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress);
61+
MergeScheduler.MergeSource mergeSource = mock(MergeScheduler.MergeSource.class);
62+
when(mergeSource.getNextMerge()).thenReturn(oneMerge);
63+
try (ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(new ShardId("index", "_na_", 1),
64+
indexSettings, threadPoolMergeExecutorService)) {
65+
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
66+
var submittedMergeTaskCaptor = ArgumentCaptor.forClass(ThreadPoolMergeScheduler.MergeTask.class);
67+
verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture());
68+
assertFalse(submittedMergeTaskCaptor.getValue().supportsIOThrottling());
69+
}
70+
}
71+
72+
public void testAutoIOThrottleForMergeTasks() throws Exception {
73+
// merge scheduler configured with auto IO throttle disabled
74+
final Settings settings;
75+
if (randomBoolean()) {
76+
settings = Settings.builder()
77+
.put(settingsWithMergeScheduler)
78+
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), true)
79+
.build();
80+
} else {
81+
settings = Settings.builder().put(settingsWithMergeScheduler).build();
82+
}
83+
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", settings);
84+
MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress();
85+
MergePolicy.OneMerge oneMerge = mock(MergePolicy.OneMerge.class);
86+
// forced merge with a set number of segments
87+
when(oneMerge.getStoreMergeInfo()).thenReturn(
88+
new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(), randomNonNegativeInt())
89+
);
90+
when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress);
91+
MergeScheduler.MergeSource mergeSource = mock(MergeScheduler.MergeSource.class);
92+
when(mergeSource.getNextMerge()).thenReturn(oneMerge);
93+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
94+
try (ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(new ShardId("index", "_na_", 1),
95+
indexSettings, threadPoolMergeExecutorService)) {
96+
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
97+
var submittedMergeTaskCaptor = ArgumentCaptor.forClass(ThreadPoolMergeScheduler.MergeTask.class);
98+
verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture());
99+
// forced merge tasks should not be IO throttled
100+
assertFalse(submittedMergeTaskCaptor.getValue().supportsIOThrottling());
101+
}
102+
// NOT a forced merge
103+
when(oneMerge.getStoreMergeInfo()).thenReturn(new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(), -1));
104+
threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
105+
try (ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(new ShardId("index", "_na_", 1),
106+
indexSettings, threadPoolMergeExecutorService)) {
107+
// merge submitted upon closing
108+
threadPoolMergeScheduler.merge(mergeSource, MergeTrigger.CLOSING);
109+
var submittedMergeTaskCaptor = ArgumentCaptor.forClass(ThreadPoolMergeScheduler.MergeTask.class);
110+
verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture());
111+
// merge tasks submitted when closing should not be IO throttled
112+
assertFalse(submittedMergeTaskCaptor.getValue().supportsIOThrottling());
113+
}
114+
// otherwise, merge tasks should be auto IO throttled
115+
threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
116+
try (ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(new ShardId("index", "_na_", 1),
117+
indexSettings, threadPoolMergeExecutorService)) {
118+
// merge submitted upon closing
119+
threadPoolMergeScheduler.merge(
120+
mergeSource,
121+
randomValueOtherThan(MergeTrigger.CLOSING, () -> randomFrom(MergeTrigger.values()))
122+
);
123+
var submittedMergeTaskCaptor = ArgumentCaptor.forClass(ThreadPoolMergeScheduler.MergeTask.class);
124+
verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture());
125+
// merge tasks should be auto IO throttled
126+
assertTrue(submittedMergeTaskCaptor.getValue().supportsIOThrottling());
127+
}
128+
}
129+
}

0 commit comments

Comments
 (0)