Skip to content

Commit 4199dc8

Browse files
committed
Improve stop action
1 parent d4a96ff commit 4199dc8

File tree

2 files changed

+15
-61
lines changed

2 files changed

+15
-61
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.ResourceNotFoundException;
1312
import org.elasticsearch.action.ActionListener;
1413
import org.elasticsearch.action.ActionListenerResponseHandler;
1514
import org.elasticsearch.action.support.ActionFilters;
@@ -18,7 +17,6 @@
1817
import org.elasticsearch.cluster.node.DiscoveryNode;
1918
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.common.util.concurrent.EsExecutors;
21-
import org.elasticsearch.compute.EsqlRefCountingListener;
2220
import org.elasticsearch.compute.data.BlockFactory;
2321
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2422
import org.elasticsearch.injection.guice.Inject;
@@ -33,11 +31,11 @@
3331
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3432
import org.elasticsearch.xpack.core.security.SecurityContext;
3533
import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction;
34+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
3635
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
3736
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
3837

3938
import java.io.IOException;
40-
import java.util.concurrent.atomic.AtomicReference;
4139

4240
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
4341

@@ -108,36 +106,26 @@ private String sessionID(AsyncExecutionId asyncId) {
108106

109107
private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, ActionListener<EsqlQueryResponse> listener) {
110108
String asyncIdStr = asyncId.getEncoded();
111-
TransportEsqlQueryAction.EsqlQueryListener asyncListener = queryAction.getAsyncListener(asyncIdStr);
112109
EsqlQueryTask asyncTask = getEsqlQueryTask(asyncId);
113-
if (asyncListener == null) {
114-
logger.debug("Async stop for task {}, no listener - collecting the result", asyncIdStr);
110+
GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
111+
if (asyncTask == null) {
115112
// This should mean one of the two things: either bad request ID, or the query has already finished
116113
// In both cases, let regular async get deal with it.
117-
var getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
118-
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
119-
// If the listener is not present but the task is still alive, this may mean it's not finished writing the response
120-
// We will wait for the task to be done and then collect the results.
121-
if (asyncTask == null || asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) {
122-
getResults.run();
123-
}
114+
logger.debug("Async stop for task {}, no task present - passing to GetAsyncResultRequest", asyncIdStr);
115+
getResultsAction.execute(task, getAsyncResultRequest, listener);
124116
return;
125117
}
126-
127-
if (asyncTask == null) {
128-
throw new ResourceNotFoundException(asyncId + " not found");
129-
}
130118
logger.debug("Async stop for task {} - stopping", asyncIdStr);
131-
// Here we will wait for both the response to become available and for the finish operation to complete
132-
var responseHolder = new AtomicReference<EsqlQueryResponse>();
133-
try (var refs = new EsqlRefCountingListener(listener.map(unused -> responseHolder.get()))) {
134-
asyncListener.addListener(refs.acquire().map(r -> {
135-
responseHolder.set(r);
136-
return null;
137-
}));
138-
asyncListener.markAsPartial();
139-
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
119+
final EsqlExecutionInfo esqlExecutionInfo = asyncTask.executionInfo();
120+
if (esqlExecutionInfo != null) {
121+
esqlExecutionInfo.markAsPartial();
140122
}
123+
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
124+
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.running(() -> {
125+
if (asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) {
126+
getResults.run();
127+
}
128+
}));
141129
}
142130

143131
private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncId) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@
1313
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
1414
import org.elasticsearch.action.support.ActionFilters;
1515
import org.elasticsearch.action.support.HandledTransportAction;
16-
import org.elasticsearch.action.support.SubscribableListener;
1716
import org.elasticsearch.client.internal.Client;
1817
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1918
import org.elasticsearch.cluster.service.ClusterService;
2019
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2120
import org.elasticsearch.common.io.stream.StreamInput;
2221
import org.elasticsearch.common.util.BigArrays;
23-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2422
import org.elasticsearch.common.util.concurrent.EsExecutors;
2523
import org.elasticsearch.compute.data.BlockFactory;
2624
import org.elasticsearch.compute.operator.exchange.ExchangeService;
@@ -82,8 +80,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
8280
private final RemoteClusterService remoteClusterService;
8381
private final UsageService usageService;
8482
private final TransportActionServices services;
85-
// Listeners for active async queries, key being the async task execution ID
86-
private final Map<String, EsqlQueryListener> asyncListeners = ConcurrentCollections.newConcurrentMap();
8783

8884
@Inject
8985
@SuppressWarnings("this-escape")
@@ -190,41 +186,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
190186
}
191187
}
192188

193-
// Subscribable listener that can keep track of the EsqlExecutionInfo
194-
// Used to mark an async query as partial if it is stopped
195-
public static class EsqlQueryListener extends SubscribableListener<EsqlQueryResponse> {
196-
private EsqlExecutionInfo executionInfo;
197-
198-
public EsqlQueryListener(EsqlExecutionInfo executionInfo) {
199-
this.executionInfo = executionInfo;
200-
}
201-
202-
public EsqlExecutionInfo getExecutionInfo() {
203-
return executionInfo;
204-
}
205-
206-
public void markAsPartial() {
207-
if (executionInfo != null) {
208-
executionInfo.markAsPartial();
209-
}
210-
}
211-
}
212-
213189
@Override
214190
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
215191
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
216192
task.setExecutionInfo(createEsqlExecutionInfo(request));
217-
// Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from
218-
// other endpoints, such as _query/async/stop
219-
EsqlQueryListener subListener = new EsqlQueryListener(task.executionInfo());
220-
String asyncExecutionId = task.getExecutionId().getEncoded();
221-
subListener.addListener(ActionListener.runAfter(listener, () -> asyncListeners.remove(asyncExecutionId)));
222-
asyncListeners.put(asyncExecutionId, subListener);
223-
ActionListener.run(subListener, l -> innerExecute(task, request, l));
224-
}
225-
226-
public EsqlQueryListener getAsyncListener(String executionId) {
227-
return asyncListeners.get(executionId);
193+
ActionListener.run(listener, l -> innerExecute(task, request, l));
228194
}
229195

230196
private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {

0 commit comments

Comments
 (0)