77
88package org .elasticsearch .xpack .esql .action ;
99
10+ import org .elasticsearch .Build ;
1011import org .elasticsearch .action .ActionFuture ;
1112import org .elasticsearch .action .admin .cluster .node .tasks .cancel .CancelTasksRequest ;
1213import org .elasticsearch .action .admin .cluster .node .tasks .cancel .TransportCancelTasksAction ;
2021import org .elasticsearch .compute .operator .exchange .ExchangeService ;
2122import org .elasticsearch .core .TimeValue ;
2223import org .elasticsearch .plugins .Plugin ;
24+ import org .elasticsearch .tasks .TaskCancelledException ;
2325import org .elasticsearch .tasks .TaskInfo ;
2426import org .elasticsearch .test .AbstractMultiClustersTestCase ;
2527import org .elasticsearch .transport .TransportService ;
3941import static org .hamcrest .Matchers .equalTo ;
4042import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
4143import static org .hamcrest .Matchers .hasSize ;
44+ import static org .hamcrest .Matchers .instanceOf ;
4245
4346public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
4447 private static final String REMOTE_CLUSTER = "cluster-a" ;
@@ -75,6 +78,11 @@ public void resetPlugin() {
7578 SimplePauseFieldPlugin .resetPlugin ();
7679 }
7780
81+ @ Override
82+ protected boolean reuseClusters () {
83+ return false ;
84+ }
85+
7886 private void createRemoteIndex (int numDocs ) throws Exception {
7987 XContentBuilder mapping = JsonXContent .contentBuilder ().startObject ();
8088 mapping .startObject ("runtime" );
@@ -96,6 +104,26 @@ private void createRemoteIndex(int numDocs) throws Exception {
96104 bulk .get ();
97105 }
98106
107+ private void createLocalIndex (int numDocs ) throws Exception {
108+ XContentBuilder mapping = JsonXContent .contentBuilder ().startObject ();
109+ mapping .startObject ("runtime" );
110+ {
111+ mapping .startObject ("const" );
112+ {
113+ mapping .field ("type" , "long" );
114+ }
115+ mapping .endObject ();
116+ }
117+ mapping .endObject ();
118+ mapping .endObject ();
119+ client (LOCAL_CLUSTER ).admin ().indices ().prepareCreate ("test" ).setMapping (mapping ).get ();
120+ BulkRequestBuilder bulk = client (LOCAL_CLUSTER ).prepareBulk ("test" ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
121+ for (int i = 0 ; i < numDocs ; i ++) {
122+ bulk .add (new IndexRequest ().source ("const" , i ));
123+ }
124+ bulk .get ();
125+ }
126+
99127 public void testCancel () throws Exception {
100128 createRemoteIndex (between (10 , 100 ));
101129 EsqlQueryRequest request = EsqlQueryRequest .syncEsqlQueryRequest ();
@@ -208,4 +236,93 @@ public void testTasks() throws Exception {
208236 }
209237 requestFuture .actionGet (30 , TimeUnit .SECONDS ).close ();
210238 }
239+
240+ // Check that cancelling remote task with skip_unavailable=true produces partial
241+ public void testCancelSkipUnavailable () throws Exception {
242+ createRemoteIndex (between (10 , 100 ));
243+ EsqlQueryRequest request = EsqlQueryRequest .syncEsqlQueryRequest ();
244+ request .query ("FROM *:test | STATS total=sum(const) | LIMIT 1" );
245+ request .pragmas (randomPragmas ());
246+ request .includeCCSMetadata (true );
247+ PlainActionFuture <EsqlQueryResponse > requestFuture = new PlainActionFuture <>();
248+ client ().execute (EsqlQueryAction .INSTANCE , request , requestFuture );
249+ assertTrue (SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS ));
250+ List <TaskInfo > rootTasks = new ArrayList <>();
251+ assertBusy (() -> {
252+ List <TaskInfo > tasks = client (REMOTE_CLUSTER ).admin ()
253+ .cluster ()
254+ .prepareListTasks ()
255+ .setActions (ComputeService .CLUSTER_ACTION_NAME )
256+ .get ()
257+ .getTasks ();
258+ assertThat (tasks , hasSize (1 ));
259+ rootTasks .addAll (tasks );
260+ });
261+ var cancelRequest = new CancelTasksRequest ().setTargetTaskId (rootTasks .get (0 ).taskId ()).setReason ("remote failed" );
262+ client (REMOTE_CLUSTER ).execute (TransportCancelTasksAction .TYPE , cancelRequest );
263+ try {
264+ assertBusy (() -> {
265+ List <TaskInfo > drivers = client (REMOTE_CLUSTER ).admin ()
266+ .cluster ()
267+ .prepareListTasks ()
268+ .setActions (DriverTaskRunner .ACTION_NAME )
269+ .get ()
270+ .getTasks ();
271+ assertThat (drivers .size (), greaterThanOrEqualTo (1 ));
272+ for (TaskInfo driver : drivers ) {
273+ assertTrue (driver .cancelled ());
274+ }
275+ });
276+ } finally {
277+ SimplePauseFieldPlugin .allowEmitting .countDown ();
278+ }
279+ var resp = requestFuture .actionGet ();
280+ EsqlExecutionInfo executionInfo = resp .getExecutionInfo ();
281+
282+ assertNotNull (executionInfo );
283+ EsqlExecutionInfo .Cluster cluster = executionInfo .getCluster (REMOTE_CLUSTER );
284+
285+ assertThat (cluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
286+ assertThat (cluster .getFailures ().size (), equalTo (1 ));
287+ assertThat (cluster .getFailures ().get (0 ).getCause (), instanceOf (TaskCancelledException .class ));
288+ }
289+
290+ // Check that closing remote node with skip_unavailable=true produces partial
291+ public void testCloseSkipUnavailable () throws Exception {
292+ // We are using delay() here because closing cluster while inside pause fields doesn't seem to produce clean closure
293+ assumeTrue ("Only snapshot builds have delay()" , Build .current ().isSnapshot ());
294+ createRemoteIndex (between (1000 , 5000 ));
295+ createLocalIndex (10 );
296+ EsqlQueryRequest request = EsqlQueryRequest .syncEsqlQueryRequest ();
297+ request .query ("""
298+ FROM test*,cluster-a:test* METADATA _index
299+ | EVAL cluster=MV_FIRST(SPLIT(_index, ":"))
300+ | WHERE CASE(cluster == "cluster-a", delay(1ms), true)
301+ | STATS total = sum(const) | LIMIT 1
302+ """ );
303+ request .pragmas (randomPragmas ());
304+ var requestFuture = client ().execute (EsqlQueryAction .INSTANCE , request );
305+ assertTrue (SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS ));
306+ SimplePauseFieldPlugin .allowEmitting .countDown ();
307+ cluster (REMOTE_CLUSTER ).close ();
308+ try (var resp = requestFuture .actionGet ()) {
309+ EsqlExecutionInfo executionInfo = resp .getExecutionInfo ();
310+ assertNotNull (executionInfo );
311+
312+ List <List <Object >> values = getValuesList (resp );
313+ assertThat (values .get (0 ).size (), equalTo (1 ));
314+ // We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there
315+ assertThat ((long ) values .get (0 ).get (0 ), greaterThanOrEqualTo (45L ));
316+
317+ EsqlExecutionInfo .Cluster cluster = executionInfo .getCluster (REMOTE_CLUSTER );
318+ EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
319+
320+ assertThat (localCluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .SUCCESSFUL ));
321+ assertThat (localCluster .getSuccessfulShards (), equalTo (1 ));
322+
323+ assertThat (cluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
324+ assertThat (cluster .getSuccessfulShards (), equalTo (0 ));
325+ assertThat (cluster .getFailures ().size (), equalTo (1 ));
326+ }
327+ }
211328}
0 commit comments