Commit b60104c

dnhatn authored and mridula-s109 committed
Fix testStopQueryLocal (elastic#131130)
By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch, there are no workers left to execute the final driver or fetch pages from remote clusters. This can prevent remote clusters from being marked as successful on the coordinator, even if they have completed.

To avoid this, we reserve at least one worker for the final driver and page fetching. A single worker is enough, as these two tasks can be paused and yielded.

Closes elastic#121672
1 parent c974b59 commit b60104c
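
The starvation described in the commit message can be reproduced outside Elasticsearch with a plain fixed-size pool. The following is a minimal sketch, not ES|QL code: the class name `WorkerReservationSketch`, the method `finalTaskRuns`, and the 500 ms timeout are all hypothetical, and a `CountDownLatch` stands in for the allowEmitting latch. It checks whether a "final" task can run while every other submitted task is blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; none of these names exist in Elasticsearch.
class WorkerReservationSketch {

    /**
     * Submits `blockedTasks` tasks that wait on a latch (standing in for drivers
     * blocked by allowEmitting), then one "final" task, to a pool of `poolSize`
     * threads. Returns true if the final task completes while the latch is closed.
     */
    static boolean finalTaskRuns(int poolSize, int blockedTasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch allowEmitting = new CountDownLatch(1);
        try {
            for (int i = 0; i < blockedTasks; i++) {
                pool.submit(() -> {
                    allowEmitting.await(); // occupies a worker until released
                    return null;
                });
            }
            Future<String> finalTask = pool.submit(() -> "done");
            try {
                finalTask.get(500, TimeUnit.MILLISECONDS);
                return true;  // a free worker picked up the final task
            } catch (TimeoutException e) {
                return false; // every worker is blocked, so the final task is stuck in the queue
            }
        } finally {
            allowEmitting.countDown(); // release the blocked tasks so the pool can exit
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int poolSize = 4;
        System.out.println("all workers blocked: " + finalTaskRuns(poolSize, poolSize));
        System.out.println("one worker reserved: " + finalTaskRuns(poolSize, poolSize - 1));
    }
}
```

With a pool of 4, blocking 4 tasks leaves the final task queued (the first call reports false), while blocking only 3 leaves one worker free (the second call reports true). Capping the number of blocked tasks below the pool size is the same idea the test applies by limiting task concurrency to at most `maxEsqlWorkers - 1`.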

2 files changed, +14 -7 lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
@@ -528,9 +528,6 @@ tests:
   - class: org.elasticsearch.indices.stats.IndexStatsIT
     method: testFilterCacheStats
     issue: https://github.com/elastic/elasticsearch/issues/124447
-  - class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
-    method: testStopQueryLocal
-    issue: https://github.com/elastic/elasticsearch/issues/121672
   - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
     method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item}
     issue: https://github.com/elastic/elasticsearch/issues/122414

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java

Lines changed: 14 additions & 4 deletions
@@ -13,7 +13,10 @@
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.async.AsyncStopRequest;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
 import java.util.Iterator;
 import java.util.List;
@@ -132,14 +135,21 @@ public void testStopQueryLocal() throws Exception {
 
         Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
         boolean responseExpectMeta = includeCCSMetadata.v2();
-
+        // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes.
+        // If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch,
+        // there are no workers left to execute the final driver or fetch pages from remote clusters.
+        // This can prevent remote clusters from being marked as successful on the coordinator, even if they
+        // have completed. To avoid this, we reserve at least one worker for the final driver and page fetching.
+        // A single worker is enough, as these two tasks can be paused and yielded.
+        var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool();
+        int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax();
         LOGGER.info("--> Launching async query");
-        final String asyncExecutionId = startAsyncQuery(
+        final String asyncExecutionId = startAsyncQueryWithPragmas(
             client(),
             "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1",
-            includeCCSMetadata.v1()
+            includeCCSMetadata.v1(),
+            Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1))
         );
-
         try {
             // wait until we know that the local query against 'blocking' has started
             LOGGER.info("--> Waiting for {} to start", asyncExecutionId);
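
The code comment above states that a single reserved worker is enough because the final driver and page fetching can be paused and yielded. The following is a rough sketch of that general idea, cooperative time-slicing on one thread. It is not taken from the ES|QL implementation: the class `YieldingTasksSketch`, the method names, and the task labels are hypothetical. Each task does one slice of work and then re-enqueues itself, so two tasks make progress on a single worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not how ES|QL drivers are implemented.
class YieldingTasksSketch {

    /** Runs two cooperative tasks on ONE worker thread and returns the order in which slices ran. */
    static List<String> runInterleaved(int slicesPerTask) throws InterruptedException {
        ExecutorService singleWorker = Executors.newSingleThreadExecutor();
        List<String> trace = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try {
            // Enqueue both first slices from inside the worker so the start order is fixed.
            singleWorker.execute(() -> {
                submitSlice(singleWorker, "final-driver", 0, slicesPerTask, trace, done);
                submitSlice(singleWorker, "page-fetch", 0, slicesPerTask, trace, done);
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish");
            }
            return new ArrayList<>(trace);
        } finally {
            singleWorker.shutdown();
        }
    }

    /** Runs one slice, then yields the worker by re-enqueueing the next slice behind other tasks. */
    private static void submitSlice(ExecutorService worker, String name, int slice, int total,
                                    List<String> trace, CountDownLatch done) {
        worker.execute(() -> {
            trace.add(name + "#" + slice);
            if (slice + 1 < total) {
                submitSlice(worker, name, slice + 1, total, trace, done);
            } else {
                done.countDown();
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInterleaved(3));
    }
}
```

Because each slice returns the thread to the queue instead of holding it, neither task can starve the other, which is why one spare worker suffices in this model.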
