@@ -53,65 +53,71 @@ public void testStopQuery() throws Exception {
5353 includeCCSMetadata .v1 (),
5454 Map .of ("page_size" , 1 , "data_partitioning" , "shard" , "task_concurrency" , 1 )
5555 );
56+ try {
57+ // wait until we know that the query against 'remote-b:blocking' has started
58+ CountingPauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
59+
60+ // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on
61+ // it)
62+ waitForCluster (client (), REMOTE_CLUSTER_1 , asyncExecutionId );
63+ waitForCluster (client (), LOCAL_CLUSTER , asyncExecutionId );
64+
65+ /* at this point:
66+ * the query against cluster-a should be finished
67+ * the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
68+ * the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
69+ */
70+
71+ // run the stop query
72+ AsyncStopRequest stopRequest = new AsyncStopRequest (asyncExecutionId );
73+ ActionFuture <EsqlQueryResponse > stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
74+ assertBusy (() -> {
75+ List <TaskInfo > tasks = getDriverTasks (client (REMOTE_CLUSTER_2 ));
76+ List <TaskInfo > reduceTasks = tasks .stream ()
77+ .filter (t -> t .description ().contains ("_LuceneSourceOperator" ) == false )
78+ .toList ();
79+ assertThat (reduceTasks , empty ());
80+ });
81+ // allow remoteB query to proceed
82+ CountingPauseFieldPlugin .allowEmitting .countDown ();
83+
84+ // Since part of the query has not been stopped, we expect some result to emerge here
85+ try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
86+ // Check that we did not process all the fields on remote-b
87+ // Should not be getting more than one page here, and we set page size to 1
88+ assertThat (CountingPauseFieldPlugin .count .get (), lessThanOrEqualTo (1L ));
89+ assertThat (asyncResponse .isRunning (), is (false ));
90+ assertThat (asyncResponse .columns ().size (), equalTo (1 ));
91+ assertThat (asyncResponse .values ().hasNext (), is (true ));
92+ Iterator <Object > row = asyncResponse .values ().next ();
93+ // sum of 0-9 is 45, and sum of 0-9 squared is 285
94+ assertThat (row .next (), equalTo (330L ));
5695
57- // wait until we know that the query against 'remote-b:blocking' has started
58- CountingPauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
59-
60- // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
61- waitForCluster (client (), REMOTE_CLUSTER_1 , asyncExecutionId );
62- waitForCluster (client (), LOCAL_CLUSTER , asyncExecutionId );
63-
64- /* at this point:
65- * the query against cluster-a should be finished
66- * the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown)
67- * the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b
68- */
69-
70- // run the stop query
71- AsyncStopRequest stopRequest = new AsyncStopRequest (asyncExecutionId );
72- ActionFuture <EsqlQueryResponse > stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
73- assertBusy (() -> {
74- List <TaskInfo > tasks = getDriverTasks (client (REMOTE_CLUSTER_2 ));
75- List <TaskInfo > reduceTasks = tasks .stream ().filter (t -> t .description ().contains ("_LuceneSourceOperator" ) == false ).toList ();
76- assertThat (reduceTasks , empty ());
77- });
78- // allow remoteB query to proceed
79- CountingPauseFieldPlugin .allowEmitting .countDown ();
80-
81- // Since part of the query has not been stopped, we expect some result to emerge here
82- try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
83- // Check that we did not process all the fields on remote-b
84- // Should not be getting more than one page here, and we set page size to 1
85- assertThat (CountingPauseFieldPlugin .count .get (), lessThanOrEqualTo (1L ));
86- assertThat (asyncResponse .isRunning (), is (false ));
87- assertThat (asyncResponse .columns ().size (), equalTo (1 ));
88- assertThat (asyncResponse .values ().hasNext (), is (true ));
89- Iterator <Object > row = asyncResponse .values ().next ();
90- // sum of 0-9 is 45, and sum of 0-9 squared is 285
91- assertThat (row .next (), equalTo (330L ));
92-
93- EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
94- assertNotNull (executionInfo );
95- assertThat (executionInfo .isCrossClusterSearch (), is (true ));
96- long overallTookMillis = executionInfo .overallTook ().millis ();
97- assertThat (overallTookMillis , greaterThanOrEqualTo (0L ));
98- assertThat (executionInfo .clusterAliases (), equalTo (Set .of (LOCAL_CLUSTER , REMOTE_CLUSTER_1 , REMOTE_CLUSTER_2 )));
99- assertThat (executionInfo .isPartial (), equalTo (true ));
100-
101- EsqlExecutionInfo .Cluster remoteCluster = executionInfo .getCluster (REMOTE_CLUSTER_1 );
102- assertThat (remoteCluster .getIndexExpression (), equalTo ("logs-*" ));
103- assertClusterInfoSuccess (remoteCluster , remote1NumShards );
104-
105- EsqlExecutionInfo .Cluster remote2Cluster = executionInfo .getCluster (REMOTE_CLUSTER_2 );
106- assertThat (remote2Cluster .getIndexExpression (), equalTo ("blocking" ));
107- assertThat (remote2Cluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
108-
109- EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
110- assertThat (localCluster .getIndexExpression (), equalTo ("logs-*" ));
111- assertClusterInfoSuccess (localCluster , localNumShards );
112-
113- assertClusterMetadataInResponse (asyncResponse , responseExpectMeta , 3 );
96+ EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
97+ assertNotNull (executionInfo );
98+ assertThat (executionInfo .isCrossClusterSearch (), is (true ));
99+ long overallTookMillis = executionInfo .overallTook ().millis ();
100+ assertThat (overallTookMillis , greaterThanOrEqualTo (0L ));
101+ assertThat (executionInfo .clusterAliases (), equalTo (Set .of (LOCAL_CLUSTER , REMOTE_CLUSTER_1 , REMOTE_CLUSTER_2 )));
102+ assertThat (executionInfo .isPartial (), equalTo (true ));
103+
104+ EsqlExecutionInfo .Cluster remoteCluster = executionInfo .getCluster (REMOTE_CLUSTER_1 );
105+ assertThat (remoteCluster .getIndexExpression (), equalTo ("logs-*" ));
106+ assertClusterInfoSuccess (remoteCluster , remote1NumShards );
107+
108+ EsqlExecutionInfo .Cluster remote2Cluster = executionInfo .getCluster (REMOTE_CLUSTER_2 );
109+ assertThat (remote2Cluster .getIndexExpression (), equalTo ("blocking" ));
110+ assertThat (remote2Cluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
111+
112+ EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
113+ assertThat (localCluster .getIndexExpression (), equalTo ("logs-*" ));
114+ assertClusterInfoSuccess (localCluster , localNumShards );
115+
116+ assertClusterMetadataInResponse (asyncResponse , responseExpectMeta , 3 );
117+ }
114118 } finally {
119+ // Ensure proper cleanup if the test fails
120+ CountingPauseFieldPlugin .allowEmitting .countDown ();
115121 assertAcked (deleteAsyncId (client (), asyncExecutionId ));
116122 }
117123 }
@@ -131,63 +137,66 @@ public void testStopQueryLocal() throws Exception {
131137 includeCCSMetadata .v1 ()
132138 );
133139
134- // wait until we know that the query against 'remote-b:blocking' has started
135- SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
136-
137- // wait until the remotes are done
138- waitForCluster (client (), REMOTE_CLUSTER_1 , asyncExecutionId );
139- waitForCluster (client (), REMOTE_CLUSTER_2 , asyncExecutionId );
140+ try {
141+ // wait until we know that the query against 'remote-b:blocking' has started
142+ SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
143+
144+ // wait until the remotes are done
145+ waitForCluster (client (), REMOTE_CLUSTER_1 , asyncExecutionId );
146+ waitForCluster (client (), REMOTE_CLUSTER_2 , asyncExecutionId );
147+
148+ /* at this point:
149+ * the query against remotes should be finished
150+ * the query against the local cluster should be running because it's blocked
151+ */
152+
153+ // run the stop query
154+ AsyncStopRequest stopRequest = new AsyncStopRequest (asyncExecutionId );
155+ ActionFuture <EsqlQueryResponse > stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
156+ // ensure stop operation is running
157+ assertBusy (() -> {
158+ try (EsqlQueryResponse asyncResponse = getAsyncResponse (client (), asyncExecutionId )) {
159+ EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
160+ assertNotNull (executionInfo );
161+ assertThat (executionInfo .isPartial (), is (true ));
162+ }
163+ });
164+ // allow local query to proceed
165+ SimplePauseFieldPlugin .allowEmitting .countDown ();
166+
167+ // Since part of the query has not been stopped, we expect some result to emerge here
168+ try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
169+ assertThat (asyncResponse .isRunning (), is (false ));
170+ assertThat (asyncResponse .columns ().size (), equalTo (1 ));
171+ assertThat (asyncResponse .values ().hasNext (), is (true ));
172+ Iterator <Object > row = asyncResponse .values ().next ();
173+ // sum of 0-9 squared is 285, from two remotes it's 570
174+ assertThat (row .next (), equalTo (570L ));
140175
141- /* at this point:
142- * the query against remotes should be finished
143- * the query against the local cluster should be running because it's blocked
144- */
145-
146- // run the stop query
147- AsyncStopRequest stopRequest = new AsyncStopRequest (asyncExecutionId );
148- ActionFuture <EsqlQueryResponse > stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
149- // ensure stop operation is running
150- assertBusy (() -> {
151- try (EsqlQueryResponse asyncResponse = getAsyncResponse (client (), asyncExecutionId )) {
152176 EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
153177 assertNotNull (executionInfo );
154- assertThat (executionInfo .isPartial (), is (true ));
178+ assertThat (executionInfo .isCrossClusterSearch (), is (true ));
179+ long overallTookMillis = executionInfo .overallTook ().millis ();
180+ assertThat (overallTookMillis , greaterThanOrEqualTo (0L ));
181+ assertThat (executionInfo .clusterAliases (), equalTo (Set .of (LOCAL_CLUSTER , REMOTE_CLUSTER_1 , REMOTE_CLUSTER_2 )));
182+ assertThat (executionInfo .isPartial (), equalTo (true ));
183+
184+ EsqlExecutionInfo .Cluster remoteCluster = executionInfo .getCluster (REMOTE_CLUSTER_1 );
185+ assertThat (remoteCluster .getIndexExpression (), equalTo ("logs-*" ));
186+ assertClusterInfoSuccess (remoteCluster , remote1NumShards );
187+
188+ EsqlExecutionInfo .Cluster remote2Cluster = executionInfo .getCluster (REMOTE_CLUSTER_2 );
189+ assertThat (remote2Cluster .getIndexExpression (), equalTo ("logs-*" ));
190+ assertClusterInfoSuccess (remote2Cluster , remote2NumShards );
191+
192+ EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
193+ assertThat (localCluster .getIndexExpression (), equalTo ("blocking" ));
194+ assertThat (localCluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
195+
196+ assertClusterMetadataInResponse (asyncResponse , responseExpectMeta , 3 );
155197 }
156- });
157- // allow local query to proceed
158- SimplePauseFieldPlugin .allowEmitting .countDown ();
159-
160- // Since part of the query has not been stopped, we expect some result to emerge here
161- try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
162- assertThat (asyncResponse .isRunning (), is (false ));
163- assertThat (asyncResponse .columns ().size (), equalTo (1 ));
164- assertThat (asyncResponse .values ().hasNext (), is (true ));
165- Iterator <Object > row = asyncResponse .values ().next ();
166- // sum of 0-9 squared is 285, from two remotes it's 570
167- assertThat (row .next (), equalTo (570L ));
168-
169- EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
170- assertNotNull (executionInfo );
171- assertThat (executionInfo .isCrossClusterSearch (), is (true ));
172- long overallTookMillis = executionInfo .overallTook ().millis ();
173- assertThat (overallTookMillis , greaterThanOrEqualTo (0L ));
174- assertThat (executionInfo .clusterAliases (), equalTo (Set .of (LOCAL_CLUSTER , REMOTE_CLUSTER_1 , REMOTE_CLUSTER_2 )));
175- assertThat (executionInfo .isPartial (), equalTo (true ));
176-
177- EsqlExecutionInfo .Cluster remoteCluster = executionInfo .getCluster (REMOTE_CLUSTER_1 );
178- assertThat (remoteCluster .getIndexExpression (), equalTo ("logs-*" ));
179- assertClusterInfoSuccess (remoteCluster , remote1NumShards );
180-
181- EsqlExecutionInfo .Cluster remote2Cluster = executionInfo .getCluster (REMOTE_CLUSTER_2 );
182- assertThat (remote2Cluster .getIndexExpression (), equalTo ("logs-*" ));
183- assertClusterInfoSuccess (remote2Cluster , remote2NumShards );
184-
185- EsqlExecutionInfo .Cluster localCluster = executionInfo .getCluster (LOCAL_CLUSTER );
186- assertThat (localCluster .getIndexExpression (), equalTo ("blocking" ));
187- assertThat (localCluster .getStatus (), equalTo (EsqlExecutionInfo .Cluster .Status .PARTIAL ));
188-
189- assertClusterMetadataInResponse (asyncResponse , responseExpectMeta , 3 );
190198 } finally {
199+ SimplePauseFieldPlugin .allowEmitting .countDown ();
191200 assertAcked (deleteAsyncId (client (), asyncExecutionId ));
192201 }
193202 }
@@ -205,30 +214,33 @@ public void testStopQueryLocalNoRemotes() throws Exception {
205214 includeCCSMetadata .v1 ()
206215 );
207216
208- // wait until we know that the query against 'remote-b:blocking' has started
209- SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
210-
211- /* at this point:
212- * the query against the local cluster should be running because it's blocked
213- */
214-
215- // run the stop query
216- var stopRequest = new AsyncStopRequest (asyncExecutionId );
217- var stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
218- // allow local query to proceed
219- SimplePauseFieldPlugin .allowEmitting .countDown ();
220-
221- try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
222- assertThat (asyncResponse .isRunning (), is (false ));
223- assertThat (asyncResponse .columns ().size (), equalTo (1 ));
224- assertThat (asyncResponse .values ().hasNext (), is (true ));
225- Iterator <Object > row = asyncResponse .values ().next ();
226- assertThat ((long ) row .next (), greaterThanOrEqualTo (0L ));
227-
228- EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
229- assertNotNull (executionInfo );
230- assertThat (executionInfo .isCrossClusterSearch (), is (false ));
217+ try {
218+ // wait until we know that the query against 'remote-b:blocking' has started
219+ SimplePauseFieldPlugin .startEmitting .await (30 , TimeUnit .SECONDS );
220+
221+ /* at this point:
222+ * the query against the local cluster should be running because it's blocked
223+ */
224+
225+ // run the stop query
226+ var stopRequest = new AsyncStopRequest (asyncExecutionId );
227+ var stopAction = client ().execute (EsqlAsyncStopAction .INSTANCE , stopRequest );
228+ // allow local query to proceed
229+ SimplePauseFieldPlugin .allowEmitting .countDown ();
230+
231+ try (EsqlQueryResponse asyncResponse = stopAction .actionGet (30 , TimeUnit .SECONDS )) {
232+ assertThat (asyncResponse .isRunning (), is (false ));
233+ assertThat (asyncResponse .columns ().size (), equalTo (1 ));
234+ assertThat (asyncResponse .values ().hasNext (), is (true ));
235+ Iterator <Object > row = asyncResponse .values ().next ();
236+ assertThat ((long ) row .next (), greaterThanOrEqualTo (0L ));
237+
238+ EsqlExecutionInfo executionInfo = asyncResponse .getExecutionInfo ();
239+ assertNotNull (executionInfo );
240+ assertThat (executionInfo .isCrossClusterSearch (), is (false ));
241+ }
231242 } finally {
243+ SimplePauseFieldPlugin .allowEmitting .countDown ();
232244 assertAcked (deleteAsyncId (client (), asyncExecutionId ));
233245 }
234246 }
0 commit comments