@@ -360,63 +360,66 @@ public void testStopQueryLocal() throws Exception {
360360 includeCCSMetadata .v1 ()
361361 );
362362
363- // wait until we know that the query against 'remote-b:blocking' has started
364- SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
365-
366- // wait until the remotes are done
367- waitForCluster (client (), REMOTE_CLUSTER_1 , asyncExecutionId );
368- waitForCluster (client (), REMOTE_CLUSTER_2 , asyncExecutionId );
369-
370- /* at this point:
371- * the query against remotes should be finished
372- * the query against the local cluster should be running because it's blocked
373- */
363+ try {
364+ // wait until we know that the query against 'remote-b:blocking' has started
365+ SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
366+
367+ // wait until the remotes are done
368+ waitForCluster (client (), REMOTE_CLUSTER_1 , asyncExecutionId );
369+ waitForCluster (client (), REMOTE_CLUSTER_2 , asyncExecutionId );
370+
371+ /* at this point:
372+ * the query against remotes should be finished
373+ * the query against the local cluster should be running because it's blocked
374+ */
375+
376+ // run the stop query
377+ AsyncStopRequest stopRequest = new AsyncStopRequest (asyncExecutionId );
378+ ActionFuture <EsqlQueryResponse > stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
379+ // ensure stop operation is running
380+ assertBusy (() -> {
381+ try (EsqlQueryResponse asyncResponse = getAsyncResponse (client (), asyncExecutionId )) {
382+ EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
383+ assertNotNull (executionInfo );
384+ assertThat (executionInfo .isPartial (), is (true ));
385+ }
386+ });
387+ // allow local query to proceed
388+ SimplePauseFieldPlugin .allowEmitting .countDown ();
389+
390+ // Since part of the query has not been stopped, we expect some result to emerge here
391+ try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
392+ assertThat (asyncResponse .isRunning (), is (false ));
393+ assertThat (asyncResponse .columns ().size (), equalTo (1 ));
394+ assertThat (asyncResponse .values ().hasNext (), is (true ));
395+ Iterator <Object > row = asyncResponse .values ().next ();
396+ // sum of 0-9 squared is 285, from two remotes it's 570
397+ assertThat (row .next (), equalTo (570L ));
374398
375- // run the stop query
376- AsyncStopRequest stopRequest = new AsyncStopRequest (asyncExecutionId );
377- ActionFuture <EsqlQueryResponse > stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
378- // ensure stop operation is running
379- assertBusy (() -> {
380- try (EsqlQueryResponse asyncResponse = getAsyncResponse (client (), asyncExecutionId )) {
381399 EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
382400 assertNotNull (executionInfo );
383- assertThat (executionInfo .isPartial (), is (true ));
384- }
385- });
386- // allow local query to proceed
387- SimplePauseFieldPlugin .allowEmitting .countDown ();
388-
389- // Since part of the query has not been stopped, we expect some result to emerge here
390- try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
391- assertThat (asyncResponse .isRunning (), is (false ));
392- assertThat (asyncResponse .columns ().size (), equalTo (1 ));
393- assertThat (asyncResponse .values ().hasNext (), is (true ));
394- Iterator <Object > row = asyncResponse .values ().next ();
395- // sum of 0-9 squared is 285, from two remotes it's 570
396- assertThat (row .next (), equalTo (570L ));
397-
398- EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
399- assertNotNull (executionInfo );
400- assertThat (executionInfo .isCrossClusterSearch (), is (true ));
401- long overallTookMillis = executionInfo .overallTook ().millis ();
402- assertThat (overallTookMillis , greaterThanOrEqualTo (0L ));
403- assertThat (executionInfo .clusterAliases (), equalTo (Set .of (LOCAL_CLUSTER , REMOTE_CLUSTER_1 , REMOTE_CLUSTER_2 )));
404- assertThat (executionInfo .isPartial (), equalTo (true ));
401+ assertThat (executionInfo .isCrossClusterSearch (), is (true ));
402+ long overallTookMillis = executionInfo .overallTook ().millis ();
403+ assertThat (overallTookMillis , greaterThanOrEqualTo (0L ));
404+ assertThat (executionInfo .clusterAliases (), equalTo (Set .of (LOCAL_CLUSTER , REMOTE_CLUSTER_1 , REMOTE_CLUSTER_2 )));
405+ assertThat (executionInfo .isPartial (), equalTo (true ));
405406
406- EsqlExecutionInfo .Cluster remoteCluster = executionInfo .getCluster (REMOTE_CLUSTER_1 );
407- assertThat (remoteCluster .getIndexExpression (), equalTo ("logs-*" ));
408- assertClusterInfoSuccess (remoteCluster , remote1NumShards );
407+ EsqlExecutionInfo .Cluster remoteCluster = executionInfo .getCluster (REMOTE_CLUSTER_1 );
408+ assertThat (remoteCluster .getIndexExpression (), equalTo ("logs-*" ));
409+ assertClusterInfoSuccess (remoteCluster , remote1NumShards );
409410
410- EsqlExecutionInfo .Cluster remote2Cluster = executionInfo .getCluster (REMOTE_CLUSTER_2 );
411- assertThat (remote2Cluster .getIndexExpression (), equalTo ("logs-*" ));
412- assertClusterInfoSuccess (remote2Cluster , remote2NumShards );
411+ EsqlExecutionInfo .Cluster remote2Cluster = executionInfo .getCluster (REMOTE_CLUSTER_2 );
412+ assertThat (remote2Cluster .getIndexExpression (), equalTo ("logs-*" ));
413+ assertClusterInfoSuccess (remote2Cluster , remote2NumShards );
413414
414- EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
415- assertThat (localCluster .getIndexExpression (), equalTo ("blocking" ));
416- assertThat (localCluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
415+ EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
416+ assertThat (localCluster .getIndexExpression (), equalTo ("blocking" ));
417+ assertThat (localCluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
417418
418- assertClusterMetadataInResponse (asyncResponse , responseExpectMeta , 3 );
419+ assertClusterMetadataInResponse (asyncResponse , responseExpectMeta , 3 );
420+ }
419421 } finally {
422+ SimplePauseFieldPlugin .allowEmitting .countDown ();
420423 assertAcked (deleteAsyncId (client (), asyncExecutionId ));
421424 }
422425 }
@@ -434,30 +437,33 @@ public void testStopQueryLocalNoRemotes() throws Exception {
434437 includeCCSMetadata .v1 ()
435438 );
436439
437- // wait until we know that the query against 'remote-b:blocking' has started
438- SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
440+ try {
441+ // wait until we know that the query against 'remote-b:blocking' has started
442+ SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
439443
440- /* at this point:
441- * the query against the local cluster should be running because it's blocked
442- */
444+ /* at this point:
445+ * the query against the local cluster should be running because it's blocked
446+ */
443447
444- // run the stop query
445- var stopRequest = new AsyncStopRequest (asyncExecutionId );
446- var stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
447- // allow local query to proceed
448- SimplePauseFieldPlugin .allowEmitting .countDown ();
448+ // run the stop query
449+ var stopRequest = new AsyncStopRequest (asyncExecutionId );
450+ var stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
451+ // allow local query to proceed
452+ SimplePauseFieldPlugin .allowEmitting .countDown ();
449453
450- try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
451- assertThat (asyncResponse .isRunning (), is (false ));
452- assertThat (asyncResponse .columns ().size (), equalTo (1 ));
453- assertThat (asyncResponse .values ().hasNext (), is (true ));
454- Iterator <Object > row = asyncResponse .values ().next ();
455- assertThat ((long ) row .next (), greaterThanOrEqualTo (0L ));
454+ try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
455+ assertThat (asyncResponse .isRunning (), is (false ));
456+ assertThat (asyncResponse .columns ().size (), equalTo (1 ));
457+ assertThat (asyncResponse .values ().hasNext (), is (true ));
458+ Iterator <Object > row = asyncResponse .values ().next ();
459+ assertThat ((long ) row .next (), greaterThanOrEqualTo (0L ));
456460
457- EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
458- assertNotNull (executionInfo );
459- assertThat (executionInfo .isCrossClusterSearch (), is (false ));
461+ EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
462+ assertNotNull (executionInfo );
463+ assertThat (executionInfo .isCrossClusterSearch (), is (false ));
464+ }
460465 } finally {
466+ SimplePauseFieldPlugin .allowEmitting .countDown ();
461467 assertAcked (deleteAsyncId (client (), asyncExecutionId ));
462468 }
463469 }
0 commit comments