From 85cea228f7c8a895fbbb4eb61a9a56817eb3e7db Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 2 Sep 2025 11:48:19 +1000 Subject: [PATCH 1/2] [Test] Fix ClusterInfoServiceIT testMaxQueueLatenciesInClusterInfo Ensure that async post-replication actions (checkpoint syncs, in this specific test) complete before the assertions run. Resolves: #132957 --- .../cluster/ClusterInfoServiceIT.java | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index e138a2d0b5f6d..c3d37d3036448 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -27,9 +27,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.GlobalCheckpointListeners; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -55,6 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptySet; @@ -423,9 +426,12 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { var masterName = internalCluster().startMasterOnlyNode(settings); var dataNodeName = internalCluster().startDataOnlyNode(settings); ensureStableCluster(2); - 
assertEquals(internalCluster().getMasterName(), masterName); - assertNotEquals(internalCluster().getMasterName(), dataNodeName); - logger.info("---> master node: " + masterName + ", data node: " + dataNodeName); + + String indexName = randomIdentifier(); + final int numShards = randomIntBetween(1, 5); + createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + ensureGreen(indexName); + final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next(); // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads. var barrier = blockDataNodeIndexing(dataNodeName); @@ -434,12 +440,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { // strictly need a single task to occupy the queue. int numberOfTasks = randomIntBetween(1, 5); Thread[] threadsToJoin = new Thread[numberOfTasks]; - String indexName = randomIdentifier(); - createIndex( - indexName, - // NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool. 
- Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)).put(SETTING_NUMBER_OF_REPLICAS, 0).build() - ); for (int i = 0; i < numberOfTasks; ++i) { threadsToJoin[i] = startParallelSingleWrite(indexName); } @@ -494,6 +494,30 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { for (int i = 0; i < numberOfTasks; ++i) { threadsToJoin[i].join(); } + Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive())); + + // Wait for async post replication actions to complete + final var checkpointsSyncLatch = new CountDownLatch(numShards); + for (int i = 0; i < numShards; ++i) { + final var indexShard = indexService.getShard(i); + final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint(); + logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint); + indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() { + @Override + public Executor executor() { + return EsExecutors.DIRECT_EXECUTOR_SERVICE; + } + + @Override + public void accept(long globalCheckpoint, Exception e) { + assertNull(e); // should have no error + logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint); + checkpointsSyncLatch.countDown(); + } + }, TimeValue.THIRTY_SECONDS); + } + safeAwait(checkpointsSyncLatch); + assertThat( "Unexpectedly found a task queued for the write thread pool. 
Write thread pool dump: " + trackingWriteExecutor, trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(), From 98eff554a89698fbd71d23830c0183bbb5a94513 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 2 Sep 2025 14:09:34 +1000 Subject: [PATCH 2/2] unmute --- muted-tests.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 1730b6f6b8967..46eed8a257ea6 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -462,9 +462,6 @@ tests: - class: org.elasticsearch.search.CCSDuelIT method: testTermsAggsWithProfile issue: https://github.com/elastic/elasticsearch/issues/132880 -- class: org.elasticsearch.cluster.ClusterInfoServiceIT - method: testMaxQueueLatenciesInClusterInfo - issue: https://github.com/elastic/elasticsearch/issues/132957 - class: org.elasticsearch.xpack.search.AsyncSearchErrorTraceIT method: testAsyncSearchFailingQueryErrorTraceDefault issue: https://github.com/elastic/elasticsearch/issues/133010