From a5fbe90637d17193683cff67f7c61a9f39ad6a0a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 12 Feb 2025 16:58:46 -0700 Subject: [PATCH 1/2] [8.x] Improve CrossClusterAsyncEnrichStopIT test (#122432) (#122441) * Improve CrossClusterAsyncEnrichStopIT test (#122432) (cherry picked from commit 88550f61265c81db6674b18ba0280e58c31315b3) * fix backport (cherry picked from commit 2f6de7d9e83f47ee156f1df2026b76350ba3de62) # Conflicts: # muted-tests.yml # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java --- .../action/CrossClusterAsyncEnrichStopIT.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index 99a81c60a9ad2..e8b3c21196a45 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.async.AsyncStopRequest; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -28,10 +29,13 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; // This tests if enrich after stop works correctly public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase { @@ -87,10 +91,23 @@ public void testEnrichAfterStop() throws Exception { // wait until c1 is done waitForCluster(client(), "c1", asyncExecutionId); waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); + // wait until remote reduce task starts on c2 + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList(); + assertThat(reduceTasks, not(empty())); + }); // Run the stop request var stopRequest = new AsyncStopRequest(asyncExecutionId); var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // wait until remote reduce tasks are gone + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList(); + assertThat(reduceTasks, empty()); + }); + // Allow the processing to proceed SimplePauseFieldPlugin.allowEmitting.countDown(); From 9ae0e888add0ac5def036102ef23a01297ebf9ec Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 12 Feb 2025 18:15:59 -0700 Subject: [PATCH 2/2] fix backport --- .../xpack/esql/action/CrossClusterAsyncEnrichStopIT.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java index e8b3c21196a45..049a4cbec3b20 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java @@ -8,8 +8,10 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xcontent.json.JsonXContent; @@ -29,7 +31,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; -import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; @@ -170,4 +171,9 @@ record Event(long timestamp, String user, String host) {} } client.admin().indices().prepareRefresh("events").get(); } + + static List getDriverTasks(Client client) { + return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); + } + }