Skip to content

Commit abe7880

Browse files
authored
Fix async stop sometimes not properly collecting result (#121843) (#122115)
* Fix async stop sometimes not properly collecting result (cherry picked from commit d11dad4)
1 parent 7869fbe commit abe7880

File tree

4 files changed

+48
-61
lines changed

4 files changed

+48
-61
lines changed

docs/changelog/121843.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 121843
2+
summary: Fix async stop sometimes not properly collecting result
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 121249

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,22 @@ public <T extends AsyncTask> T getTaskAndCheckAuthentication(
392392
TaskManager taskManager,
393393
AsyncExecutionId asyncExecutionId,
394394
Class<T> tClass
395+
) throws IOException {
396+
return getTaskAndCheckAuthentication(taskManager, security, asyncExecutionId, tClass);
397+
}
398+
399+
/**
400+
* Returns the {@link AsyncTask} if the provided <code>asyncTaskId</code>
401+
* is registered in the task manager, <code>null</code> otherwise.
402+
*
403+
* This method throws a {@link ResourceNotFoundException} if the authenticated user
404+
* is not the creator of the original task.
405+
*/
406+
public static <T extends AsyncTask> T getTaskAndCheckAuthentication(
407+
TaskManager taskManager,
408+
AsyncSearchSecurity security,
409+
AsyncExecutionId asyncExecutionId,
410+
Class<T> tClass
395411
) throws IOException {
396412
T asyncTask = getTask(taskManager, asyncExecutionId, tClass);
397413
if (asyncTask == null) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10-
import org.elasticsearch.ResourceNotFoundException;
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.ActionListenerResponseHandler;
1314
import org.elasticsearch.action.support.ActionFilters;
@@ -16,10 +17,8 @@
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.service.ClusterService;
1819
import org.elasticsearch.common.util.concurrent.EsExecutors;
19-
import org.elasticsearch.compute.EsqlRefCountingListener;
2020
import org.elasticsearch.compute.data.BlockFactory;
2121
import org.elasticsearch.compute.operator.exchange.ExchangeService;
22-
import org.elasticsearch.core.TimeValue;
2322
import org.elasticsearch.injection.guice.Inject;
2423
import org.elasticsearch.tasks.Task;
2524
import org.elasticsearch.tasks.TaskId;
@@ -32,12 +31,11 @@
3231
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3332
import org.elasticsearch.xpack.core.security.SecurityContext;
3433
import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction;
34+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
3535
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
3636
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
3737

3838
import java.io.IOException;
39-
import java.util.concurrent.TimeUnit;
40-
import java.util.concurrent.atomic.AtomicReference;
4139

4240
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
4341

@@ -55,6 +53,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
5553
private final TransportService transportService;
5654
private final AsyncSearchSecurity security;
5755

56+
private static final Logger logger = LogManager.getLogger(TransportEsqlAsyncStopAction.class);
57+
5858
@Inject
5959
public TransportEsqlAsyncStopAction(
6060
TransportService transportService,
@@ -106,34 +106,33 @@ private String sessionID(AsyncExecutionId asyncId) {
106106

107107
private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, ActionListener<EsqlQueryResponse> listener) {
108108
String asyncIdStr = asyncId.getEncoded();
109-
TransportEsqlQueryAction.EsqlQueryListener asyncListener = queryAction.getAsyncListener(asyncIdStr);
110-
if (asyncListener == null) {
109+
EsqlQueryTask asyncTask = getEsqlQueryTask(asyncId);
110+
GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
111+
if (asyncTask == null) {
111112
// This should mean one of the two things: either bad request ID, or the query has already finished
112113
// In both cases, let regular async get deal with it.
113-
var getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
114-
// TODO: this should not be happening, but if the listener is not registered and the query is not finished,
115-
// we give it some time to finish
116-
getAsyncResultRequest.setWaitForCompletionTimeout(new TimeValue(1, TimeUnit.SECONDS));
114+
logger.debug("Async stop for task {}, no task present - passing to GetAsyncResultRequest", asyncIdStr);
117115
getResultsAction.execute(task, getAsyncResultRequest, listener);
118116
return;
119117
}
120-
try {
121-
EsqlQueryTask asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncId, EsqlQueryTask.class);
122-
if (false == security.currentUserHasAccessToTask(asyncTask)) {
123-
throw new ResourceNotFoundException(asyncId + " not found");
118+
logger.debug("Async stop for task {} - stopping", asyncIdStr);
119+
final EsqlExecutionInfo esqlExecutionInfo = asyncTask.executionInfo();
120+
if (esqlExecutionInfo != null) {
121+
esqlExecutionInfo.markAsPartial();
122+
}
123+
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
124+
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.running(() -> {
125+
if (asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) {
126+
getResults.run();
124127
}
128+
}));
129+
}
130+
131+
private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncId) {
132+
try {
133+
return AsyncTaskIndexService.getTaskAndCheckAuthentication(taskManager, security, asyncId, EsqlQueryTask.class);
125134
} catch (IOException e) {
126-
throw new ResourceNotFoundException(asyncId + " not found", e);
127-
}
128-
// Here we will wait for both the response to become available and for the finish operation to complete
129-
var responseHolder = new AtomicReference<EsqlQueryResponse>();
130-
try (var refs = new EsqlRefCountingListener(listener.map(unused -> responseHolder.get()))) {
131-
asyncListener.addListener(refs.acquire().map(r -> {
132-
responseHolder.set(r);
133-
return null;
134-
}));
135-
asyncListener.markAsPartial();
136-
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
135+
return null;
137136
}
138137
}
139138
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@
1313
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
1414
import org.elasticsearch.action.support.ActionFilters;
1515
import org.elasticsearch.action.support.HandledTransportAction;
16-
import org.elasticsearch.action.support.SubscribableListener;
1716
import org.elasticsearch.client.internal.Client;
1817
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1918
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2120
import org.elasticsearch.common.io.stream.StreamInput;
2221
import org.elasticsearch.common.util.BigArrays;
23-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2422
import org.elasticsearch.common.util.concurrent.EsExecutors;
2523
import org.elasticsearch.compute.data.BlockFactory;
2624
import org.elasticsearch.compute.operator.exchange.ExchangeService;
@@ -83,8 +81,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
8381
private final RemoteClusterService remoteClusterService;
8482
private final QueryBuilderResolver queryBuilderResolver;
8583
private final UsageService usageService;
86-
// Listeners for active async queries, key being the async task execution ID
87-
private final Map<String, EsqlQueryListener> asyncListeners = ConcurrentCollections.newConcurrentMap();
8884

8985
@Inject
9086
@SuppressWarnings("this-escape")
@@ -183,41 +179,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
183179
}
184180
}
185181

186-
// Subscribable listener that can keep track of the EsqlExecutionInfo
187-
// Used to mark an async query as partial if it is stopped
188-
public static class EsqlQueryListener extends SubscribableListener<EsqlQueryResponse> {
189-
private EsqlExecutionInfo executionInfo;
190-
191-
public EsqlQueryListener(EsqlExecutionInfo executionInfo) {
192-
this.executionInfo = executionInfo;
193-
}
194-
195-
public EsqlExecutionInfo getExecutionInfo() {
196-
return executionInfo;
197-
}
198-
199-
public void markAsPartial() {
200-
if (executionInfo != null) {
201-
executionInfo.markAsPartial();
202-
}
203-
}
204-
}
205-
206182
@Override
207183
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
208184
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
209185
task.setExecutionInfo(createEsqlExecutionInfo(request));
210-
// Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from
211-
// other endpoints, such as _query/async/stop
212-
EsqlQueryListener subListener = new EsqlQueryListener(task.executionInfo());
213-
String asyncExecutionId = task.getExecutionId().getEncoded();
214-
subListener.addListener(ActionListener.runAfter(listener, () -> asyncListeners.remove(asyncExecutionId)));
215-
asyncListeners.put(asyncExecutionId, subListener);
216-
ActionListener.run(subListener, l -> innerExecute(task, request, l));
217-
}
218-
219-
public EsqlQueryListener getAsyncListener(String executionId) {
220-
return asyncListeners.get(executionId);
186+
ActionListener.run(listener, l -> innerExecute(task, request, l));
221187
}
222188

223189
private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {

0 commit comments

Comments
 (0)