2121import org .elasticsearch .compute .operator .exchange .ExchangeService ;
2222import org .elasticsearch .core .TimeValue ;
2323import org .elasticsearch .plugins .Plugin ;
24+ import org .elasticsearch .tasks .TaskCancelledException ;
2425import org .elasticsearch .tasks .TaskInfo ;
2526import org .elasticsearch .test .AbstractMultiClustersTestCase ;
2627import org .elasticsearch .transport .TransportService ;
4142import static org .hamcrest .Matchers .equalTo ;
4243import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
4344import static org .hamcrest .Matchers .hasSize ;
45+ import static org .hamcrest .Matchers .instanceOf ;
4446
4547public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
4648 private static final String REMOTE_CLUSTER = "cluster-a" ;
@@ -115,6 +117,7 @@ private void createLocalIndex(int numDocs) throws Exception {
115117 mapping .startObject ("const" );
116118 {
117119 mapping .field ("type" , "long" );
120+ mapping .startObject ("script" ).field ("source" , "" ).field ("lang" , "pause" ).endObject ();
118121 }
119122 mapping .endObject ();
120123 }
@@ -123,7 +126,7 @@ private void createLocalIndex(int numDocs) throws Exception {
123126 client (LOCAL_CLUSTER ).admin ().indices ().prepareCreate ("test" ).setMapping (mapping ).get ();
124127 BulkRequestBuilder bulk = client (LOCAL_CLUSTER ).prepareBulk ("test" ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
125128 for (int i = 0 ; i < numDocs ; i ++) {
126- bulk .add (new IndexRequest ().source ("const " , i ));
129+ bulk .add (new IndexRequest ().source ("foo " , i ));
127130 }
128131 bulk .get ();
129132 }
@@ -282,7 +285,7 @@ public void testCancelSkipUnavailable() throws Exception {
282285 }
283286
284287 Exception error = expectThrows (Exception .class , requestFuture ::actionGet );
285- assertThat (error . getMessage (), containsString ( "remote failed" ));
288+ assertThat (error , instanceOf ( TaskCancelledException . class ));
286289 }
287290
288291 // Check that closing remote node with skip_unavailable=true produces partial
@@ -311,7 +314,7 @@ public void testCloseSkipUnavailable() throws Exception {
311314 List <List <Object >> values = getValuesList (resp );
312315 assertThat (values .get (0 ).size (), equalTo (1 ));
313316 // We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there
314- assertThat ((long ) values .get (0 ).get (0 ), greaterThanOrEqualTo (45L ));
317+ assertThat ((long ) values .get (0 ).get (0 ), greaterThanOrEqualTo (10L ));
315318
316319 EsqlExecutionInfo .Cluster cluster = executionInfo .getCluster (REMOTE_CLUSTER );
317320 EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
0 commit comments