Skip to content

Commit 34ca0fa

Browse files
testMergeSourceWithFollowUpMergesRunSequentially
1 parent dc80bef commit 34ca0fa

File tree

1 file changed

+90
-15
lines changed

1 file changed

+90
-15
lines changed

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 90 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,25 @@
1616
import org.apache.lucene.store.MergeInfo;
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
19+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1920
import org.elasticsearch.index.IndexSettings;
2021
import org.elasticsearch.index.MergeSchedulerConfig;
2122
import org.elasticsearch.index.shard.ShardId;
2223
import org.elasticsearch.test.ESTestCase;
2324
import org.elasticsearch.test.IndexSettingsModule;
25+
import org.elasticsearch.threadpool.TestThreadPool;
2426
import org.elasticsearch.threadpool.ThreadPool;
2527
import org.junit.Before;
2628
import org.mockito.ArgumentCaptor;
2729

2830
import java.io.IOException;
2931
import java.util.ArrayList;
3032
import java.util.List;
33+
import java.util.concurrent.Semaphore;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.concurrent.atomic.AtomicInteger;
3136

37+
import static org.hamcrest.Matchers.equalTo;
3238
import static org.hamcrest.Matchers.is;
3339
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3440
import static org.mockito.ArgumentMatchers.any;
@@ -72,14 +78,7 @@ public void testMergesExecuteInSizeOrder() throws IOException {
7278
for (int i = 0; i < mergeCount; i++) {
7379
MergeSource mergeSource = mock(MergeSource.class);
7480
OneMerge oneMerge = mock(OneMerge.class);
75-
when(oneMerge.getStoreMergeInfo()).thenReturn(
76-
new MergeInfo(
77-
randomNonNegativeInt(),
78-
randomLongBetween(1L, 10L),
79-
randomBoolean(),
80-
randomFrom(-1, randomNonNegativeInt())
81-
)
82-
);
81+
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
8382
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
8483
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
8584
doAnswer(invocation -> {
@@ -103,6 +102,73 @@ public void testMergesExecuteInSizeOrder() throws IOException {
103102
assertTrue(threadPoolMergeExecutorService.allDone());
104103
}
105104

105+
public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception {
106+
// test with min 2 allowed concurrent merges
107+
int mergeExecutorThreadCount = randomIntBetween(2, 5);
108+
Settings settings = Settings.builder()
109+
.put(settingsWithMergeScheduler)
110+
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
111+
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount)
112+
.build();
113+
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
114+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
115+
.maybeCreateThreadPoolMergeExecutorService(testThreadPool, settings);
116+
assertNotNull(threadPoolMergeExecutorService);
117+
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
118+
try (
119+
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
120+
new ShardId("index", "_na_", 1),
121+
IndexSettingsModule.newIndexSettings("index", settings),
122+
threadPoolMergeExecutorService
123+
)
124+
) {
125+
MergeSource mergeSource = mock(MergeSource.class);
126+
OneMerge firstMerge = mock(OneMerge.class);
127+
when(firstMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
128+
when(firstMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
129+
// at least one followup merge + null (i.e. no more followups)
130+
int followUpMergeCount = randomIntBetween(2, 10);
131+
OneMerge[] followUpMerges = new OneMerge[followUpMergeCount];
132+
followUpMerges[followUpMergeCount - 1] = null;
133+
for (int i = 0; i < followUpMergeCount - 1; i++) {
134+
OneMerge oneMerge = mock(OneMerge.class);
135+
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
136+
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
137+
followUpMerges[i] = oneMerge;
138+
}
139+
when(mergeSource.getNextMerge()).thenReturn(firstMerge, followUpMerges);
140+
AtomicBoolean isMergeInProgress = new AtomicBoolean();
141+
AtomicInteger runMergeIdx = new AtomicInteger();
142+
Semaphore runMergeSemaphore = new Semaphore(0);
143+
Semaphore nextMergeSemaphore = new Semaphore(0);
144+
doAnswer(invocation -> {
145+
// assert only one merge can be in-progress at any point-in-time
146+
assertTrue(isMergeInProgress.compareAndSet(false, true));
147+
OneMerge mergeInvocation = (OneMerge) invocation.getArguments()[0];
148+
assertFalse(mergeInvocation.isAborted());
149+
// assert merges run in the order they are submitted
150+
if (runMergeIdx.get() == 0) {
151+
assertThat(mergeInvocation, is(firstMerge));
152+
} else {
153+
assertThat(mergeInvocation, is(followUpMerges[runMergeIdx.get() - 1]));
154+
}
155+
runMergeIdx.incrementAndGet();
156+
// await before returning from the merge in order to really ensure that follow-up merges don't run concurrently
157+
nextMergeSemaphore.release();
158+
runMergeSemaphore.acquire();
159+
assertTrue(isMergeInProgress.compareAndSet(true, false));
160+
return null;
161+
}).when(mergeSource).merge(any(OneMerge.class));
162+
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
163+
do {
164+
nextMergeSemaphore.acquire();
165+
runMergeSemaphore.release();
166+
} while (runMergeIdx.get() < followUpMergeCount);
167+
assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone()));
168+
}
169+
}
170+
}
171+
106172
public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exception {
107173
// merge scheduler configured with auto IO throttle disabled
108174
Settings settings = Settings.builder()
@@ -113,9 +179,7 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce
113179
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
114180
MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress();
115181
OneMerge oneMerge = mock(OneMerge.class);
116-
when(oneMerge.getStoreMergeInfo()).thenReturn(
117-
new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(), randomFrom(-1, randomNonNegativeInt()))
118-
);
182+
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong()));
119183
when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress);
120184
MergeSource mergeSource = mock(MergeSource.class);
121185
when(mergeSource.getNextMerge()).thenReturn(oneMerge);
@@ -148,9 +212,7 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
148212
MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress();
149213
OneMerge oneMerge = mock(OneMerge.class);
150214
// forced merge with a set number of segments
151-
when(oneMerge.getStoreMergeInfo()).thenReturn(
152-
new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(), randomNonNegativeInt())
153-
);
215+
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong(), randomNonNegativeInt()));
154216
when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress);
155217
MergeSource mergeSource = mock(MergeSource.class);
156218
when(mergeSource.getNextMerge()).thenReturn(oneMerge);
@@ -169,7 +231,7 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
169231
assertFalse(submittedMergeTaskCaptor.getValue().supportsIOThrottling());
170232
}
171233
// NOT a forced merge
172-
when(oneMerge.getStoreMergeInfo()).thenReturn(new MergeInfo(randomNonNegativeInt(), randomNonNegativeLong(), randomBoolean(), -1));
234+
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong(), -1));
173235
threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
174236
try (
175237
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
@@ -205,4 +267,17 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
205267
assertTrue(submittedMergeTaskCaptor.getValue().supportsIOThrottling());
206268
}
207269
}
270+
271+
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
272+
return getNewMergeInfo(estimatedMergeBytes, randomFrom(-1, randomNonNegativeInt()));
273+
}
274+
275+
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSegments) {
276+
return new MergeInfo(
277+
randomNonNegativeInt(),
278+
estimatedMergeBytes,
279+
randomBoolean(),
280+
maxNumSegments
281+
);
282+
}
208283
}

0 commit comments

Comments
 (0)