diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java index 9129292d43837..41a05202b2357 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -272,12 +274,11 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception { assertThat(testEnginePlugin.enqueuedMergesSet.size(), is(0)); testEnginePlugin.mergeExecutorServiceReference.get().allDone(); }, 1, TimeUnit.MINUTES); - var segmentsCountAfterMergingCaughtUp = getSegmentsCountForAllShards("index"); - // force merge should be a noop after all available merging was done - assertAllSuccessful(indicesAdmin().prepareForceMerge("index").get()); - var segmentsCountAfterForceMerge = getSegmentsCountForAllShards("index"); - assertThat(segmentsCountAfterForceMerge, is(segmentsCountAfterMergingCaughtUp)); - // let's also run a force-merge to 1 segment + // indices stats says that no merge is currently running (meaning merging did catch up) + IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("index").setMerge(true).get(); + long currentMergeCount = indicesStatsResponse.getIndices().get("index").getPrimaries().merge.getCurrent(); + assertThat(currentMergeCount, equalTo(0L)); + // run a force-merge to 1 segment to make sure nothing is broken assertAllSuccessful(indicesAdmin().prepareForceMerge("index").setMaxNumSegments(1).get()); assertAllSuccessful(indicesAdmin().prepareRefresh("index").get()); // assert one segment per shard @@ -292,20 +293,6 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception { } } - private int getSegmentsCountForAllShards(String indexName) { - // refresh, otherwise we'd be still seeing the old merged-away segments - assertAllSuccessful(indicesAdmin().prepareRefresh(indexName).get()); - int count = 0; - IndicesSegmentResponse indicesSegmentResponse = indicesAdmin().prepareSegments(indexName).get(); - Iterator indexShardSegmentsIterator = indicesSegmentResponse.getIndices().get(indexName).iterator(); - while (indexShardSegmentsIterator.hasNext()) { - for (ShardSegments segments : indexShardSegmentsIterator.next()) { - count += segments.getSegments().size(); - } - } - return count; - } - private TestEnginePlugin getTestEnginePlugin() { return getInstanceFromNode(PluginsService.class).filterPlugins(TestEnginePlugin.class).toList().get(0); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java index 2f3494ebb065d..2417ee3abf8d2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -328,6 +328,7 @@ public void testIORateIsAdjustedForAllRunningMergeTasks() throws Exception { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); Semaphore runMergeSemaphore = new Semaphore(0); Set currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet(); + while (mergesStillToComplete > 0) { if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) { MergeTask mergeTask = mock(MergeTask.class);