diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 99a81c60a9ad2..59cb6eff68831 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -8,9 +8,13 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.compute.operator.DriverStatus; +import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.async.AsyncStopRequest; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -31,7 +35,9 @@ import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; // This tests if enrich after stop works correctly public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase { @@ -87,10 +93,27 @@ public void testEnrichAfterStop() throws Exception { // wait until c1 is done waitForCluster(client(), "c1", asyncExecutionId); waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); + // wait until remote reduce task starts on c2 + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); + assertThat(reduceTasks, not(empty())); + }); // Run the stop request var stopRequest = new AsyncStopRequest(asyncExecutionId); var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // wait until remote reduce tasks are gone + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); + assertThat(reduceTasks, empty()); + }); + // Allow the processing to proceed SimplePauseFieldPlugin.allowEmitting.countDown(); @@ -153,4 +176,8 @@ record Event(long timestamp, String user, String host) {} } client.admin().indices().prepareRefresh("events").get(); } + + static List getDriverTasks(Client client) { + return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); + } }