Skip to content

Commit 0bdf397

Browse files
authored
[9.0] Improve CrossClusterAsyncEnrichStopIT test (elastic#122432) (elastic#122442)
* Improve CrossClusterAsyncEnrichStopIT test (elastic#122432) (cherry picked from commit 88550f6) # Conflicts: # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java * fix backport
1 parent aef6531 commit 0bdf397

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@
88
package org.elasticsearch.xpack.esql.action;
99

1010
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
11+
import org.elasticsearch.client.internal.Client;
1112
import org.elasticsearch.common.bytes.BytesReference;
1213
import org.elasticsearch.common.xcontent.XContentHelper;
14+
import org.elasticsearch.compute.operator.DriverStatus;
15+
import org.elasticsearch.compute.operator.DriverTaskRunner;
1316
import org.elasticsearch.plugins.Plugin;
17+
import org.elasticsearch.tasks.TaskInfo;
1418
import org.elasticsearch.xcontent.json.JsonXContent;
1519
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
1620
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -31,7 +35,9 @@
3135
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId;
3236
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery;
3337
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster;
38+
import static org.hamcrest.Matchers.empty;
3439
import static org.hamcrest.Matchers.equalTo;
40+
import static org.hamcrest.Matchers.not;
3541

3642
// This tests if enrich after stop works correctly
3743
public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase {
@@ -87,10 +93,27 @@ public void testEnrichAfterStop() throws Exception {
8793
// wait until c1 is done
8894
waitForCluster(client(), "c1", asyncExecutionId);
8995
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);
96+
// wait until remote reduce task starts on c2
97+
assertBusy(() -> {
98+
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
99+
List<TaskInfo> reduceTasks = tasks.stream()
100+
.filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce"))
101+
.toList();
102+
assertThat(reduceTasks, not(empty()));
103+
});
90104

91105
// Run the stop request
92106
var stopRequest = new AsyncStopRequest(asyncExecutionId);
93107
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
108+
// wait until remote reduce tasks are gone
109+
assertBusy(() -> {
110+
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
111+
List<TaskInfo> reduceTasks = tasks.stream()
112+
.filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce"))
113+
.toList();
114+
assertThat(reduceTasks, empty());
115+
});
116+
94117
// Allow the processing to proceed
95118
SimplePauseFieldPlugin.allowEmitting.countDown();
96119

@@ -153,4 +176,8 @@ record Event(long timestamp, String user, String host) {}
153176
}
154177
client.admin().indices().prepareRefresh("events").get();
155178
}
179+
180+
static List<TaskInfo> getDriverTasks(Client client) {
181+
return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks();
182+
}
156183
}

0 commit comments

Comments
 (0)