diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 1c67b956de5e5..8a09c4af8ca51 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -53,65 +53,71 @@ public void testStopQuery() throws Exception { includeCCSMetadata.v1(), Map.of("page_size", 1, "data_partitioning", "shard", "task_concurrency", 1) ); + try { + // wait until we know that the query against 'remote-b:blocking' has started + CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + + // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on + // it) + waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); + waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); + + /* at this point: + * the query against cluster-a should be finished + * the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown) + * the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b + */ + + // run the stop query + AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); + ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.description().contains("_LuceneSourceOperator") == false) + .toList(); + assertThat(reduceTasks, empty()); + }); + // allow remoteB query to proceed + CountingPauseFieldPlugin.allowEmitting.countDown(); + + // Since part of the query has not been stopped, we expect some result to emerge here + try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { + // Check that we did not process all the fields on remote-b + // Should not be getting more than one page here, and we set page size to 1 + assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L)); + assertThat(asyncResponse.isRunning(), is(false)); + assertThat(asyncResponse.columns().size(), equalTo(1)); + assertThat(asyncResponse.values().hasNext(), is(true)); + Iterator row = asyncResponse.values().next(); + // sum of 0-9 is 45, and sum of 0-9 squared is 285 + assertThat(row.next(), equalTo(330L)); - // wait until we know that the query against 'remote-b:blocking' has started - CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); - - // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it) - waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); - waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); - - /* at this point: - * the query against cluster-a should be finished - * the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown) - * the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b - */ - - // run the stop query - AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); - ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); - assertBusy(() -> { - List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); - List reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList(); - assertThat(reduceTasks, empty()); - }); - // allow remoteB query to proceed - CountingPauseFieldPlugin.allowEmitting.countDown(); - - // Since part of the query has not been stopped, we expect some result to emerge here - try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { - // Check that we did not process all the fields on remote-b - // Should not be getting more than one page here, and we set page size to 1 - assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L)); - assertThat(asyncResponse.isRunning(), is(false)); - assertThat(asyncResponse.columns().size(), equalTo(1)); - assertThat(asyncResponse.values().hasNext(), is(true)); - Iterator row = asyncResponse.values().next(); - // sum of 0-9 is 45, and sum of 0-9 squared is 285 - assertThat(row.next(), equalTo(330L)); - - EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isCrossClusterSearch(), is(true)); - long overallTookMillis = executionInfo.overallTook().millis(); - assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); - assertThat(executionInfo.isPartial(), equalTo(true)); - - EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(remoteCluster, remote1NumShards); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); - assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking")); - assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(localCluster, localNumShards); - - assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + assertThat(executionInfo.isPartial(), equalTo(true)); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(remoteCluster, remote1NumShards); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(localCluster, localNumShards); + + assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); + } } finally { + // Ensure proper cleanup if the test fails + CountingPauseFieldPlugin.allowEmitting.countDown(); assertAcked(deleteAsyncId(client(), asyncExecutionId)); } } @@ -131,63 +137,66 @@ public void testStopQueryLocal() throws Exception { includeCCSMetadata.v1() ); - // wait until we know that the query against 'remote-b:blocking' has started - SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); - - // wait until the remotes are done - waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); - waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId); + try { + // wait until we know that the query against 'remote-b:blocking' has started + SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + + // wait until the remotes are done + waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); + waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId); + + /* at this point: + * the query against remotes should be finished + * the query against the local cluster should be running because it's blocked + */ + + // run the stop query + AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); + ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // ensure stop operation is running + assertBusy(() -> { + try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) { + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isPartial(), is(true)); + } + }); + // allow local query to proceed + SimplePauseFieldPlugin.allowEmitting.countDown(); + + // Since part of the query has not been stopped, we expect some result to emerge here + try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { + assertThat(asyncResponse.isRunning(), is(false)); + assertThat(asyncResponse.columns().size(), equalTo(1)); + assertThat(asyncResponse.values().hasNext(), is(true)); + Iterator row = asyncResponse.values().next(); + // sum of 0-9 squared is 285, from two remotes it's 570 + assertThat(row.next(), equalTo(570L)); - /* at this point: - * the query against remotes should be finished - * the query against the local cluster should be running because it's blocked - */ - - // run the stop query - AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); - ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); - // ensure stop operation is running - assertBusy(() -> { - try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) { EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); assertNotNull(executionInfo); - assertThat(executionInfo.isPartial(), is(true)); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + assertThat(executionInfo.isPartial(), equalTo(true)); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(remoteCluster, remote1NumShards); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(remote2Cluster, remote2NumShards); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("blocking")); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); } - }); - // allow local query to proceed - SimplePauseFieldPlugin.allowEmitting.countDown(); - - // Since part of the query has not been stopped, we expect some result to emerge here - try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { - assertThat(asyncResponse.isRunning(), is(false)); - assertThat(asyncResponse.columns().size(), equalTo(1)); - assertThat(asyncResponse.values().hasNext(), is(true)); - Iterator row = asyncResponse.values().next(); - // sum of 0-9 squared is 285, from two remotes it's 570 - assertThat(row.next(), equalTo(570L)); - - EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isCrossClusterSearch(), is(true)); - long overallTookMillis = executionInfo.overallTook().millis(); - assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); - assertThat(executionInfo.isPartial(), equalTo(true)); - - EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(remoteCluster, remote1NumShards); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); - assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(remote2Cluster, remote2NumShards); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("blocking")); - assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); - - assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); } finally { + SimplePauseFieldPlugin.allowEmitting.countDown(); assertAcked(deleteAsyncId(client(), asyncExecutionId)); } } @@ -205,30 +214,33 @@ public void testStopQueryLocalNoRemotes() throws Exception { includeCCSMetadata.v1() ); - // wait until we know that the query against 'remote-b:blocking' has started - SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); - - /* at this point: - * the query against the local cluster should be running because it's blocked - */ - - // run the stop query - var stopRequest = new AsyncStopRequest(asyncExecutionId); - var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); - // allow local query to proceed - SimplePauseFieldPlugin.allowEmitting.countDown(); - - try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { - assertThat(asyncResponse.isRunning(), is(false)); - assertThat(asyncResponse.columns().size(), equalTo(1)); - assertThat(asyncResponse.values().hasNext(), is(true)); - Iterator row = asyncResponse.values().next(); - assertThat((long) row.next(), greaterThanOrEqualTo(0L)); - - EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isCrossClusterSearch(), is(false)); + try { + // wait until we know that the query against 'remote-b:blocking' has started + SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + + /* at this point: + * the query against the local cluster should be running because it's blocked + */ + + // run the stop query + var stopRequest = new AsyncStopRequest(asyncExecutionId); + var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // allow local query to proceed + SimplePauseFieldPlugin.allowEmitting.countDown(); + + try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { + assertThat(asyncResponse.isRunning(), is(false)); + assertThat(asyncResponse.columns().size(), equalTo(1)); + assertThat(asyncResponse.values().hasNext(), is(true)); + Iterator row = asyncResponse.values().next(); + assertThat((long) row.next(), greaterThanOrEqualTo(0L)); + + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(false)); + } } finally { + SimplePauseFieldPlugin.allowEmitting.countDown(); assertAcked(deleteAsyncId(client(), asyncExecutionId)); } }