diff --git a/muted-tests.yml b/muted-tests.yml index e65553b0be8d2..ac685fd1548a9 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -216,9 +216,6 @@ tests: - class: org.elasticsearch.packaging.test.BootstrapCheckTests method: test20RunWithBootstrapChecks issue: https://github.com/elastic/elasticsearch/issues/124940 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT - method: testStopQueryLocal - issue: https://github.com/elastic/elasticsearch/issues/121672 - class: org.elasticsearch.packaging.test.BootstrapCheckTests method: test10Install issue: https://github.com/elastic/elasticsearch/issues/124957 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 37e6b0bb48404..9667b559ea6b0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -13,7 +13,10 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncStopRequest; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.util.Iterator; import java.util.List; @@ -132,13 +135,21 @@ public void testStopQueryLocal() throws Exception { Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); - - final String asyncExecutionId = startAsyncQuery( + // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. 
+ // If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch, + // there are no workers left to execute the final driver or fetch pages from remote clusters. + // This can prevent remote clusters from being marked as successful on the coordinator, even if they + // have completed. To avoid this, we reserve at least one worker for the final driver and page fetching. + // A single worker is enough, as these two tasks can pause and yield. + var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool(); + int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); + LOGGER.info("--> Launching async query"); + final String asyncExecutionId = startAsyncQueryWithPragmas( client(), "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", - includeCCSMetadata.v1() + includeCCSMetadata.v1(), + Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) ); - try { // wait until we know that the local query against 'blocking' has started assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));