|
13 | 13 | import org.elasticsearch.logging.LogManager; |
14 | 14 | import org.elasticsearch.logging.Logger; |
15 | 15 | import org.elasticsearch.tasks.TaskInfo; |
| 16 | +import org.elasticsearch.transport.TransportService; |
16 | 17 | import org.elasticsearch.xpack.core.async.AsyncStopRequest; |
| 18 | +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; |
| 19 | +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; |
17 | 20 |
|
18 | 21 | import java.util.Iterator; |
19 | 22 | import java.util.List; |
@@ -132,13 +135,21 @@ public void testStopQueryLocal() throws Exception { |
132 | 135 |
|
133 | 136 | Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata(); |
134 | 137 | boolean responseExpectMeta = includeCCSMetadata.v2(); |
135 | | - |
136 | | - final String asyncExecutionId = startAsyncQuery( |
| 138 | + // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. |
| 139 | + // If a node is both data and coordinator, and all drivers are blocked by the allowEmitting latch, |
| 140 | + // there are no workers left to execute the final driver or fetch pages from remote clusters. |
| 141 | + // This can prevent remote clusters from being marked as successful on the coordinator, even if they |
| 142 | + // have completed. To avoid this, we reserve at least one worker for the final driver and page fetching. |
| 143 | + // A single worker is enough, as these two tasks can be paused and yielded. |
| 144 | + var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool(); |
| 145 | + int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); |
| 146 | + LOGGER.info("--> Launching async query"); |
| 147 | + final String asyncExecutionId = startAsyncQueryWithPragmas( |
137 | 148 | client(), |
138 | 149 | "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", |
139 | | - includeCCSMetadata.v1() |
| 150 | + includeCCSMetadata.v1(), |
| 151 | + Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) |
140 | 152 | ); |
141 | | - |
142 | 153 | try { |
143 | 154 | // wait until we know that the local query against 'blocking' has started |
144 | 155 | assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); |
|
0 commit comments