From 96acb769bd3c492189c10e00e64786a99045b9a6 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 12 Feb 2025 13:40:27 -0700 Subject: [PATCH 1/4] Improve CrossClusterAsyncEnrichStopIT test Wait until reduce is finished Fixes https://github.com/elastic/elasticsearch/issues/121994 --- .../esql/action/AbstractCrossClusterTestCase.java | 6 ++++++ .../esql/action/CrossClusterAsyncEnrichStopIT.java | 10 ++++++++++ .../esql/action/CrossClusterAsyncQueryStopIT.java | 4 ---- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java index 571e89c3fa501..510f5945f745a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -14,9 +14,11 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.XContentTestUtils; @@ -273,4 +275,8 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse } return runQuery(request); } + + static List getDriverTasks(Client client) { + return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 99a81c60a9ad2..951d2869dfc25 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.async.AsyncStopRequest; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -28,9 +29,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; // This tests if enrich after stop works correctly @@ -91,6 +94,13 @@ public void testEnrichAfterStop() throws Exception { // Run the stop request var stopRequest = new AsyncStopRequest(asyncExecutionId); var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // wait until reduce tasks are gone + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList(); + assertThat(reduceTasks, empty()); + }); + // Allow the processing to proceed SimplePauseFieldPlugin.allowEmitting.countDown(); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 8a09c4af8ca51..041b3124e82e3 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -244,8 +244,4 @@ public void testStopQueryLocalNoRemotes() throws Exception { assertAcked(deleteAsyncId(client(), asyncExecutionId)); } } - - private static List getDriverTasks(Client client) { - return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); - } } From b2d81e81599397d9d628658bd5f6b45ed54145b1 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 12 Feb 2025 20:48:09 +0000 Subject: [PATCH 2/4] [CI] Auto commit changes from spotless --- .../xpack/esql/action/CrossClusterAsyncQueryStopIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 041b3124e82e3..7401a2838ae82 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -9,8 +9,6 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.core.Tuple; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xpack.core.async.AsyncStopRequest; From da1612e7c9c36c2170be81ddf08e985003f7d514 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 12 Feb 2025 14:28:07 -0700 Subject: [PATCH 3/4] Update condition to only target remote_reduce --- .../xpack/esql/action/CrossClusterAsyncEnrichStopIT.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 951d2869dfc25..120cd89f3406e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; @@ -94,10 +95,12 @@ public void testEnrichAfterStop() throws Exception { // Run the stop request var stopRequest = new AsyncStopRequest(asyncExecutionId); var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); - // wait until reduce tasks are gone + // wait until remote reduce tasks are gone assertBusy(() -> { List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); - List reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList(); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); assertThat(reduceTasks, empty()); }); From 31c88edfde80bdca84c0762a2c8d5876bba53d60 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 12 Feb 2025 14:47:05 -0700 Subject: [PATCH 4/4] Add another chec for remote_reduce start --- .../xpack/esql/action/CrossClusterAsyncEnrichStopIT.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 120cd89f3406e..1d6acf51db032 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -36,6 +36,7 @@ import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; // This tests if enrich after stop works correctly public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase { @@ -91,6 +92,14 @@ public void testEnrichAfterStop() throws Exception { // wait until c1 is done waitForCluster(client(), "c1", asyncExecutionId); waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); + // wait until remote reduce task starts on c2 + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream() + .filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce")) + .toList(); + assertThat(reduceTasks, not(empty())); + }); // Run the stop request var stopRequest = new AsyncStopRequest(asyncExecutionId);