diff --git a/docs/changelog/121843.yaml b/docs/changelog/121843.yaml new file mode 100644 index 0000000000000..85b19e317a09c --- /dev/null +++ b/docs/changelog/121843.yaml @@ -0,0 +1,6 @@ +pr: 121843 +summary: Fix async stop sometimes not properly collecting result +area: ES|QL +type: bug +issues: + - 121249 diff --git a/muted-tests.yml b/muted-tests.yml index cece04c496176..f588c190cd298 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -273,9 +273,6 @@ tests: - class: org.elasticsearch.xpack.ml.integration.ClassificationIT method: testWithDatastreams issue: https://github.com/elastic/elasticsearch/issues/121236 -- class: org.elasticsearch.xpack.remotecluster.RemoteClusterSecurityEsqlIT - method: testCrossClusterAsyncQueryStop - issue: https://github.com/elastic/elasticsearch/issues/121249 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/*} issue: https://github.com/elastic/elasticsearch/issues/120816 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index fe970bef87145..1993545075979 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -391,6 +391,22 @@ public T getTaskAndCheckAuthentication( TaskManager taskManager, AsyncExecutionId asyncExecutionId, Class tClass + ) throws IOException { + return getTaskAndCheckAuthentication(taskManager, security, asyncExecutionId, tClass); + } + + /** + * Returns the {@link AsyncTask} if the provided asyncTaskId + * is registered in the task manager, null otherwise. + * + * This method throws a {@link ResourceNotFoundException} if the authenticated user + * is not the creator of the original task. + */ + public static T getTaskAndCheckAuthentication( + TaskManager taskManager, + AsyncSearchSecurity security, + AsyncExecutionId asyncExecutionId, + Class tClass ) throws IOException { T asyncTask = getTask(taskManager, asyncExecutionId, tClass); if (asyncTask == null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java index a4007a520ed30..f5f51029ae8a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java @@ -7,7 +7,8 @@ package org.elasticsearch.xpack.esql.plugin; -import org.elasticsearch.ResourceNotFoundException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ActionFilters; @@ -16,10 +17,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.compute.EsqlRefCountingListener; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.operator.exchange.ExchangeService; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -32,12 +31,11 @@ import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.action.EsqlQueryTask; import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; @@ -55,6 +53,8 @@ public class TransportEsqlAsyncStopAction extends HandledTransportAction listener) { String asyncIdStr = asyncId.getEncoded(); - TransportEsqlQueryAction.EsqlQueryListener asyncListener = queryAction.getAsyncListener(asyncIdStr); - if (asyncListener == null) { + EsqlQueryTask asyncTask = getEsqlQueryTask(asyncId); + GetAsyncResultRequest getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr); + if (asyncTask == null) { // This should mean one of the two things: either bad request ID, or the query has already finished // In both cases, let regular async get deal with it. - var getAsyncResultRequest = new GetAsyncResultRequest(asyncIdStr); - // TODO: this should not be happening, but if the listener is not registered and the query is not finished, - // we give it some time to finish - getAsyncResultRequest.setWaitForCompletionTimeout(new TimeValue(1, TimeUnit.SECONDS)); + logger.debug("Async stop for task {}, no task present - passing to GetAsyncResultRequest", asyncIdStr); getResultsAction.execute(task, getAsyncResultRequest, listener); return; } - try { - EsqlQueryTask asyncTask = AsyncTaskIndexService.getTask(taskManager, asyncId, EsqlQueryTask.class); - if (false == security.currentUserHasAccessToTask(asyncTask)) { - throw new ResourceNotFoundException(asyncId + " not found"); + logger.debug("Async stop for task {} - stopping", asyncIdStr); + final EsqlExecutionInfo esqlExecutionInfo = asyncTask.executionInfo(); + if (esqlExecutionInfo != null) { + esqlExecutionInfo.markAsPartial(); + } + Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener); + exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.running(() -> { + if (asyncTask.addCompletionListener(() -> ActionListener.running(getResults)) == false) { + getResults.run(); } + })); + } + + private EsqlQueryTask getEsqlQueryTask(AsyncExecutionId asyncId) { + try { + return AsyncTaskIndexService.getTaskAndCheckAuthentication(taskManager, security, asyncId, EsqlQueryTask.class); } catch (IOException e) { - throw new ResourceNotFoundException(asyncId + " not found", e); - } - // Here we will wait for both the response to become available and for the finish operation to complete - var responseHolder = new AtomicReference(); - try (var refs = new EsqlRefCountingListener(listener.map(unused -> responseHolder.get()))) { - asyncListener.addListener(refs.acquire().map(r -> { - responseHolder.set(r); - return null; - })); - asyncListener.markAsPartial(); - exchangeService.finishSessionEarly(sessionID(asyncId), refs.acquire()); + return null; } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index caf70063e08fa..5b0dfa14014a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -13,14 +13,12 @@ import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.operator.exchange.ExchangeService; @@ -82,8 +80,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction asyncListeners = ConcurrentCollections.newConcurrentMap(); @Inject @SuppressWarnings("this-escape") @@ -190,41 +186,11 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener } } - // Subscribable listener that can keep track of the EsqlExecutionInfo - // Used to mark an async query as partial if it is stopped - public static class EsqlQueryListener extends SubscribableListener { - private EsqlExecutionInfo executionInfo; - - public EsqlQueryListener(EsqlExecutionInfo executionInfo) { - this.executionInfo = executionInfo; - } - - public EsqlExecutionInfo getExecutionInfo() { - return executionInfo; - } - - public void markAsPartial() { - if (executionInfo != null) { - executionInfo.markAsPartial(); - } - } - } - @Override public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener listener) { // set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running task.setExecutionInfo(createEsqlExecutionInfo(request)); - // Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from - // other endpoints, such as _query/async/stop - EsqlQueryListener subListener = new EsqlQueryListener(task.executionInfo()); - String asyncExecutionId = task.getExecutionId().getEncoded(); - subListener.addListener(ActionListener.runAfter(listener, () -> asyncListeners.remove(asyncExecutionId))); - asyncListeners.put(asyncExecutionId, subListener); - ActionListener.run(subListener, l -> innerExecute(task, request, l)); - } - - public EsqlQueryListener getAsyncListener(String executionId) { - return asyncListeners.get(executionId); + ActionListener.run(listener, l -> innerExecute(task, request, l)); } private void innerExecute(Task task, EsqlQueryRequest request, ActionListener listener) {