Skip to content

Commit b5a3151

Browse files
authored
Fix async stop sometimes not properly collecting result (#121843) (#122113)
* Fix async stop sometimes not properly collecting result (cherry picked from commit d11dad4) # Conflicts: # muted-tests.yml
1 parent 6541319 commit b5a3151

File tree

4 files changed

+48
-61
lines changed

4 files changed

+48
-61
lines changed

docs/changelog/121843.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 121843
2+
summary: Fix async stop sometimes not properly collecting result
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 121249

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,22 @@ public <T extends AsyncTask> T getTaskAndCheckAuthentication(
392392
TaskManager taskManager,
393393
AsyncExecutionId asyncExecutionId,
394394
Class<T> tClass
395+
) throws IOException {
396+
return getTaskAndCheckAuthentication(taskManager, security, asyncExecutionId, tClass);
397+
}
398+
399+
/**
400+
* Returns the {@link AsyncTask} if the provided <code>asyncTaskId</code>
401+
* is registered in the task manager, <code>null</code> otherwise.
402+
*
403+
* This method throws a {@link ResourceNotFoundException} if the authenticated user
404+
* is not the creator of the original task.
405+
*/
406+
public static <T extends AsyncTask> T getTaskAndCheckAuthentication(
407+
TaskManager taskManager,
408+
AsyncSearchSecurity security,
409+
AsyncExecutionId asyncExecutionId,
410+
Class<T> tClass
395411
) throws IOException {
396412
T asyncTask = getTask(taskManager, asyncExecutionId, tClass);
397413
if (asyncTask == null) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10-
import org.elasticsearch.ResourceNotFoundException;
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.ActionListenerResponseHandler;
1314
import org.elasticsearch.action.support.ActionFilters;
@@ -16,10 +17,8 @@
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.service.ClusterService;
1819
import org.elasticsearch.common.util.concurrent.EsExecutors;
19-
import org.elasticsearch.compute.EsqlRefCountingListener;
2020
import org.elasticsearch.compute.data.BlockFactory;
2121
import org.elasticsearch.compute.operator.exchange.ExchangeService;
22-
import org.elasticsearch.core.TimeValue;
2322
import org.elasticsearch.injection.guice.Inject;
2423
import org.elasticsearch.tasks.Task;
2524
import org.elasticsearch.tasks.TaskId;
@@ -32,12 +31,11 @@
3231
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3332
import org.elasticsearch.xpack.core.security.SecurityContext;
3433
import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction;
34+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
3535
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
3636
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
3737

3838
import java.io.IOException;
39-
import java.util.concurrent.TimeUnit;
40-
import java.util.concurrent.atomic.AtomicReference;
4139

4240
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
4341

@@ -55,6 +53,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
5553
private final TransportService transportService;
5654
private final AsyncSearchSecurity security;
5755

56+
private static final Logger logger = LogManager.getLogger(TransportEsqlAsyncStopAction.class);
57+
5858
@Inject
5959
public TransportEsqlAsyncStopAction(
6060
TransportService transportService,
@@ -106,34 +106,33 @@ private String sessionID(AsyncExecutionId asyncId) {
106106

107107
private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, ActionListener<EsqlQueryResponse> listener) {
108108
String asyncIdStr = asyncId.getEncoded();
109-
TransportEsqlQueryAction.EsqlQueryListener asyncListener = queryAction.getAsyncListener(asyncIdStr);
110-
if (asyncListener == null) {
109+
EsqlQueryTask asyncTask = getEsqlQueryTask(asyncId);
110+
GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
111+
if (asyncTask == null) {
111112
// This should mean one of the two things: either bad request ID, or the query has already finished
112113
// In both cases, let regular async get deal with it.
113-
var getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
114-
// TODO: this should not be happening, but if the listener is not registered and the query is not finished,
115-
// we give it some time to finish
116-
getAsyncResultRequest.setWaitForCompletionTimeout(new TimeValue(1, TimeUnit.SECONDS));
114+
logger.debug("Async stop for task {}, no task present - passing to GetAsyncResultRequest", asyncIdStr);
117115
getResultsAction.execute(task, getAsyncResultRequest, listener);
118116
return;
119117
}
120-
try {
121-
EsqlQueryTask asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncId, EsqlQueryTask.class);
122-
if (false == security.currentUserHasAccessToTask(asyncTask)) {
123-
throw new ResourceNotFoundException(asyncId + " not found");
118+
logger.debug("Async stop for task {} - stopping", asyncIdStr);
119+
final EsqlExecutionInfo esqlExecutionInfo = asyncTask.executionInfo();
120+
if (esqlExecutionInfo != null) {
121+
esqlExecutionInfo.markAsPartial();
122+
}
123+
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
124+
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.running(() -> {
125+
if (asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) {
126+
getResults.run();
124127
}
128+
}));
129+
}
130+
131+
private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncId) {
132+
try {
133+
return AsyncTaskIndexService.getTaskAndCheckAuthentication(taskManager, security, asyncId, EsqlQueryTask.class);
125134
} catch (IOException e) {
126-
throw new ResourceNotFoundException(asyncId + " not found", e);
127-
}
128-
// Here we will wait for both the response to become available and for the finish operation to complete
129-
var responseHolder = new AtomicReference<EsqlQueryResponse>();
130-
try (var refs = new EsqlRefCountingListener(listener.map(unused -> responseHolder.get()))) {
131-
asyncListener.addListener(refs.acquire().map(r -> {
132-
responseHolder.set(r);
133-
return null;
134-
}));
135-
asyncListener.markAsPartial();
136-
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
135+
return null;
137136
}
138137
}
139138
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@
1313
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
1414
import org.elasticsearch.action.support.ActionFilters;
1515
import org.elasticsearch.action.support.HandledTransportAction;
16-
import org.elasticsearch.action.support.SubscribableListener;
1716
import org.elasticsearch.client.internal.Client;
1817
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1918
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2120
import org.elasticsearch.common.io.stream.StreamInput;
2221
import org.elasticsearch.common.util.BigArrays;
23-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2422
import org.elasticsearch.common.util.concurrent.EsExecutors;
2523
import org.elasticsearch.compute.data.BlockFactory;
2624
import org.elasticsearch.compute.operator.exchange.ExchangeService;
@@ -82,8 +80,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
8280
private final RemoteClusterService remoteClusterService;
8381
private final UsageService usageService;
8482
private final TransportActionServices services;
85-
// Listeners for active async queries, key being the async task execution ID
86-
private final Map<String, EsqlQueryListener> asyncListeners = ConcurrentCollections.newConcurrentMap();
8783

8884
@Inject
8985
@SuppressWarnings("this-escape")
@@ -190,41 +186,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
190186
}
191187
}
192188

193-
// Subscribable listener that can keep track of the EsqlExecutionInfo
194-
// Used to mark an async query as partial if it is stopped
195-
public static class EsqlQueryListener extends SubscribableListener<EsqlQueryResponse> {
196-
private EsqlExecutionInfo executionInfo;
197-
198-
public EsqlQueryListener(EsqlExecutionInfo executionInfo) {
199-
this.executionInfo = executionInfo;
200-
}
201-
202-
public EsqlExecutionInfo getExecutionInfo() {
203-
return executionInfo;
204-
}
205-
206-
public void markAsPartial() {
207-
if (executionInfo != null) {
208-
executionInfo.markAsPartial();
209-
}
210-
}
211-
}
212-
213189
@Override
214190
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
215191
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
216192
task.setExecutionInfo(createEsqlExecutionInfo(request));
217-
// Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from
218-
// other endpoints, such as _query/async/stop
219-
EsqlQueryListener subListener = new EsqlQueryListener(task.executionInfo());
220-
String asyncExecutionId = task.getExecutionId().getEncoded();
221-
subListener.addListener(ActionListener.runAfter(listener, () -> asyncListeners.remove(asyncExecutionId)));
222-
asyncListeners.put(asyncExecutionId, subListener);
223-
ActionListener.run(subListener, l -> innerExecute(task, request, l));
224-
}
225-
226-
public EsqlQueryListener getAsyncListener(String executionId) {
227-
return asyncListeners.get(executionId);
193+
ActionListener.run(listener, l -> innerExecute(task, request, l));
228194
}
229195

230196
private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {

0 commit comments

Comments
 (0)