From b573b99b332e27ba5d7695487583241144ac17cf Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 12 Feb 2025 15:37:44 -0700 Subject: [PATCH 1/2] Improve CrossClusterAsyncEnrichStopIT test (#122432) (cherry picked from commit 88550f61265c81db6674b18ba0280e58c31315b3) # Conflicts: # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java --- .../action/CrossClusterAsyncEnrichStopIT.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 99a81c60a9ad2..1d6acf51db032 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -10,7 +10,9 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.async.AsyncStopRequest; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -28,10 +30,13 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; // This tests if enrich after stop works correctly public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase { @@ -87,10 +92,27 @@ public void testEnrichAfterStop() throws Exception { // wait until c1 is done waitForCluster(client(), "c1", asyncExecutionId); waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); + // wait until remote reduce task starts on c2 + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); + assertThat(reduceTasks, not(empty())); + }); // Run the stop request var stopRequest = new AsyncStopRequest(asyncExecutionId); var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // wait until remote reduce tasks are gone + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); + assertThat(reduceTasks, empty()); + }); + // Allow the processing to proceed SimplePauseFieldPlugin.allowEmitting.countDown(); From 459121395124e22762f1fd76e46d6ee3adde777b Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 12 Feb 2025 15:49:59 -0700 Subject: [PATCH 2/2] fix backport --- .../xpack/esql/action/CrossClusterAsyncEnrichStopIT.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 1d6acf51db032..59cb6eff68831 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -8,9 +8,11 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.compute.operator.DriverStatus; +import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; @@ -30,7 +32,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; -import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; @@ -175,4 +176,8 @@ record Event(long timestamp, String user, String host) {} } client.admin().indices().prepareRefresh("events").get(); } + + static List getDriverTasks(Client client) { + return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); + } }