Skip to content

Commit 4b44039

Browse files
TEST Fix ThreadPoolMergeSchedulerStressTestIT testMergingFallsBehindAndThenCatchesUp (#131636) (#131675)
This addresses an unfounded test assumption that a merge following refreshes is a noop. Refreshes might trigger a merge, but segments can be part of a single merge task at a time so it's possible that after multiple independent merges finish, if the TieredMergePolicy is invoked again it might still find yet another merge to run. Fixes #131262
1 parent ce08b34 commit 4b44039

File tree

2 files changed

+8
-20
lines changed

2 files changed

+8
-20
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
1717
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
1818
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
19+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.util.CollectionUtils;
2122
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -44,6 +45,7 @@
4445
import java.util.concurrent.atomic.AtomicReference;
4546

4647
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
48+
import static org.hamcrest.Matchers.equalTo;
4749
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4850
import static org.hamcrest.Matchers.instanceOf;
4951
import static org.hamcrest.Matchers.is;
@@ -270,12 +272,11 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception {
270272
assertThat(testEnginePlugin.enqueuedMergesSet.size(), is(0));
271273
testEnginePlugin.mergeExecutorServiceReference.get().allDone();
272274
}, 1, TimeUnit.MINUTES);
273-
var segmentsCountAfterMergingCaughtUp = getSegmentsCountForAllShards("index");
274-
// force merge should be a noop after all available merging was done
275-
assertAllSuccessful(indicesAdmin().prepareForceMerge("index").get());
276-
var segmentsCountAfterForceMerge = getSegmentsCountForAllShards("index");
277-
assertThat(segmentsCountAfterForceMerge, is(segmentsCountAfterMergingCaughtUp));
278-
// let's also run a force-merge to 1 segment
275+
// indices stats says that no merge is currently running (meaning merging did catch up)
276+
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("index").setMerge(true).get();
277+
long currentMergeCount = indicesStatsResponse.getIndices().get("index").getPrimaries().merge.getCurrent();
278+
assertThat(currentMergeCount, equalTo(0L));
279+
// run a force-merge to 1 segment to make sure nothing is broken
279280
assertAllSuccessful(indicesAdmin().prepareForceMerge("index").setMaxNumSegments(1).get());
280281
assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
281282
// assert one segment per shard
@@ -290,20 +291,6 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception {
290291
}
291292
}
292293

293-
private int getSegmentsCountForAllShards(String indexName) {
294-
// refresh, otherwise we'd be still seeing the old merged-away segments
295-
assertAllSuccessful(indicesAdmin().prepareRefresh(indexName).get());
296-
int count = 0;
297-
IndicesSegmentResponse indicesSegmentResponse = indicesAdmin().prepareSegments(indexName).get();
298-
Iterator<IndexShardSegments> indexShardSegmentsIterator = indicesSegmentResponse.getIndices().get(indexName).iterator();
299-
while (indexShardSegmentsIterator.hasNext()) {
300-
for (ShardSegments segments : indexShardSegmentsIterator.next()) {
301-
count += segments.getSegments().size();
302-
}
303-
}
304-
return count;
305-
}
306-
307294
private TestEnginePlugin getTestEnginePlugin() {
308295
return getInstanceFromNode(PluginsService.class).filterPlugins(TestEnginePlugin.class).toList().get(0);
309296
}

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception {
326326
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
327327
Semaphore runMergeSemaphore = new Semaphore(0);
328328
Set<MergeTask> currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet();
329+
329330
while (mergesStillToComplete > 0) {
330331
if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) {
331332
MergeTask mergeTask = mock(MergeTask.class);

0 commit comments

Comments
 (0)