Skip to content

Commit 7f05a31

Browse files
authored
[8.18] Improve CrossClusterAsyncEnrichStopIT test (elastic#122432) (elastic#122441) (elastic#122453)
* [8.x] Improve CrossClusterAsyncEnrichStopIT test (elastic#122432) (elastic#122441) * Improve CrossClusterAsyncEnrichStopIT test (elastic#122432) (cherry picked from commit 88550f6) * fix backport (cherry picked from commit 2f6de7d) # Conflicts: # muted-tests.yml # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java * fix backport
1 parent a63e712 commit 7f05a31

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88
package org.elasticsearch.xpack.esql.action;
99

1010
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
11+
import org.elasticsearch.client.internal.Client;
1112
import org.elasticsearch.common.bytes.BytesReference;
1213
import org.elasticsearch.common.xcontent.XContentHelper;
14+
import org.elasticsearch.compute.operator.DriverTaskRunner;
1315
import org.elasticsearch.plugins.Plugin;
16+
import org.elasticsearch.tasks.TaskInfo;
1417
import org.elasticsearch.xcontent.json.JsonXContent;
1518
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
1619
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -31,7 +34,9 @@
3134
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId;
3235
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery;
3336
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster;
37+
import static org.hamcrest.Matchers.empty;
3438
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.Matchers.not;
3540

3641
// This tests if enrich after stop works correctly
3742
public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase {
@@ -87,10 +92,23 @@ public void testEnrichAfterStop() throws Exception {
8792
// wait until c1 is done
8893
waitForCluster(client(), "c1", asyncExecutionId);
8994
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);
95+
// wait until remote reduce task starts on c2
96+
assertBusy(() -> {
97+
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
98+
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
99+
assertThat(reduceTasks, not(empty()));
100+
});
90101

91102
// Run the stop request
92103
var stopRequest = new AsyncStopRequest(asyncExecutionId);
93104
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
105+
// wait until remote reduce tasks are gone
106+
assertBusy(() -> {
107+
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
108+
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
109+
assertThat(reduceTasks, empty());
110+
});
111+
94112
// Allow the processing to proceed
95113
SimplePauseFieldPlugin.allowEmitting.countDown();
96114

@@ -153,4 +171,9 @@ record Event(long timestamp, String user, String host) {}
153171
}
154172
client.admin().indices().prepareRefresh("events").get();
155173
}
174+
175+
static List<TaskInfo> getDriverTasks(Client client) {
176+
return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks();
177+
}
178+
156179
}

0 commit comments

Comments
 (0)