Skip to content

Commit 5be018a

Browse files
committed
Fix expiration time in ES|QL async
1 parent fe4c41e commit 5be018a

File tree

2 files changed

+6
-7
lines changed

2 files changed

+6
-7
lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ public AsyncTaskManagementService(
177177
public void asyncExecute(
178178
Request request,
179179
TimeValue waitForCompletionTimeout,
180-
TimeValue keepAlive,
181180
boolean keepOnCompletion,
182181
ActionListener<Response> listener
183182
) {
@@ -190,7 +189,7 @@ public void asyncExecute(
190189
operation.execute(
191190
request,
192191
searchTask,
193-
wrapStoringListener(searchTask, waitForCompletionTimeout, keepAlive, keepOnCompletion, listener)
192+
wrapStoringListener(searchTask, waitForCompletionTimeout, keepOnCompletion, listener)
194193
);
195194
operationStarted = true;
196195
} finally {
@@ -205,7 +204,6 @@ public void asyncExecute(
205204
private ActionListener<Response> wrapStoringListener(
206205
T searchTask,
207206
TimeValue waitForCompletionTimeout,
208-
TimeValue keepAlive,
209207
boolean keepOnCompletion,
210208
ActionListener<Response> listener
211209
) {
@@ -227,7 +225,7 @@ private ActionListener<Response> wrapStoringListener(
227225
if (keepOnCompletion) {
228226
storeResults(
229227
searchTask,
230-
new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()),
228+
new StoredAsyncResponse<>(response, searchTask.getExpirationTimeMillis()),
231229
ActionListener.running(() -> acquiredListener.onResponse(response))
232230
);
233231
} else {
@@ -239,7 +237,7 @@ private ActionListener<Response> wrapStoringListener(
239237
// We finished after timeout - saving results
240238
storeResults(
241239
searchTask,
242-
new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()),
240+
new StoredAsyncResponse<>(response, searchTask.getExpirationTimeMillis()),
243241
ActionListener.running(response::decRef)
244242
);
245243
}
@@ -251,7 +249,7 @@ private ActionListener<Response> wrapStoringListener(
251249
if (keepOnCompletion) {
252250
storeResults(
253251
searchTask,
254-
new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()),
252+
new StoredAsyncResponse<>(e, searchTask.getExpirationTimeMillis()),
255253
ActionListener.running(() -> acquiredListener.onFailure(e))
256254
);
257255
} else {
@@ -261,7 +259,7 @@ private ActionListener<Response> wrapStoringListener(
261259
}
262260
} else {
263261
// We finished after timeout - saving exception
264-
storeResults(searchTask, new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()));
262+
storeResults(searchTask, new StoredAsyncResponse<>(e, searchTask.getExpirationTimeMillis()));
265263
}
266264
});
267265
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ public void testUpdateKeepAlive() throws Exception {
312312
assertThat(resp.isRunning(), is(false));
313313
}
314314
});
315+
assertThat(getExpirationFromDoc(asyncId), greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));
315316
// update the keepAlive after the query has completed
316317
int iters = between(1, 5);
317318
for (int i = 0; i < iters; i++) {

0 commit comments

Comments
 (0)