Skip to content

Commit 35d209e

Browse files
committed
Fix testStopQueryLocal
1 parent 79e2e04 commit 35d209e

File tree

2 files changed

+14
-7
lines changed

2 files changed

+14
-7
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -531,9 +531,6 @@ tests:
531531
- class: org.elasticsearch.indices.stats.IndexStatsIT
532532
method: testFilterCacheStats
533533
issue: https://github.com/elastic/elasticsearch/issues/124447
534-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
535-
method: testStopQueryLocal
536-
issue: https://github.com/elastic/elasticsearch/issues/121672
537534
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
538535
method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item}
539536
issue: https://github.com/elastic/elasticsearch/issues/122414

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import org.elasticsearch.logging.LogManager;
1414
import org.elasticsearch.logging.Logger;
1515
import org.elasticsearch.tasks.TaskInfo;
16+
import org.elasticsearch.transport.TransportService;
1617
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
18+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
19+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
1720

1821
import java.util.Iterator;
1922
import java.util.List;
@@ -132,14 +135,21 @@ public void testStopQueryLocal() throws Exception {
132135

133136
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
134137
boolean responseExpectMeta = includeCCSMetadata.v2();
135-
138+
// By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes.
139+
// If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch,
140+
// there are no workers left to execute the final driver or fetch pages from remote clusters.
141+
// This can prevent remote clusters from being marked as successful on the coordinator, even if they
142+
// have completed. To avoid this, we reserve at least one worker for the final driver and page fetching.
143+
// A single worker is enough, as these two tasks can be paused and yielded.
144+
var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool();
145+
int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax();
136146
LOGGER.info("--> Launching async query");
137-
final String asyncExecutionId = startAsyncQuery(
147+
final String asyncExecutionId = startAsyncQueryWithPragmas(
138148
client(),
139149
"FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1",
140-
includeCCSMetadata.v1()
150+
includeCCSMetadata.v1(),
151+
Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1))
141152
);
142-
143153
try {
144154
// wait until we know that the local query against 'blocking' has started
145155
LOGGER.info("--> Waiting for {} to start", asyncExecutionId);

0 commit comments

Comments
 (0)