Skip to content

Commit 2f6de7d

Browse files
authored
[8.x] Improve CrossClusterAsyncEnrichStopIT test (elastic#122432) (elastic#122441)
* Improve CrossClusterAsyncEnrichStopIT test (elastic#122432) (cherry picked from commit 88550f6) * fix backport
1 parent 26059b7 commit 2f6de7d

File tree

4 files changed

+23
-9
lines changed

4 files changed

+23
-9
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -531,9 +531,6 @@ tests:
531531
issue: https://github.com/elastic/elasticsearch/issues/121483
532532
- class: org.elasticsearch.xpack.application.FullClusterRestartIT
533533
issue: https://github.com/elastic/elasticsearch/issues/121935
534-
- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncEnrichStopIT
535-
method: testEnrichAfterStop
536-
issue: https://github.com/elastic/elasticsearch/issues/121994
537534
- class: org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT
538535
method: test {date_nanos.Bucket Date nanos by 10 minutes SYNC}
539536
issue: https://github.com/elastic/elasticsearch/issues/122273

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
import org.elasticsearch.common.Strings;
1515
import org.elasticsearch.common.settings.Setting;
1616
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.compute.operator.DriverTaskRunner;
1718
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1819
import org.elasticsearch.core.TimeValue;
1920
import org.elasticsearch.plugins.Plugin;
21+
import org.elasticsearch.tasks.TaskInfo;
2022
import org.elasticsearch.test.AbstractMultiClustersTestCase;
2123
import org.elasticsearch.test.FailingFieldPlugin;
2224
import org.elasticsearch.test.XContentTestUtils;
@@ -273,4 +275,8 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse
273275
}
274276
return runQuery(request);
275277
}
278+
279+
static List<TaskInfo> getDriverTasks(Client client) {
280+
return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks();
281+
}
276282
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.bytes.BytesReference;
1212
import org.elasticsearch.common.xcontent.XContentHelper;
1313
import org.elasticsearch.plugins.Plugin;
14+
import org.elasticsearch.tasks.TaskInfo;
1415
import org.elasticsearch.xcontent.json.JsonXContent;
1516
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
1617
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -28,10 +29,13 @@
2829

2930
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3031
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
32+
import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks;
3133
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId;
3234
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery;
3335
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster;
36+
import static org.hamcrest.Matchers.empty;
3437
import static org.hamcrest.Matchers.equalTo;
38+
import static org.hamcrest.Matchers.not;
3539

3640
// This tests if enrich after stop works correctly
3741
public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase {
@@ -87,10 +91,23 @@ public void testEnrichAfterStop() throws Exception {
8791
// wait until c1 is done
8892
waitForCluster(client(), "c1", asyncExecutionId);
8993
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);
94+
// wait until remote reduce task starts on c2
95+
assertBusy(() -> {
96+
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
97+
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
98+
assertThat(reduceTasks, not(empty()));
99+
});
90100

91101
// Run the stop request
92102
var stopRequest = new AsyncStopRequest(asyncExecutionId);
93103
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
104+
// wait until remote reduce tasks are gone
105+
assertBusy(() -> {
106+
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
107+
List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
108+
assertThat(reduceTasks, empty());
109+
});
110+
94111
// Allow the processing to proceed
95112
SimplePauseFieldPlugin.allowEmitting.countDown();
96113

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99

1010
import org.elasticsearch.Build;
1111
import org.elasticsearch.action.ActionFuture;
12-
import org.elasticsearch.client.internal.Client;
13-
import org.elasticsearch.compute.operator.DriverTaskRunner;
1412
import org.elasticsearch.core.Tuple;
1513
import org.elasticsearch.tasks.TaskInfo;
1614
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
@@ -244,8 +242,4 @@ public void testStopQueryLocalNoRemotes() throws Exception {
244242
assertAcked(deleteAsyncId(client(), asyncExecutionId));
245243
}
246244
}
247-
248-
private static List<TaskInfo> getDriverTasks(Client client) {
249-
return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks();
250-
}
251245
}

0 commit comments

Comments
 (0)