diff --git a/muted-tests.yml b/muted-tests.yml index 446f1ff2e409f..b90245c40fe9d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -507,9 +507,6 @@ tests: - class: org.elasticsearch.xpack.ml.integration.ClassificationIT method: testWithCustomFeatureProcessors issue: https://github.com/elastic/elasticsearch/issues/134001 -- class: org.elasticsearch.cluster.ClusterInfoServiceIT - method: testMaxQueueLatenciesInClusterInfo - issue: https://github.com/elastic/elasticsearch/issues/134088 - class: org.elasticsearch.xpack.esql.ccq.MultiClusterSpecIT method: test {csv-spec:fork.ForkBeforeStats} issue: https://github.com/elastic/elasticsearch/issues/134100 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index c3d37d3036448..d71f7d7f43f76 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -27,11 +27,10 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.shard.GlobalCheckpointListeners; +import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -45,6 +44,7 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; @@ -57,7 +57,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptySet; @@ -431,7 +430,27 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { final int numShards = randomIntBetween(1, 5); createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); ensureGreen(indexName); - final var indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next(); + + // Global checkpoint sync actions are asynchronous. We cannot really tell exactly when they are completely off the + // thread pool. To avoid busy waiting, we redirect them to the generic thread pool so that we have precise control + // over the write thread pool for assertions. + final MockTransportService mockTransportService = MockTransportService.getInstance(dataNodeName); + final var originalRegistry = mockTransportService.transport() + .getRequestHandlers() + .getHandler(GlobalCheckpointSyncAction.ACTION_NAME + "[p]"); + mockTransportService.transport() + .getRequestHandlers() + .forceRegister( + new RequestHandlerRegistry<>( + GlobalCheckpointSyncAction.ACTION_NAME + "[p]", + in -> null, // no need to deserialize the request since it's local + mockTransportService.getTaskManager(), + originalRegistry.getHandler(), + mockTransportService.getThreadPool().executor(ThreadPool.Names.GENERIC), + true, + true + ) + ); // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads. var barrier = blockDataNodeIndexing(dataNodeName); @@ -496,28 +515,6 @@ public void testMaxQueueLatenciesInClusterInfo() throws Exception { } Arrays.stream(threadsToJoin).forEach(thread -> assertFalse(thread.isAlive())); - // Wait for async post replication actions to complete - final var checkpointsSyncLatch = new CountDownLatch(numShards); - for (int i = 0; i < numShards; ++i) { - final var indexShard = indexService.getShard(i); - final long expectedGlobalCheckpoint = indexShard.seqNoStats().getGlobalCheckpoint(); - logger.info("--> shard [{}] waiting for global checkpoint {}", i, expectedGlobalCheckpoint); - indexShard.addGlobalCheckpointListener(expectedGlobalCheckpoint, new GlobalCheckpointListeners.GlobalCheckpointListener() { - @Override - public Executor executor() { - return EsExecutors.DIRECT_EXECUTOR_SERVICE; - } - - @Override - public void accept(long globalCheckpoint, Exception e) { - assertNull(e); // should have no error - logger.info("--> shard [{}] global checkpoint updated to {}", indexShard.shardId().id(), globalCheckpoint); - checkpointsSyncLatch.countDown(); - } - }, TimeValue.THIRTY_SECONDS); - } - safeAwait(checkpointsSyncLatch); - assertThat( "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor, trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),