| 
8 | 8 | package org.elasticsearch.xpack.esql.action;  | 
9 | 9 | 
 
  | 
10 | 10 | import org.elasticsearch.ResourceNotFoundException;  | 
 | 11 | +import org.elasticsearch.action.get.GetResponse;  | 
11 | 12 | import org.elasticsearch.action.support.master.AcknowledgedResponse;  | 
12 | 13 | import org.elasticsearch.common.settings.Settings;  | 
13 | 14 | import org.elasticsearch.compute.operator.DriverTaskRunner;  | 
14 | 15 | import org.elasticsearch.compute.operator.exchange.ExchangeService;  | 
15 | 16 | import org.elasticsearch.core.TimeValue;  | 
16 | 17 | import org.elasticsearch.plugins.Plugin;  | 
 | 18 | +import org.elasticsearch.tasks.CancellableTask;  | 
17 | 19 | import org.elasticsearch.tasks.TaskCancelledException;  | 
18 | 20 | import org.elasticsearch.tasks.TaskInfo;  | 
 | 21 | +import org.elasticsearch.transport.TransportService;  | 
19 | 22 | import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;  | 
 | 23 | +import org.elasticsearch.xpack.core.XPackPlugin;  | 
 | 24 | +import org.elasticsearch.xpack.core.async.AsyncExecutionId;  | 
 | 25 | +import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;  | 
20 | 26 | import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;  | 
21 | 27 | import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;  | 
22 | 28 | import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;  | 
 | 
40 | 46 | import static org.hamcrest.Matchers.empty;  | 
41 | 47 | import static org.hamcrest.Matchers.equalTo;  | 
42 | 48 | import static org.hamcrest.Matchers.greaterThanOrEqualTo;  | 
 | 49 | +import static org.hamcrest.Matchers.hasSize;  | 
43 | 50 | import static org.hamcrest.Matchers.is;  | 
44 | 51 | import static org.hamcrest.Matchers.not;  | 
45 | 52 | import static org.hamcrest.Matchers.notNullValue;  | 
@@ -260,6 +267,90 @@ private void testFinishingBeforeTimeout(boolean keepOnCompletion) {  | 
260 | 267 |         }  | 
261 | 268 |     }  | 
262 | 269 | 
 
  | 
 | 270 | +    public void testUpdateKeepAlive() throws Exception {  | 
 | 271 | +        long nowInMillis = System.currentTimeMillis();  | 
 | 272 | +        TimeValue keepAlive = timeValueSeconds(between(30, 60));  | 
 | 273 | +        var request = EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client())  | 
 | 274 | +            .query("from test | stats sum(pause_me)")  | 
 | 275 | +            .pragmas(queryPragmas())  | 
 | 276 | +            .waitForCompletionTimeout(TimeValue.timeValueMillis(between(1, 10)))  | 
 | 277 | +            .keepOnCompletion(randomBoolean())  | 
 | 278 | +            .keepAlive(keepAlive);  | 
 | 279 | +        final String asyncId;  | 
 | 280 | +        long currentExpiration;  | 
 | 281 | +        try {  | 
 | 282 | +            try (EsqlQueryResponse initialResponse = request.execute().actionGet(60, TimeUnit.SECONDS)) {  | 
 | 283 | +                assertThat(initialResponse.isRunning(), is(true));  | 
 | 284 | +                assertTrue(initialResponse.asyncExecutionId().isPresent());  | 
 | 285 | +                asyncId = initialResponse.asyncExecutionId().get();  | 
 | 286 | +            }  | 
 | 287 | +            currentExpiration = getExpirationFromTask(asyncId);  | 
 | 288 | +            assertThat(currentExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));  | 
 | 289 | +            // update the expiration while the task is still running  | 
 | 290 | +            int iters = iterations(1, 5);  | 
 | 291 | +            for (int i = 0; i < iters; i++) {  | 
 | 292 | +                long extraKeepAlive = randomIntBetween(30, 60);  | 
 | 293 | +                keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);  | 
 | 294 | +                GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);  | 
 | 295 | +                try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {  | 
 | 296 | +                    assertThat(resp.asyncExecutionId(), isPresent());  | 
 | 297 | +                    assertThat(resp.asyncExecutionId().get(), equalTo(asyncId));  | 
 | 298 | +                    assertTrue(resp.isRunning());  | 
 | 299 | +                }  | 
 | 300 | +                long updatedExpiration = getExpirationFromTask(asyncId);  | 
 | 301 | +                assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));  | 
 | 302 | +                assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));  | 
 | 303 | +                currentExpiration = updatedExpiration;  | 
 | 304 | +            }  | 
 | 305 | +        } finally {  | 
 | 306 | +            scriptPermits.release(numberOfDocs());  | 
 | 307 | +        }  | 
 | 308 | +        // allow the query to complete, then update the expiration with the result is being stored in the async index  | 
 | 309 | +        assertBusy(() -> {  | 
 | 310 | +            GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId);  | 
 | 311 | +            try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {  | 
 | 312 | +                assertThat(resp.isRunning(), is(false));  | 
 | 313 | +            }  | 
 | 314 | +        });  | 
 | 315 | +        // update the keepAlive after the query has completed  | 
 | 316 | +        int iters = between(1, 5);  | 
 | 317 | +        for (int i = 0; i < iters; i++) {  | 
 | 318 | +            long extraKeepAlive = randomIntBetween(30, 60);  | 
 | 319 | +            keepAlive = TimeValue.timeValueSeconds(keepAlive.seconds() + extraKeepAlive);  | 
 | 320 | +            GetAsyncResultRequest getRequest = new GetAsyncResultRequest(asyncId).setKeepAlive(keepAlive);  | 
 | 321 | +            try (var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getRequest).actionGet()) {  | 
 | 322 | +                assertThat(resp.isRunning(), is(false));  | 
 | 323 | +            }  | 
 | 324 | +            long updatedExpiration = getExpirationFromDoc(asyncId);  | 
 | 325 | +            assertThat(updatedExpiration, greaterThanOrEqualTo(currentExpiration + extraKeepAlive));  | 
 | 326 | +            assertThat(updatedExpiration, greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis()));  | 
 | 327 | +            currentExpiration = updatedExpiration;  | 
 | 328 | +        }  | 
 | 329 | +    }  | 
 | 330 | + | 
 | 331 | +    private static long getExpirationFromTask(String asyncId) {  | 
 | 332 | +        List<EsqlQueryTask> tasks = new ArrayList<>();  | 
 | 333 | +        for (TransportService ts : internalCluster().getInstances(TransportService.class)) {  | 
 | 334 | +            for (CancellableTask task : ts.getTaskManager().getCancellableTasks().values()) {  | 
 | 335 | +                if (task instanceof EsqlQueryTask queryTask) {  | 
 | 336 | +                    EsqlQueryResponse result = queryTask.getCurrentResult();  | 
 | 337 | +                    if (result.isAsync() && result.asyncExecutionId().get().equals(asyncId)) {  | 
 | 338 | +                        tasks.add(queryTask);  | 
 | 339 | +                    }  | 
 | 340 | +                }  | 
 | 341 | +            }  | 
 | 342 | +        }  | 
 | 343 | +        assertThat(tasks, hasSize(1));  | 
 | 344 | +        return tasks.getFirst().getExpirationTimeMillis();  | 
 | 345 | +    }  | 
 | 346 | + | 
 | 347 | +    private static long getExpirationFromDoc(String asyncId) {  | 
 | 348 | +        String docId = AsyncExecutionId.decode(asyncId).getDocId();  | 
 | 349 | +        GetResponse doc = client().prepareGet().setIndex(XPackPlugin.ASYNC_RESULTS_INDEX).setId(docId).get();  | 
 | 350 | +        assertTrue(doc.isExists());  | 
 | 351 | +        return ((Number) doc.getSource().get(AsyncTaskIndexService.EXPIRATION_TIME_FIELD)).longValue();  | 
 | 352 | +    }  | 
 | 353 | + | 
263 | 354 |     private List<TaskInfo> getEsqlQueryTasks() throws Exception {  | 
264 | 355 |         List<TaskInfo> foundTasks = new ArrayList<>();  | 
265 | 356 |         assertBusy(() -> {  | 
 | 
0 commit comments