Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/121843.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 121843
summary: Fix async stop sometimes not properly collecting result
area: ES|QL
type: bug
issues:
- 121249
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,6 @@ tests:
- class: org.elasticsearch.xpack.ml.integration.ClassificationIT
method: testWithDatastreams
issue: https://github.com/elastic/elasticsearch/issues/121236
- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityEsqlIT
method: testCrossClusterAsyncQueryStop
issue: https://github.com/elastic/elasticsearch/issues/121249
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/*}
issue: https://github.com/elastic/elasticsearch/issues/120816
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ResourceNotFoundException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -16,10 +17,8 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
Expand All @@ -32,12 +31,11 @@
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

Expand All @@ -55,6 +53,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction<AsyncSt
private final TransportService transportService;
private final AsyncSearchSecurity security;

private static final Logger logger = LogManager.getLogger(TransportEsqlAsyncStopAction.class);

@Inject
public TransportEsqlAsyncStopAction(
TransportService transportService,
Expand Down Expand Up @@ -106,34 +106,37 @@ private String sessionID(AsyncExecutionId asyncId) {

private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, ActionListener<EsqlQueryResponse> listener) {
String asyncIdStr = asyncId.getEncoded();
TransportEsqlQueryAction.EsqlQueryListener asyncListener = queryAction.getAsyncListener(asyncIdStr);
if (asyncListener == null) {
EsqlQueryTask asyncTask = getEsqlQueryTask(asyncId);
GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
if (asyncTask == null) {
// This should mean one of the two things: either bad request ID, or the query has already finished
// In both cases, let regular async get deal with it.
var getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr);
// TODO: this should not be happening, but if the listener is not registered and the query is not finished,
// we give it some time to finish
getAsyncResultRequest.setWaitForCompletionTimeout(new TimeValue(1, TimeUnit.SECONDS));
logger.debug("Async stop for task {}, no task present - passing to GetAsyncResultRequest", asyncIdStr);
getResultsAction.execute(task, getAsyncResultRequest, listener);
return;
}
logger.debug("Async stop for task {} - stopping", asyncIdStr);
final EsqlExecutionInfo esqlExecutionInfo = asyncTask.executionInfo();
if (esqlExecutionInfo != null) {
esqlExecutionInfo.markAsPartial();
}
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.running(() -> {
if (asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) {
getResults.run();
}
}));
}

private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is related to security, can we expose and use AsyncTaskIndexService#getTaskAndCheckAuthentication instead of duplicating it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't notice this exists. This is even better.

try {
EsqlQueryTask asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncId, EsqlQueryTask.class);
if (false == security.currentUserHasAccessToTask(asyncTask)) {
throw new ResourceNotFoundException(asyncId + " not found");
if (asyncTask == null || false == security.currentUserHasAccessToTask(asyncTask)) {
return null;
}
return asyncTask;
} catch (IOException e) {
throw new ResourceNotFoundException(asyncId + " not found", e);
}
// Here we will wait for both the response to become available and for the finish operation to complete
var responseHolder = new AtomicReference<EsqlQueryResponse>();
try (var refs = new EsqlRefCountingListener(listener.map(unused -> responseHolder.get()))) {
asyncListener.addListener(refs.acquire().map(r -> {
responseHolder.set(r);
return null;
}));
asyncListener.markAsPartial();
exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire());
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
Expand Down Expand Up @@ -82,8 +80,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
private final RemoteClusterService remoteClusterService;
private final UsageService usageService;
private final TransportActionServices services;
// Listeners for active async queries, key being the async task execution ID
private final Map<String, EsqlQueryListener> asyncListeners = ConcurrentCollections.newConcurrentMap();

@Inject
@SuppressWarnings("this-escape")
Expand Down Expand Up @@ -190,41 +186,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
}
}

// Subscribable listener that can keep track of the EsqlExecutionInfo
// Used to mark an async query as partial if it is stopped
public static class EsqlQueryListener extends SubscribableListener<EsqlQueryResponse> {
private EsqlExecutionInfo executionInfo;

public EsqlQueryListener(EsqlExecutionInfo executionInfo) {
this.executionInfo = executionInfo;
}

public EsqlExecutionInfo getExecutionInfo() {
return executionInfo;
}

public void markAsPartial() {
if (executionInfo != null) {
executionInfo.markAsPartial();
}
}
}

@Override
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
task.setExecutionInfo(createEsqlExecutionInfo(request));
// Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from
// other endpoints, such as _query/async/stop
EsqlQueryListener subListener = new EsqlQueryListener(task.executionInfo());
String asyncExecutionId = task.getExecutionId().getEncoded();
subListener.addListener(ActionListener.runAfter(listener, () -> asyncListeners.remove(asyncExecutionId)));
asyncListeners.put(asyncExecutionId, subListener);
ActionListener.run(subListener, l -> innerExecute(task, request, l));
}

public EsqlQueryListener getAsyncListener(String executionId) {
return asyncListeners.get(executionId);
ActionListener.run(listener, l -> innerExecute(task, request, l));
}

private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
Expand Down