Skip to content

Commit 79fd77e

Browse files
committed
Fix update expiration for async query
1 parent 1235efc commit 79fd77e

File tree

3 files changed

+155
-40
lines changed

3 files changed

+155
-40
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.ResourceNotFoundException;
1414
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.update.UpdateResponse;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.TriFunction;
@@ -86,23 +87,10 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
8687
// EQL doesn't store initial or intermediate results so we only need to update expiration time in store for only in case of
8788
// async search
8889
if (updateInitialResultsInStore & expirationTime > 0) {
89-
store.updateExpirationTime(
90-
searchId.getDocId(),
90+
updateExpirationTime(
91+
searchId,
9192
expirationTime,
92-
ActionListener.wrap(p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> {
93-
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
94-
if (status != RestStatus.NOT_FOUND) {
95-
logger.error(
96-
() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()),
97-
exc
98-
);
99-
listener.onFailure(exc);
100-
} else {
101-
// the async search document or its index is not found.
102-
// That can happen if an invalid/deleted search id is provided.
103-
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
104-
}
105-
})
93+
listener.delegateFailure((l, unused) -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, l))
10694
);
10795
} else {
10896
getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener);
@@ -122,45 +110,63 @@ private void getSearchResponseFromTask(
122110
try {
123111
final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass);
124112
if (task == null || (updateInitialResultsInStore && task.isCancelled())) {
125-
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
113+
getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
126114
return;
127115
}
128116

129117
if (expirationTimeMillis != -1) {
130118
task.setExpirationTime(expirationTimeMillis);
131119
}
132-
boolean added = addCompletionListener.apply(
133-
task,
134-
listener.delegateFailure((l, response) -> sendFinalResponse(request, response, nowInMillis, l)),
135-
request.getWaitForCompletionTimeout()
136-
);
120+
boolean added = addCompletionListener.apply(task, listener.delegateFailure((l, response) -> {
121+
// If the task expiration is updated after the document was created with an earlier expiration time,
122+
// we should TRY to update the document's expiration time here.
123+
if (updateInitialResultsInStore == false && expirationTimeMillis != -1) {
124+
store.updateExpirationTime(
125+
searchId.getDocId(),
126+
expirationTimeMillis,
127+
ActionListener.running(() -> sendFinalResponse(request, response, nowInMillis, l))
128+
);
129+
} else {
130+
sendFinalResponse(request, response, nowInMillis, l);
131+
}
132+
}), request.getWaitForCompletionTimeout());
137133
if (added == false) {
138134
// the task must have completed, since we cannot add a completion listener
139135
assert store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass) == null;
140-
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
136+
getSearchResponseFromIndexAndUpdateExpiration(searchId, request, nowInMillis, expirationTimeMillis, listener);
141137
}
142138
} catch (Exception exc) {
143139
listener.onFailure(exc);
144140
}
145141
}
146142

147-
private void getSearchResponseFromIndex(
143+
private void getSearchResponseFromIndexAndUpdateExpiration(
148144
AsyncExecutionId searchId,
149145
GetAsyncResultRequest request,
150146
long nowInMillis,
151-
ActionListener<Response> listener
147+
long expirationTime,
148+
ActionListener<Response> outListener
152149
) {
153-
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
154-
try {
155-
sendFinalResponse(request, response, nowInMillis, l);
156-
} finally {
157-
if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
158-
&& storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
159-
refCounted.decRef();
150+
var updateListener = outListener.delegateFailure((listener, unused) -> {
151+
store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
152+
try {
153+
sendFinalResponse(request, response, nowInMillis, l);
154+
} finally {
155+
if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
156+
&& storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
157+
refCounted.decRef();
158+
}
160159
}
161-
}
162160

163-
}));
161+
}));
162+
});
163+
// If updateInitialResultsInStore=false, we can't update expiration while the task is running since the document doesn't exist yet.
164+
// So let's update the expiration here when the task has been completed.
165+
if (updateInitialResultsInStore == false && expirationTime != -1) {
166+
updateExpirationTime(searchId, expirationTime, updateListener.map(unused -> null));
167+
} else {
168+
updateListener.onResponse(null);
169+
}
164170
}
165171

166172
private void sendFinalResponse(GetAsyncResultRequest request, Response response, long nowInMillis, ActionListener<Response> listener) {
@@ -172,4 +178,18 @@ private void sendFinalResponse(GetAsyncResultRequest request, Response response,
172178

173179
listener.onResponse(response);
174180
}
181+
182+
private void updateExpirationTime(AsyncExecutionId searchId, long expirationTime, ActionListener<UpdateResponse> listener) {
183+
store.updateExpirationTime(searchId.getDocId(), expirationTime, listener.delegateResponse((l, e) -> {
184+
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(e));
185+
if (status != RestStatus.NOT_FOUND) {
186+
logger.error(() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), e);
187+
l.onFailure(e);
188+
} else {
189+
// the async search document or its index is not found.
190+
// That can happen if an invalid/deleted search id is provided.
191+
l.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
192+
}
193+
}));
194+
}
175195
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncResultsServiceTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,11 @@ public void testAssertExpirationPropagation() throws Exception {
231231
try {
232232
long startTime = System.currentTimeMillis();
233233
task.setExpirationTime(startTime + TimeValue.timeValueMinutes(1).getMillis());
234-
235-
if (updateInitialResultsInStore) {
234+
boolean taskCompleted = randomBoolean();
235+
if (taskCompleted) {
236+
taskManager.unregister(task);
237+
}
238+
if (taskCompleted || updateInitialResultsInStore) {
236239
// we need to store initial result
237240
PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
238241
indexService.createResponse(
@@ -249,10 +252,11 @@ public void testAssertExpirationPropagation() throws Exception {
249252
// not waiting for completion, so should return immediately with timeout
250253
service.retrieveResult(new GetAsyncResultRequest(task.getExecutionId().getEncoded()).setKeepAlive(newKeepAlive), listener);
251254
listener.actionGet(TimeValue.timeValueSeconds(10));
252-
assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
253-
assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));
254-
255-
if (updateInitialResultsInStore) {
255+
if (taskCompleted == false) {
256+
assertThat(task.getExpirationTime(), greaterThanOrEqualTo(startTime + newKeepAlive.getMillis()));
257+
assertThat(task.getExpirationTime(), lessThanOrEqualTo(System.currentTimeMillis() + newKeepAlive.getMillis()));
258+
}
259+
if (updateInitialResultsInStore || taskCompleted) {
256260
PlainActionFuture<TestAsyncResponse> future = new PlainActionFuture<>();
257261
indexService.getResponse(task.executionId, randomBoolean(), future);
258262
TestAsyncResponse response = future.actionGet(TimeValue.timeValueMinutes(10));

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,21 @@
88
package org.elasticsearch.xpack.esql.action;
99

1010
import org.elasticsearch.ResourceNotFoundException;
11+
import org.elasticsearch.action.get.GetResponse;
1112
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1213
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.compute.operator.DriverTaskRunner;
1415
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1516
import org.elasticsearch.core.TimeValue;
1617
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.tasks.CancellableTask;
1719
import org.elasticsearch.tasks.TaskCancelledException;
1820
import org.elasticsearch.tasks.TaskInfo;
21+
import org.elasticsearch.transport.TransportService;
1922
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
23+
import org.elasticsearch.xpack.core.XPackPlugin;
24+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
25+
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
2026
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
2127
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
2228
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
@@ -40,6 +46,7 @@
4046
import static org.hamcrest.Matchers.empty;
4147
import static org.hamcrest.Matchers.equalTo;
4248
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
49+
import static org.hamcrest.Matchers.hasSize;
4350
import static org.hamcrest.Matchers.is;
4451
import static org.hamcrest.Matchers.not;
4552
import static org.hamcrest.Matchers.notNullValue;
@@ -260,6 +267,90 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {
260267
}
261268
}
262269

270+
public void testUpdateKeepAlive() throws Exception {
271+
long nowInMillis = System.currentTimeMillis();
272+
TimeValue keepAlive = timeValueSeconds(between(30, 60));
273+
var request = EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client())
274+
.query("from test | stats sum(pause_me)")
275+
.pragmas(queryPragmas())
276+
.waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10)))
277+
.keepOnCompletion(randomBoolean())
278+
.keepAlive(keepAlive);
279+
final String asyncId;
280+
long currentExpiration;
281+
try {
282+
try (EsqlQueryResponse initialResponse = request.execute().actionGet(60, TimeUnit.SECONDS)) {
283+
assertThat(initialResponse.isRunning(), is(true));
284+
assertTrue(initialResponse.asyncExecutionId().isPresent());
285+
asyncId = initialResponse.asyncExecutionId().get();
286+
}
287+
currentExpiration = getExpirationFromTask(asyncId);
288+
assertThat(currentExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
289+
// update the expiration while the task is still running
290+
int iters = iterations(1, 5);
291+
for (int i = 0; i < iters; i++) {
292+
long extraKeepAlive = randomIntBetween(30, 60);
293+
keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
294+
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
295+
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
296+
assertThat(resp.asyncExecutionId(), isPresent());
297+
assertThat(resp.asyncExecutionId().get(), equalTo(asyncId));
298+
assertTrue(resp.isRunning());
299+
}
300+
long updatedExpiration = getExpirationFromTask(asyncId);
301+
assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
302+
assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
303+
currentExpiration = updatedExpiration;
304+
}
305+
} finally {
306+
scriptPermits.release(numberOfDocs());
307+
}
308+
// allow the query to complete, then update the expiration with the result is being stored in the async index
309+
assertBusy(() -> {
310+
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId);
311+
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
312+
assertThat(resp.isRunning(), is(false));
313+
}
314+
});
315+
// update the keepAlive after the query has completed
316+
int iters = between(1, 5);
317+
for (int i = 0; i < iters; i++) {
318+
long extraKeepAlive = randomIntBetween(30, 60);
319+
keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);
320+
GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);
321+
try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {
322+
assertThat(resp.isRunning(), is(false));
323+
}
324+
long updatedExpiration = getExpirationFromDoc(asyncId);
325+
assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));
326+
assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
327+
currentExpiration = updatedExpiration;
328+
}
329+
}
330+
331+
private static long getExpirationFromTask(String asyncId) {
332+
List<EsqlQueryTask> tasks = new ArrayList<>();
333+
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
334+
for (CancellableTask task : ts.getTaskManager().getCancellableTasks().values()) {
335+
if (task instanceof EsqlQueryTask queryTask) {
336+
EsqlQueryResponse result = queryTask.getCurrentResult();
337+
if (result.isAsync() && result.asyncExecutionId().get().equals(asyncId)) {
338+
tasks.add(queryTask);
339+
}
340+
}
341+
}
342+
}
343+
assertThat(tasks, hasSize(1));
344+
return tasks.getFirst().getExpirationTimeMillis();
345+
}
346+
347+
private static long getExpirationFromDoc(String asyncId) {
348+
String docId = AsyncExecutionId.decode(asyncId).getDocId();
349+
GetResponse doc = client().prepareGet().setIndex(XPackPlugin.ASYNC_RESULTS_INDEX).setId(docId).get();
350+
assertTrue(doc.isExists());
351+
return ((Number) doc.getSource().get(AsyncTaskIndexService.EXPIRATION_TIME_FIELD)).longValue();
352+
}
353+
263354
private List<TaskInfo> getEsqlQueryTasks() throws Exception {
264355
List<TaskInfo> foundTasks = new ArrayList<>();
265356
assertBusy(() -> {

0 commit comments

Comments
 (0)