diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java index 571e89c3fa501..510f5945f745a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -14,9 +14,11 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.XContentTestUtils; @@ -273,4 +275,8 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse } return runQuery(request); } + + static List getDriverTasks(Client client) { + return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 99a81c60a9ad2..1d6acf51db032 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -10,7 +10,9 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.async.AsyncStopRequest; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -28,10 +30,13 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; // This tests if enrich after stop works correctly public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase { @@ -87,10 +92,27 @@ public void testEnrichAfterStop() throws Exception { // wait until c1 is done waitForCluster(client(), "c1", asyncExecutionId); waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); + // wait until remote reduce task starts on c2 + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); + assertThat(reduceTasks, not(empty())); + }); // Run the stop request var stopRequest = new AsyncStopRequest(asyncExecutionId); var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // wait until remote reduce tasks are gone + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); + assertThat(reduceTasks, empty()); + }); + // Allow the processing to proceed SimplePauseFieldPlugin.allowEmitting.countDown(); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 8a09c4af8ca51..7401a2838ae82 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -9,8 +9,6 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.core.Tuple; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xpack.core.async.AsyncStopRequest; @@ -244,8 +242,4 @@ public void testStopQueryLocalNoRemotes() throws Exception { assertAcked(deleteAsyncId(client(), asyncExecutionId)); } } - - private static List getDriverTasks(Client client) { - return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); - } }