Skip to content

Commit e0ca944

Browse files
committed
Fix async stop sometimes not properly collecting result
1 parent 534e171 commit e0ca944

File tree

2 files changed

+28
-16
lines changed

2 files changed

+28
-16
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,6 @@ tests:
282282
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
283283
method: testWithDatastreams
284284
issue: https://github.com/elastic/elasticsearch/issues/121236
285-
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityEsqlIT
286-
method: testCrossClusterAsyncQueryStop
287-
issue: https://github.com/elastic/elasticsearch/issues/121249
288285
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
289286
method: test {p0=transform/*}
290287
issue: https://github.com/elastic/elasticsearch/issues/120816

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.ResourceNotFoundException;
1113
import org.elasticsearch.action.ActionListener;
1214
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -19,7 +21,6 @@
1921
import org.elasticsearch.compute.EsqlRefCountingListener;
2022
import org.elasticsearch.compute.data.BlockFactory;
2123
import org.elasticsearch.compute.operator.exchange.ExchangeService;
22-
import org.elasticsearch.core.TimeValue;
2324
import org.elasticsearch.injection.guice.Inject;
2425
import org.elasticsearch.tasks.Task;
2526
import org.elasticsearch.tasks.TaskId;
@@ -36,7 +37,6 @@
3637
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
3738

3839
import java.io.IOException;
39-
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.atomic.AtomicReference;
4141

4242
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
@@ -55,6 +55,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
5555
private final TransportService transportService;
5656
private final AsyncSearchSecurity security;
5757

58+
private static final Logger logger = LogManager.getLogger(TransportEsqlAsyncStopAction.class);
59+
5860
@Inject
5961
public TransportEsqlAsyncStopAction(
6062
TransportService transportService,
@@ -107,24 +109,25 @@ private String sessionID(AsyncExecutionId asyncId) {
107109
private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, ActionListener<EsqlQueryResponse> listener) {
108110
String asyncIdStr = asyncId.getEncoded();
109111
TransportEsqlQueryAction.EsqlQueryListener asyncListener = queryAction.getAsyncListener(asyncIdStr);
112+
EsqlQueryTask asyncTask = getEsqlQueryTask(asyncId);
110113
if (asyncListener == null) {
114+
logger.debug("Async stop for task {}, no listener - collecting the result", asyncIdStr);
111115
// This should mean one of the two things: either bad request ID, or the query has already finished
112116
// In both cases, let regular async get deal with it.
113117
var getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
114-
// TODO: this should not be happening, but if the listener is not registered and the query is not finished,
115-
// we give it some time to finish
116-
getAsyncResultRequest.setWaitForCompletionTimeout(new TimeValue(1, TimeUnit.SECONDS));
117-
getResultsAction.execute(task, getAsyncResultRequest, listener);
118+
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
119+
// If the listener is not present but the task is still alive, this may mean it's not finished writing the response
120+
// We will wait for the task to be done and then collect the results.
121+
if (asyncTask == null || asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) {
122+
getResults.run();
123+
}
118124
return;
119125
}
120-
try {
121-
EsqlQueryTask asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncId, EsqlQueryTask.class);
122-
if (false == security.currentUserHasAccessToTask(asyncTask)) {
123-
throw new ResourceNotFoundException(asyncId + " not found");
124-
}
125-
} catch (IOException e) {
126-
throw new ResourceNotFoundException(asyncId + " not found", e);
126+
127+
if (asyncTask == null) {
128+
throw new ResourceNotFoundException(asyncId + " not found");
127129
}
130+
logger.debug("Async stop for task {} - stopping", asyncIdStr);
128131
// Here we will wait for both the response to become available and for the finish operation to complete
129132
var responseHolder = new AtomicReference<EsqlQueryResponse>();
130133
try (var refs = new EsqlRefCountingListener(listener.map(unused -> responseHolder.get()))) {
@@ -136,4 +139,16 @@ private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, Actio
136139
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
137140
}
138141
}
142+
143+
private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncId) {
144+
try {
145+
EsqlQueryTask asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncId, EsqlQueryTask.class);
146+
if (asyncTask == null || false == security.currentUserHasAccessToTask(asyncTask)) {
147+
return null;
148+
}
149+
return asyncTask;
150+
} catch (IOException e) {
151+
return null;
152+
}
153+
}
139154
}

0 commit comments

Comments
 (0)