8
8
package org .elasticsearch .xpack .esql .action ;
9
9
10
10
import org .elasticsearch .Build ;
11
+ import org .elasticsearch .ElasticsearchTimeoutException ;
11
12
import org .elasticsearch .action .ActionFuture ;
12
13
import org .elasticsearch .client .internal .Client ;
13
14
import org .elasticsearch .core .Tuple ;
16
17
import org .elasticsearch .tasks .TaskInfo ;
17
18
import org .elasticsearch .transport .TransportService ;
18
19
import org .elasticsearch .xpack .core .async .AsyncStopRequest ;
20
+ import org .elasticsearch .xpack .core .async .DeleteAsyncResultRequest ;
21
+ import org .elasticsearch .xpack .core .async .TransportDeleteAsyncResultAction ;
19
22
import org .elasticsearch .xpack .esql .plugin .EsqlPlugin ;
20
23
import org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
21
24
28
31
29
32
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
30
33
import static org .elasticsearch .xpack .esql .action .AbstractEsqlIntegTestCase .randomIncludeCCSMetadata ;
31
- import static org .elasticsearch .xpack .esql .action .EsqlAsyncTestUtils .deleteAsyncId ;
32
34
import static org .elasticsearch .xpack .esql .action .EsqlAsyncTestUtils .getAsyncResponse ;
33
35
import static org .elasticsearch .xpack .esql .action .EsqlAsyncTestUtils .startAsyncQuery ;
34
36
import static org .elasticsearch .xpack .esql .action .EsqlAsyncTestUtils .startAsyncQueryWithPragmas ;
@@ -43,6 +45,11 @@ public class CrossClusterAsyncQueryStopIT extends AbstractCrossClusterTestCase {
43
45
44
46
private static final Logger LOGGER = LogManager .getLogger (CrossClusterAsyncQueryStopIT .class );
45
47
48
+ @ Override
49
+ protected boolean reuseClusters () {
50
+ return false ;
51
+ }
52
+
46
53
public void testStopQuery () throws Exception {
47
54
assumeTrue ("Pragma does not work in release builds" , Build .current ().isSnapshot ());
48
55
Map <String , Object > testClusterInfo = setupClusters (3 );
@@ -125,7 +132,7 @@ public void testStopQuery() throws Exception {
125
132
} finally {
126
133
// Ensure proper cleanup if the test fails
127
134
CountingPauseFieldPlugin .allowEmitting .countDown ();
128
- assertAcked ( deleteAsyncId (client (), asyncExecutionId ) );
135
+ deleteAsyncId (client (), asyncExecutionId );
129
136
}
130
137
}
131
138
@@ -222,7 +229,7 @@ public void testStopQueryLocal() throws Exception {
222
229
}
223
230
} finally {
224
231
SimplePauseFieldPlugin .allowEmitting .countDown ();
225
- assertAcked ( deleteAsyncId (client , asyncExecutionId ) );
232
+ deleteAsyncId (client , asyncExecutionId );
226
233
}
227
234
}
228
235
@@ -265,7 +272,7 @@ public void testStopQueryLocalNoRemotes() throws Exception {
265
272
}
266
273
} finally {
267
274
SimplePauseFieldPlugin .allowEmitting .countDown ();
268
- assertAcked ( deleteAsyncId (client (), asyncExecutionId ) );
275
+ deleteAsyncId (client (), asyncExecutionId );
269
276
}
270
277
}
271
278
@@ -351,7 +358,16 @@ public void testStopQueryInlineStats() throws Exception {
351
358
} finally {
352
359
// Ensure proper cleanup if the test fails
353
360
CountingPauseFieldPlugin .allowEmitting .countDown ();
354
- assertAcked (deleteAsyncId (client (), asyncExecutionId ));
361
+ deleteAsyncId (client (), asyncExecutionId );
362
+ }
363
+ }
364
+
365
+ public void deleteAsyncId (Client client , String id ) {
366
+ try {
367
+ DeleteAsyncResultRequest request = new DeleteAsyncResultRequest (id );
368
+ assertAcked (client .execute (TransportDeleteAsyncResultAction .TYPE , request ).actionGet (30 , TimeUnit .SECONDS ));
369
+ } catch (ElasticsearchTimeoutException e ) {
370
+ LOGGER .warn ("timeout waiting for DELETE response: {}: {}" , id , e );
355
371
}
356
372
}
357
373
0 commit comments