Skip to content

Commit 0003f10

Browse files
testMergesExecuteInSizeOrder
1 parent 31b0867 commit 0003f10

File tree

3 files changed

+67
-8
lines changed

3 files changed

+67
-8
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,21 @@ public ThreadPoolMergeScheduler(
8484
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
8585
}
8686

87+
// used for tests
88+
ThreadPoolMergeScheduler(
89+
ShardId shardId,
90+
MergeSchedulerConfig mergeSchedulerConfig,
91+
Logger logger,
92+
MergeTracking mergeTracking,
93+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
94+
) {
95+
this.shardId = shardId;
96+
this.config = mergeSchedulerConfig;
97+
this.logger = logger;
98+
this.mergeTracking = mergeTracking;
99+
this.threadPoolMergeExecutorService = threadPoolMergeExecutorService;
100+
}
101+
87102
@Override
88103
public Set<OnGoingMerge> onGoingMerges() {
89104
return mergeTracking.onGoingMerges();
@@ -283,6 +298,7 @@ public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) {
283298
// the combined IO rate per node is, roughly, 'thread_pool_size * merge_queue#targetMBPerSec', as
284299
// the per-thread IO rate is updated, best effort, for all running merge threads concomitantly.
285300
if (merge.isAborted()) {
301+
// merges can theoretically be aborted at any moment
286302
return in;
287303
}
288304
MergeTask mergeTask = currentlyRunningMergeTasks.get(merge);
@@ -408,7 +424,7 @@ void abortOnGoingMerge() {
408424
}
409425

410426
long estimatedMergeSize() {
411-
return onGoingMerge.getMerge().estimatedMergeBytes;
427+
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
412428
}
413429

414430
@Override

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ public void testMergeTasksExecuteInSizeOrder() {
570570
}
571571
}
572572

573-
private boolean runOneTask(DeterministicTaskQueue deterministicTaskQueue) {
573+
private static boolean runOneTask(DeterministicTaskQueue deterministicTaskQueue) {
574574
while (deterministicTaskQueue.hasAnyTasks()) {
575575
if (deterministicTaskQueue.hasRunnableTasks()) {
576576
deterministicTaskQueue.runRandomTask();

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
package org.elasticsearch.index.engine;
1111

1212
import org.apache.lucene.index.MergePolicy;
13-
import org.apache.lucene.index.MergeScheduler;
13+
import org.apache.lucene.index.MergeScheduler.MergeSource;
1414
import org.apache.lucene.index.MergeTrigger;
1515
import org.apache.lucene.store.MergeInfo;
1616
import org.elasticsearch.common.settings.Settings;
@@ -24,32 +24,75 @@
2424
import org.junit.Before;
2525
import org.mockito.ArgumentCaptor;
2626

27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
import static org.hamcrest.Matchers.is;
32+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
33+
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.Mockito.doAnswer;
2735
import static org.mockito.Mockito.mock;
2836
import static org.mockito.Mockito.verify;
2937
import static org.mockito.Mockito.when;
3038

3139
public class ThreadPoolMergeSchedulerTests extends ESTestCase {
3240

3341
DeterministicTaskQueue deterministicTaskQueue;
34-
ThreadPool testThreadPool;
42+
ThreadPool mergesExecutorThreadPool;
3543
Settings settingsWithMergeScheduler;
3644
IndexSettings indexSettings;
3745
ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
3846

3947
@Before
4048
public void setUpThreadPool() {
4149
deterministicTaskQueue = new DeterministicTaskQueue();
42-
testThreadPool = deterministicTaskQueue.getThreadPool();
50+
mergesExecutorThreadPool = deterministicTaskQueue.getThreadPool();
4351
settingsWithMergeScheduler = Settings.builder()
4452
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
4553
.build();
4654
indexSettings = IndexSettingsModule.newIndexSettings("index", settingsWithMergeScheduler);
4755
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
48-
testThreadPool,
56+
mergesExecutorThreadPool,
4957
settingsWithMergeScheduler
5058
);
5159
}
5260

61+
public void testMergesExecuteInSizeOrder() throws IOException {
62+
try (
63+
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
64+
new ShardId("index", "_na_", 1),
65+
indexSettings,
66+
threadPoolMergeExecutorService
67+
)
68+
) {
69+
List<MergePolicy.OneMerge> executedMergesList = new ArrayList<>();
70+
int mergeCount = randomIntBetween(2, 10);
71+
for (int i = 0; i < mergeCount; i++) {
72+
MergeSource mergeSource = mock(MergeSource.class);
73+
MergePolicy.OneMerge oneMerge = mock(MergePolicy.OneMerge.class);
74+
when(oneMerge.getStoreMergeInfo()).thenReturn(
75+
new MergeInfo(randomNonNegativeInt(), randomLongBetween(1L, 10L), randomBoolean(), randomFrom(-1, randomNonNegativeInt()))
76+
);
77+
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
78+
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (MergePolicy.OneMerge) null);
79+
doAnswer(invocation -> {
80+
executedMergesList.add((MergePolicy.OneMerge) invocation.getArguments()[0]);
81+
return null;
82+
}).when(mergeSource).merge(any(MergePolicy.OneMerge.class));
83+
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
84+
}
85+
deterministicTaskQueue.runAllTasks();
86+
assertThat(executedMergesList.size(), is(mergeCount));
87+
// assert merges are executed in ascending size order
88+
for (int i = 1; i < mergeCount; i++) {
89+
assertThat(executedMergesList.get(i - 1).getStoreMergeInfo().estimatedMergeBytes(),
90+
lessThanOrEqualTo(executedMergesList.get(i).getStoreMergeInfo().estimatedMergeBytes()));
91+
}
92+
}
93+
assertTrue(threadPoolMergeExecutorService.allDone());
94+
}
95+
5396
public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exception {
5497
// merge scheduler configured with auto IO throttle disabled
5598
Settings settings = Settings.builder()
@@ -64,7 +107,7 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce
64107
new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(), randomFrom(-1, randomNonNegativeInt()))
65108
);
66109
when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress);
67-
MergeScheduler.MergeSource mergeSource = mock(MergeScheduler.MergeSource.class);
110+
MergeSource mergeSource = mock(MergeSource.class);
68111
when(mergeSource.getNextMerge()).thenReturn(oneMerge);
69112
try (
70113
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
@@ -99,7 +142,7 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
99142
new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(), randomNonNegativeInt())
100143
);
101144
when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress);
102-
MergeScheduler.MergeSource mergeSource = mock(MergeScheduler.MergeSource.class);
145+
MergeSource mergeSource = mock(MergeSource.class);
103146
when(mergeSource.getNextMerge()).thenReturn(oneMerge);
104147
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
105148
try (

0 commit comments

Comments
 (0)