Skip to content

Commit 433dbf0

Browse files
authored
Fix update expiration for async query (#133021)
Async queries in EQL and ES|QL do not create an initial response, and the current logic does not correctly handle expiration updates when the query has already completed. With initial response (no change): First, update the expiration in the async index, then update the task's expiration if the task still exists. Without initial response: First, try to update the task's expiration, then attempt to get the result from the task or async index. If the result is no longer available from the task, update the expiration in the async index before retrieving it (similar to the initial response case). This second step was introduced in this fix. Ideally, we should always create the initial response up front to unify the logic for both async_search and async_query, but this fix is preferred for now as it is more contained. When reviewing the code, I also found a race condition where async-get can return a NOT_FOUND error if the task completes but has not yet stored its result in the async index. This issue would also be resolved by storing an initial response up front. I will open a follow-up issue for it. Closes #130619
1 parent 6dfb5dc commit 433dbf0

File tree

4 files changed

+148
-35
lines changed

4 files changed

+148
-35
lines changed

docs/changelog/133021.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 133021
2+
summary: Fix update expiration for async query
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 130619

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.ResourceNotFoundException;
1414
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.update.UpdateResponse;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.TriFunction;
@@ -86,23 +87,10 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
8687
// EQL doesn't store initial or intermediate results so we only need to update expiration time in store for only in case of
8788
// async search
8889
if (updateInitialResultsInStore & expirationTime > 0) {
89-
store.updateExpirationTime(
90-
searchId.getDocId(),
90+
updateExpirationTime(
91+
searchId,
9192
expirationTime,
92-
ActionListener.wrap(p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> {
93-
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
94-
if (status != RestStatus.NOT_FOUND) {
95-
logger.error(
96-
() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()),
97-
exc
98-
);
99-
listener.onFailure(exc);
100-
} else {
101-
// the async search document or its index is not found.
102-
// That can happen if an invalid/deleted search id is provided.
103-
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
104-
}
105-
})
93+
listener.delegateFailure((l, unused) -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, l))
10694
);
10795
} else {
10896
getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener);
@@ -122,7 +110,7 @@ private void getSearchResponseFromTask(
122110
try {
123111
final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass);
124112
if (task == null || (updateInitialResultsInStore && task.isCancelled())) {
125-
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
113+
getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
126114
return;
127115
}
128116

@@ -137,30 +125,40 @@ private void getSearchResponseFromTask(
137125
if (added == false) {
138126
// the task must have completed, since we cannot add a completion listener
139127
assert store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass) == null;
140-
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
128+
getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
141129
}
142130
} catch (Exception exc) {
143131
listener.onFailure(exc);
144132
}
145133
}
146134

147-
private void getSearchResponseFromIndex(
135+
private void getSearchResponseFromIndexAndUpdateExpiration(
148136
AsyncExecutionId searchId,
149137
GetAsyncResultRequest request,
150138
long nowInMillis,
151-
ActionListener<Response> listener
139+
long expirationTime,
140+
ActionListener<Response> outListener
152141
) {
153-
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
154-
try {
155-
sendFinalResponse(request, response, nowInMillis, l);
156-
} finally {
157-
if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
158-
&& storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
159-
refCounted.decRef();
142+
var updateListener = outListener.delegateFailure((listener, unused) -> {
143+
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
144+
try {
145+
sendFinalResponse(request, response, nowInMillis, l);
146+
} finally {
147+
if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
148+
&& storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
149+
refCounted.decRef();
150+
}
160151
}
161-
}
162152

163-
}));
153+
}));
154+
});
155+
// If updateInitialResultsInStore=false, we can't update expiration while the task is running since the document doesn't exist yet.
156+
// So let's update the expiration here when the task has been completed.
157+
if (updateInitialResultsInStore == false && expirationTime != -1) {
158+
updateExpirationTime(searchId, expirationTime, updateListener.map(unused -> null));
159+
} else {
160+
updateListener.onResponse(null);
161+
}
164162
}
165163

166164
private void sendFinalResponse(GetAsyncResultRequest request, Response response, long nowInMillis, ActionListener<Response> listener) {
@@ -172,4 +170,18 @@ private void sendFinalResponse(GetAsyncResultRequest request, Response response,
172170

173171
listener.onResponse(response);
174172
}
173+
174+
private void updateExpirationTime(AsyncExecutionId searchId, long expirationTime, ActionListener<UpdateResponse> listener) {
175+
store.updateExpirationTime(searchId.getDocId(), expirationTime, listener.delegateResponse((l, e) -> {
176+
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(e));
177+
if (status != RestStatus.NOT_FOUND) {
178+
logger.error(() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), e);
179+
l.onFailure(e);
180+
} else {
181+
// the async search document or its index is not found.
182+
// That can happen if an invalid/deleted search id is provided.
183+
l.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
184+
}
185+
}));
186+
}
175187
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,11 @@ public void testAssertExpirationPropagation() throws Exception {
231231
try {
232232
long startTime = System.currentTimeMillis();
233233
task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
234-
235-
if (updateInitialResultsInStore) {
234+
boolean taskCompleted = randomBoolean();
235+
if (taskCompleted) {
236+
taskManager.unregister(task);
237+
}
238+
if (taskCompleted || updateInitialResultsInStore) {
236239
// we need to store initial result
237240
PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
238241
indexService.createResponse(
@@ -249,10 +252,11 @@ public void testAssertExpirationPropagation() throws Exception {
249252
// not waiting for completion, so should return immediately with timeout
250253
service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()).setKeepAlive(newKeepAlive), listener);
251254
listener.actionGet(TimeValue.timeValueSeconds(10));
252-
assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
253-
assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));
254-
255-
if (updateInitialResultsInStore) {
255+
if (taskCompleted == false) {
256+
assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
257+
assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));
258+
}
259+
if (updateInitialResultsInStore || taskCompleted) {
256260
PlainActionFuture<TestAsyncResponse> future = new PlainActionFuture<>();
257261
indexService.getResponse(task.executionId, randomBoolean(), future);
258262
TestAsyncResponse response = future.actionGet(TimeValue.timeValueMinutes(10));

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,21 @@
88
package org.elasticsearch.xpack.esql.action;
99

1010
import org.elasticsearch.ResourceNotFoundException;
11+
import org.elasticsearch.action.get.GetResponse;
1112
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1213
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.compute.operator.DriverTaskRunner;
1415
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1516
import org.elasticsearch.core.TimeValue;
1617
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.tasks.CancellableTask;
1719
import org.elasticsearch.tasks.TaskCancelledException;
1820
import org.elasticsearch.tasks.TaskInfo;
21+
import org.elasticsearch.transport.TransportService;
1922
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
23+
import org.elasticsearch.xpack.core.XPackPlugin;
24+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
25+
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
2026
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
2127
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
2228
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
@@ -40,6 +46,7 @@
4046
import static org.hamcrest.Matchers.empty;
4147
import static org.hamcrest.Matchers.equalTo;
4248
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
49+
import static org.hamcrest.Matchers.hasSize;
4350
import static org.hamcrest.Matchers.is;
4451
import static org.hamcrest.Matchers.not;
4552
import static org.hamcrest.Matchers.notNullValue;
@@ -260,6 +267,90 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {
260267
}
261268
}
262269

270+
public void testUpdateKeepAlive() throws Exception {
271+
long nowInMillis = System.currentTimeMillis();
272+
TimeValue keepAlive = timeValueSeconds(between(30, 60));
273+
var request = EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client())
274+
.query("from test | stats sum(pause_me)")
275+
.pragmas(queryPragmas())
276+
.waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10)))
277+
.keepOnCompletion(randomBoolean())
278+
.keepAlive(keepAlive);
279+
final String asyncId;
280+
long currentExpiration;
281+
try {
282+
try (EsqlQueryResponse initialResponse = request.execute().actionGet(60, TimeUnit.SECONDS)) {
283+
assertThat(initialResponse.isRunning(), is(true));
284+
assertTrue(initialResponse.asyncExecutionId().isPresent());
285+
asyncId = initialResponse.asyncExecutionId().get();
286+
}
287+
currentExpiration = getExpirationFromTask(asyncId);
288+
assertThat(currentExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
289+
// update the expiration while the task is still running
290+
int iters = iterations(1, 5);
291+
for (int i = 0; i < iters; i++) {
292+
long extraKeepAlive = randomIntBetween(30, 60);
293+
keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
294+
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
295+
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
296+
assertThat(resp.asyncExecutionId(), isPresent());
297+
assertThat(resp.asyncExecutionId().get(), equalTo(asyncId));
298+
assertTrue(resp.isRunning());
299+
}
300+
long updatedExpiration = getExpirationFromTask(asyncId);
301+
assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
302+
assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
303+
currentExpiration = updatedExpiration;
304+
}
305+
} finally {
306+
scriptPermits.release(numberOfDocs());
307+
}
308+
// allow the query to complete, then update the expiration with the result is being stored in the async index
309+
assertBusy(() -> {
310+
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId);
311+
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
312+
assertThat(resp.isRunning(), is(false));
313+
}
314+
});
315+
// update the keepAlive after the query has completed
316+
int iters = between(1, 5);
317+
for (int i = 0; i < iters; i++) {
318+
long extraKeepAlive = randomIntBetween(30, 60);
319+
keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
320+
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
321+
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
322+
assertThat(resp.isRunning(), is(false));
323+
}
324+
long updatedExpiration = getExpirationFromDoc(asyncId);
325+
assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
326+
assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
327+
currentExpiration = updatedExpiration;
328+
}
329+
}
330+
331+
private static long getExpirationFromTask(String asyncId) {
332+
List<EsqlQueryTask> tasks = new ArrayList<>();
333+
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
334+
for (CancellableTask task : ts.getTaskManager().getCancellableTasks().values()) {
335+
if (task instanceof EsqlQueryTask queryTask) {
336+
EsqlQueryResponse result = queryTask.getCurrentResult();
337+
if (result.isAsync() && result.asyncExecutionId().get().equals(asyncId)) {
338+
tasks.add(queryTask);
339+
}
340+
}
341+
}
342+
}
343+
assertThat(tasks, hasSize(1));
344+
return tasks.getFirst().getExpirationTimeMillis();
345+
}
346+
347+
private static long getExpirationFromDoc(String asyncId) {
348+
String docId = AsyncExecutionId.decode(asyncId).getDocId();
349+
GetResponse doc = client().prepareGet().setIndex(XPackPlugin.ASYNC_RESULTS_INDEX).setId(docId).get();
350+
assertTrue(doc.isExists());
351+
return ((Number) doc.getSource().get(AsyncTaskIndexService.EXPIRATION_TIME_FIELD)).longValue();
352+
}
353+
263354
private List<TaskInfo> getEsqlQueryTasks() throws Exception {
264355
List<TaskInfo> foundTasks = new ArrayList<>();
265356
assertBusy(() -> {

0 commit comments

Comments
 (0)