diff --git a/docs/changelog/133021.yaml b/docs/changelog/133021.yaml new file mode 100644 index 0000000000000..f25dd723b4102 --- /dev/null +++ b/docs/changelog/133021.yaml @@ -0,0 +1,6 @@ +pr: 133021 +summary: Fix update expiration for async query +area: ES|QL +type: bug +issues: + - 130619 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java index 2236304d8c4d0..1a0254f7d8471 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java @@ -12,6 +12,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; @@ -86,23 +87,10 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener 0) { - store.updateExpirationTime( - searchId.getDocId(), + updateExpirationTime( + searchId, expirationTime, - ActionListener.wrap(p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> { - RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); - if (status != RestStatus.NOT_FOUND) { - logger.error( - () -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), - exc - ); - listener.onFailure(exc); - } else { - // the async search document or its index is not found. - // That can happen if an invalid/deleted search id is provided. 
- listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); - } - }) + listener.delegateFailure((l, unused) -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, l)) ); } else { getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener); @@ -122,7 +110,7 @@ private void getSearchResponseFromTask( try { final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass); if (task == null || (updateInitialResultsInStore && task.isCancelled())) { - getSearchResponseFromIndex(searchId, request, nowInMillis, listener); + getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener); return; } @@ -137,30 +125,40 @@ private void getSearchResponseFromTask( if (added == false) { // the task must have completed, since we cannot add a completion listener assert store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass) == null; - getSearchResponseFromIndex(searchId, request, nowInMillis, listener); + getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener); } } catch (Exception exc) { listener.onFailure(exc); } } - private void getSearchResponseFromIndex( + private void getSearchResponseFromIndexAndUpdateExpiration( AsyncExecutionId searchId, GetAsyncResultRequest request, long nowInMillis, - ActionListener listener + long expirationTime, + ActionListener outListener ) { - store.getResponse(searchId, true, listener.delegateFailure((l, response) -> { - try { - sendFinalResponse(request, response, nowInMillis, l); - } finally { - if (response instanceof StoredAsyncResponse storedAsyncResponse - && storedAsyncResponse.getResponse() instanceof RefCounted refCounted) { - refCounted.decRef(); + var updateListener = outListener.delegateFailure((listener, unused) -> { + store.getResponse(searchId, true, listener.delegateFailure((l, response) -> { + try { + 
sendFinalResponse(request, response, nowInMillis, l); + } finally { + if (response instanceof StoredAsyncResponse storedAsyncResponse + && storedAsyncResponse.getResponse() instanceof RefCounted refCounted) { + refCounted.decRef(); + } } - } - })); + })); + }); + // If updateInitialResultsInStore=false, we can't update expiration while the task is running since the document doesn't exist yet. + // So let's update the expiration here when the task has been completed. + if (updateInitialResultsInStore == false && expirationTime != -1) { + updateExpirationTime(searchId, expirationTime, updateListener.map(unused -> null)); + } else { + updateListener.onResponse(null); + } } private void sendFinalResponse(GetAsyncResultRequest request, Response response, long nowInMillis, ActionListener listener) { @@ -172,4 +170,18 @@ private void sendFinalResponse(GetAsyncResultRequest request, Response response, listener.onResponse(response); } + + private void updateExpirationTime(AsyncExecutionId searchId, long expirationTime, ActionListener listener) { + store.updateExpirationTime(searchId.getDocId(), expirationTime, listener.delegateResponse((l, e) -> { + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(e)); + if (status != RestStatus.NOT_FOUND) { + logger.error(() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), e); + l.onFailure(e); + } else { + // the async search document or its index is not found. + // That can happen if an invalid/deleted search id is provided. 
+ l.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + } + })); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java index 1c69a6a52951a..5e304530b064f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java @@ -231,8 +231,11 @@ public void testAssertExpirationPropagation() throws Exception { try { long startTime = System.currentTimeMillis(); task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis()); - - if (updateInitialResultsInStore) { + boolean taskCompleted = randomBoolean(); + if (taskCompleted) { + taskManager.unregister(task); + } + if (taskCompleted || updateInitialResultsInStore) { // we need to store initial result PlainActionFuture future = new PlainActionFuture<>(); indexService.createResponse( @@ -249,10 +252,11 @@ public void testAssertExpirationPropagation() throws Exception { // not waiting for completion, so should return immediately with timeout service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()).setKeepAlive(newKeepAlive), listener); listener.actionGet(TimeValue.timeValueSeconds(10)); - assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis())); - assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis())); - - if (updateInitialResultsInStore) { + if (taskCompleted == false) { + assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis())); + assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis())); + } + if (updateInitialResultsInStore || taskCompleted) { PlainActionFuture future = new 
PlainActionFuture<>(); indexService.getResponse(task.executionId, randomBoolean(), future); TestAsyncResponse response = future.actionGet(TimeValue.timeValueMinutes(10)); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java index 34b94207c5a8d..2aa731eaa5d29 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java @@ -8,15 +8,21 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService; import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction; @@ -40,6 +46,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; 
+import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -260,6 +267,90 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) { } } + public void testUpdateKeepAlive() throws Exception { + long nowInMillis = System.currentTimeMillis(); + TimeValue keepAlive = timeValueSeconds(between(30, 60)); + var request = EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()) + .query("from test | stats sum(pause_me)") + .pragmas(queryPragmas()) + .waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10))) + .keepOnCompletion(randomBoolean()) + .keepAlive(keepAlive); + final String asyncId; + long currentExpiration; + try { + try (EsqlQueryResponse initialResponse = request.execute().actionGet(60, TimeUnit.SECONDS)) { + assertThat(initialResponse.isRunning(), is(true)); + assertTrue(initialResponse.asyncExecutionId().isPresent()); + asyncId = initialResponse.asyncExecutionId().get(); + } + currentExpiration = getExpirationFromTask(asyncId); + assertThat(currentExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis())); + // update the expiration while the task is still running + int iters = iterations(1, 5); + for (int i = 0; i < iters; i++) { + long extraKeepAlive = randomIntBetween(30, 60); + keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive); + GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive); + try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) { + assertThat(resp.asyncExecutionId(), isPresent()); + assertThat(resp.asyncExecutionId().get(), equalTo(asyncId)); + assertTrue(resp.isRunning()); + } + long updatedExpiration = getExpirationFromTask(asyncId); + assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive)); + assertThat(updatedExpiration, 
greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis())); + currentExpiration = updatedExpiration; + } + } finally { + scriptPermits.release(numberOfDocs()); + } + // allow the query to complete, then update the expiration while the result is being stored in the async index + assertBusy(() -> { + GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId); + try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) { + assertThat(resp.isRunning(), is(false)); + } + }); + // update the keepAlive after the query has completed + int iters = between(1, 5); + for (int i = 0; i < iters; i++) { + long extraKeepAlive = randomIntBetween(30, 60); + keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive); + GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive); + try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) { + assertThat(resp.isRunning(), is(false)); + } + long updatedExpiration = getExpirationFromDoc(asyncId); + assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive)); + assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis())); + currentExpiration = updatedExpiration; + } + } + + private static long getExpirationFromTask(String asyncId) { + List tasks = new ArrayList<>(); + for (TransportService ts : internalCluster().getInstances(TransportService.class)) { + for (CancellableTask task : ts.getTaskManager().getCancellableTasks().values()) { + if (task instanceof EsqlQueryTask queryTask) { + EsqlQueryResponse result = queryTask.getCurrentResult(); + if (result.isAsync() && result.asyncExecutionId().get().equals(asyncId)) { + tasks.add(queryTask); + } + } + } + } + assertThat(tasks, hasSize(1)); + return tasks.getFirst().getExpirationTimeMillis(); + } + + private static long getExpirationFromDoc(String asyncId) { + String docId = 
AsyncExecutionId.decode(asyncId).getDocId(); + GetResponse doc = client().prepareGet().setIndex(XPackPlugin.ASYNC_RESULTS_INDEX).setId(docId).get(); + assertTrue(doc.isExists()); + return ((Number) doc.getSource().get(AsyncTaskIndexService.EXPIRATION_TIME_FIELD)).longValue(); + } + private List getEsqlQueryTasks() throws Exception { List foundTasks = new ArrayList<>(); assertBusy(() -> {