Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133021.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133021
summary: Fix update expiration for async query
area: ES|QL
type: bug
issues:
- 130619
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
Expand Down Expand Up @@ -86,23 +87,10 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
// EQL doesn't store initial or intermediate results so we only need to update expiration time in store for only in case of
// async search
if (updateInitialResultsInStore & expirationTime > 0) {
store.updateExpirationTime(
searchId.getDocId(),
updateExpirationTime(
searchId,
expirationTime,
ActionListener.wrap(p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> {
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
if (status != RestStatus.NOT_FOUND) {
logger.error(
() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()),
exc
);
listener.onFailure(exc);
} else {
// the async search document or its index is not found.
// That can happen if an invalid/deleted search id is provided.
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
})
listener.delegateFailure((l, unused) -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, l))
);
} else {
getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener);
Expand All @@ -122,7 +110,7 @@ private void getSearchResponseFromTask(
try {
final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass);
if (task == null || (updateInitialResultsInStore && task.isCancelled())) {
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
return;
}

Expand All @@ -137,30 +125,40 @@ private void getSearchResponseFromTask(
if (added == false) {
// the task must have completed, since we cannot add a completion listener
assert store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass) == null;
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
}
} catch (Exception exc) {
listener.onFailure(exc);
}
}

private void getSearchResponseFromIndex(
private void getSearchResponseFromIndexAndUpdateExpiration(
AsyncExecutionId searchId,
GetAsyncResultRequest request,
long nowInMillis,
ActionListener<Response> listener
long expirationTime,
ActionListener<Response> outListener
) {
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
try {
sendFinalResponse(request, response, nowInMillis, l);
} finally {
if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
&& storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
refCounted.decRef();
var updateListener = outListener.delegateFailure((listener, unused) -> {
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
try {
sendFinalResponse(request, response, nowInMillis, l);
} finally {
if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
&& storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
refCounted.decRef();
}
}
}

}));
}));
});
// If updateInitialResultsInStore=false, we can't update expiration while the task is running since the document doesn't exist yet.
// So let's update the expiration here when the task has been completed.
if (updateInitialResultsInStore == false && expirationTime != -1) {
updateExpirationTime(searchId, expirationTime, updateListener.map(unused -> null));
} else {
updateListener.onResponse(null);
}
}

private void sendFinalResponse(GetAsyncResultRequest request, Response response, long nowInMillis, ActionListener<Response> listener) {
Expand All @@ -172,4 +170,18 @@ private void sendFinalResponse(GetAsyncResultRequest request, Response response,

listener.onResponse(response);
}

private void updateExpirationTime(AsyncExecutionId searchId, long expirationTime, ActionListener<UpdateResponse> listener) {
store.updateExpirationTime(searchId.getDocId(), expirationTime, listener.delegateResponse((l, e) -> {
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(e));
if (status != RestStatus.NOT_FOUND) {
logger.error(() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), e);
l.onFailure(e);
} else {
// the async search document or its index is not found.
// That can happen if an invalid/deleted search id is provided.
l.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,11 @@ public void testAssertExpirationPropagation() throws Exception {
try {
long startTime = System.currentTimeMillis();
task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());

if (updateInitialResultsInStore) {
boolean taskCompleted = randomBoolean();
if (taskCompleted) {
taskManager.unregister(task);
}
if (taskCompleted || updateInitialResultsInStore) {
// we need to store initial result
PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
indexService.createResponse(
Expand All @@ -249,10 +252,11 @@ public void testAssertExpirationPropagation() throws Exception {
// not waiting for completion, so should return immediately with timeout
service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()).setKeepAlive(newKeepAlive), listener);
listener.actionGet(TimeValue.timeValueSeconds(10));
assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));

if (updateInitialResultsInStore) {
if (taskCompleted == false) {
assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));
}
if (updateInitialResultsInStore || taskCompleted) {
PlainActionFuture<TestAsyncResponse> future = new PlainActionFuture<>();
indexService.getResponse(task.executionId, randomBoolean(), future);
TestAsyncResponse response = future.actionGet(TimeValue.timeValueMinutes(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
Expand All @@ -40,6 +46,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -260,6 +267,90 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {
}
}

public void testUpdateKeepAlive() throws Exception {
long nowInMillis = System.currentTimeMillis();
TimeValue keepAlive = timeValueSeconds(between(30, 60));
var request = EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client())
.query("from test | stats sum(pause_me)")
.pragmas(queryPragmas())
.waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10)))
.keepOnCompletion(randomBoolean())
.keepAlive(keepAlive);
final String asyncId;
long currentExpiration;
try {
try (EsqlQueryResponse initialResponse = request.execute().actionGet(60, TimeUnit.SECONDS)) {
assertThat(initialResponse.isRunning(), is(true));
assertTrue(initialResponse.asyncExecutionId().isPresent());
asyncId = initialResponse.asyncExecutionId().get();
}
currentExpiration = getExpirationFromTask(asyncId);
assertThat(currentExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
// update the expiration while the task is still running
int iters = iterations(1, 5);
for (int i = 0; i < iters; i++) {
long extraKeepAlive = randomIntBetween(30, 60);
keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
assertThat(resp.asyncExecutionId(), isPresent());
assertThat(resp.asyncExecutionId().get(), equalTo(asyncId));
assertTrue(resp.isRunning());
}
long updatedExpiration = getExpirationFromTask(asyncId);
assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
currentExpiration = updatedExpiration;
}
} finally {
scriptPermits.release(numberOfDocs());
}
// allow the query to complete, then update the expiration with the result is being stored in the async index
assertBusy(() -> {
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId);
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
assertThat(resp.isRunning(), is(false));
}
});
// update the keepAlive after the query has completed
int iters = between(1, 5);
for (int i = 0; i < iters; i++) {
long extraKeepAlive = randomIntBetween(30, 60);
keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
assertThat(resp.isRunning(), is(false));
}
long updatedExpiration = getExpirationFromDoc(asyncId);
assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
currentExpiration = updatedExpiration;
}
}

private static long getExpirationFromTask(String asyncId) {
List<EsqlQueryTask> tasks = new ArrayList<>();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
for (CancellableTask task : ts.getTaskManager().getCancellableTasks().values()) {
if (task instanceof EsqlQueryTask queryTask) {
EsqlQueryResponse result = queryTask.getCurrentResult();
if (result.isAsync() && result.asyncExecutionId().get().equals(asyncId)) {
tasks.add(queryTask);
}
}
}
}
assertThat(tasks, hasSize(1));
return tasks.getFirst().getExpirationTimeMillis();
}

private static long getExpirationFromDoc(String asyncId) {
String docId = AsyncExecutionId.decode(asyncId).getDocId();
GetResponse doc = client().prepareGet().setIndex(XPackPlugin.ASYNC_RESULTS_INDEX).setId(docId).get();
assertTrue(doc.isExists());
return ((Number) doc.getSource().get(AsyncTaskIndexService.EXPIRATION_TIME_FIELD)).longValue();
}

private List<TaskInfo> getEsqlQueryTasks() throws Exception {
List<TaskInfo> foundTasks = new ArrayList<>();
assertBusy(() -> {
Expand Down