Skip to content

Commit d5984b0

Browse files
committed
Fix testStopQueryLocal (#131130)
By default, ES|QL uses all workers in the esql_worker thread pool to execute drivers on data nodes. If a node is both a data node and the coordinator, and all drivers are blocked by the allowEmitting latch, no workers are left to execute the final driver or fetch pages from remote clusters. This can prevent remote clusters from being marked as successful on the coordinator, even if they have completed. To avoid this, the test caps the query's task concurrency so that at least one worker is reserved for the final driver and page fetching. A single worker is enough, because both of these tasks can pause and yield. Closes #121672 (cherry picked from commit b325f2b)
1 parent 09e03d8 commit d5984b0

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,6 @@ tests:
224224
- class: org.elasticsearch.packaging.test.BootstrapCheckTests
225225
method: test20RunWithBootstrapChecks
226226
issue: https://github.com/elastic/elasticsearch/issues/124940
227-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT
228-
method: testStopQueryLocal
229-
issue: https://github.com/elastic/elasticsearch/issues/121672
230227
- class: org.elasticsearch.packaging.test.BootstrapCheckTests
231228
method: test10Install
232229
issue: https://github.com/elastic/elasticsearch/issues/124957

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import org.elasticsearch.logging.LogManager;
1414
import org.elasticsearch.logging.Logger;
1515
import org.elasticsearch.tasks.TaskInfo;
16+
import org.elasticsearch.transport.TransportService;
1617
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
18+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
19+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
1720

1821
import java.util.Iterator;
1922
import java.util.List;
@@ -132,13 +135,21 @@ public void testStopQueryLocal() throws Exception {
132135

133136
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
134137
boolean responseExpectMeta = includeCCSMetadata.v2();
135-
136-
final String asyncExecutionId = startAsyncQuery(
138+
// By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes.
139+
// If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch,
140+
// there are no workers left to execute the final driver or fetch pages from remote clusters.
141+
// This can prevent remote clusters from being marked as successful on the coordinator, even if they
142+
// have completed. To avoid this, we reserve at least one worker for the final driver and page fetching.
143+
// A single worker is enough, as these two tasks can be paused and yielded.
144+
var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool();
145+
int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax();
146+
LOGGER.info("--> Launching async query");
147+
final String asyncExecutionId = startAsyncQueryWithPragmas(
137148
client(),
138149
"FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1",
139-
includeCCSMetadata.v1()
150+
includeCCSMetadata.v1(),
151+
Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1))
140152
);
141-
142153
try {
143154
// wait until we know that the local query against 'blocking' has started
144155
assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));

0 commit comments

Comments
 (0)