Skip to content

Commit eb0020f

Browse files
Introduce more parallelism into cross cluster test bootstrapping (#117820)
We can parallelize starting the clusters and a few other things to effectively speed up these tests by 2x which comes out to about a minute of execution time saved for all of those in :server:internalClusterTests on my workstation.
1 parent 9a81eb2 commit eb0020f

File tree

26 files changed

+80
-73
lines changed

26 files changed

+80
-73
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/ResolveClusterDataStreamIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public class ResolveClusterDataStreamIT extends AbstractMultiClustersTestCase {
7878
private static long LATEST_TIMESTAMP = 1691348820000L;
7979

8080
@Override
81-
protected Collection<String> remoteClusterAlias() {
81+
protected List<String> remoteClusterAlias() {
8282
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
8383
}
8484

modules/lang-painless/src/internalClusterTest/java/org/elasticsearch/painless/action/CrossClusterPainlessExecuteIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class CrossClusterPainlessExecuteIT extends AbstractMultiClustersTestCase
5454
private static final String KEYWORD_FIELD = "my_field";
5555

5656
@Override
57-
protected Collection<String> remoteClusterAlias() {
57+
protected List<String> remoteClusterAlias() {
5858
return List.of(REMOTE_CLUSTER);
5959
}
6060

modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/CrossClusterReindexIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ protected boolean reuseClusters() {
3636
}
3737

3838
@Override
39-
protected Collection<String> remoteClusterAlias() {
39+
protected List<String> remoteClusterAlias() {
4040
return List.of(REMOTE_CLUSTER);
4141
}
4242

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.test.InternalTestCluster;
1616
import org.elasticsearch.test.NodeRoles;
1717

18-
import java.util.Collection;
1918
import java.util.HashSet;
2019
import java.util.List;
2120
import java.util.Set;
@@ -24,7 +23,7 @@
2423

2524
public class RemoteInfoIT extends AbstractMultiClustersTestCase {
2625
@Override
27-
protected Collection<String> remoteClusterAlias() {
26+
protected List<String> remoteClusterAlias() {
2827
if (randomBoolean()) {
2928
return List.of();
3029
} else {

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRemoteIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.test.InternalTestCluster;
2424
import org.junit.Assert;
2525

26-
import java.util.Collection;
2726
import java.util.List;
2827
import java.util.Map;
2928
import java.util.concurrent.ExecutionException;
@@ -51,7 +50,7 @@ protected boolean reuseClusters() {
5150
}
5251

5352
@Override
54-
protected Collection<String> remoteClusterAlias() {
53+
protected List<String> remoteClusterAlias() {
5554
return List.of(REMOTE1, REMOTE2);
5655
}
5756

server/src/internalClusterTest/java/org/elasticsearch/action/search/CCSPointInTimeIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
4444
public static final String REMOTE_CLUSTER = "remote_cluster";
4545

4646
@Override
47-
protected Collection<String> remoteClusterAlias() {
47+
protected List<String> remoteClusterAlias() {
4848
return List.of(REMOTE_CLUSTER);
4949
}
5050

server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/ResolveClusterIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.transport.RemoteClusterAware;
2929

3030
import java.io.IOException;
31-
import java.util.Collection;
3231
import java.util.HashMap;
3332
import java.util.List;
3433
import java.util.Map;
@@ -54,7 +53,7 @@ public class ResolveClusterIT extends AbstractMultiClustersTestCase {
5453
private static long LATEST_TIMESTAMP = 1691348820000L;
5554

5655
@Override
57-
protected Collection<String> remoteClusterAlias() {
56+
protected List<String> remoteClusterAlias() {
5857
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
5958
}
6059

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSCanMatchIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class CCSCanMatchIT extends AbstractMultiClustersTestCase {
5555
static final String REMOTE_CLUSTER = "cluster_a";
5656

5757
@Override
58-
protected Collection<String> remoteClusterAlias() {
58+
protected List<String> remoteClusterAlias() {
5959
return List.of("cluster_a");
6060
}
6161

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,19 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
1516
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.Result;
17+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1618
import org.elasticsearch.action.search.ClosePointInTimeRequest;
1719
import org.elasticsearch.action.search.OpenPointInTimeRequest;
1820
import org.elasticsearch.action.search.SearchRequest;
19-
import org.elasticsearch.action.search.SearchResponse;
2021
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
2122
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
2223
import org.elasticsearch.action.search.TransportSearchAction;
2324
import org.elasticsearch.action.support.PlainActionFuture;
25+
import org.elasticsearch.action.support.RefCountingListener;
26+
import org.elasticsearch.action.support.WriteRequest;
2427
import org.elasticsearch.client.internal.Client;
2528
import org.elasticsearch.common.bytes.BytesReference;
2629
import org.elasticsearch.common.settings.Settings;
@@ -78,7 +81,7 @@ protected boolean reuseClusters() {
7881
}
7982

8083
@Override
81-
protected Collection<String> remoteClusterAlias() {
84+
protected List<String> remoteClusterAlias() {
8285
return List.of(REMOTE1, REMOTE2);
8386
}
8487

@@ -126,12 +129,9 @@ private CCSTelemetrySnapshot getTelemetryFromFailedSearch(SearchRequest searchRe
126129
// We want to send search to a specific node (we don't care which one) so that we could
127130
// collect the CCS telemetry from it later
128131
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
129-
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
130-
cluster(LOCAL_CLUSTER).client(nodeName).search(searchRequest, queryFuture);
131-
assertBusy(() -> assertTrue(queryFuture.isDone()));
132132

133133
// We expect failure, but we don't care too much which failure it is in this test
134-
ExecutionException ee = expectThrows(ExecutionException.class, queryFuture::get);
134+
ExecutionException ee = expectThrows(ExecutionException.class, cluster(LOCAL_CLUSTER).client(nodeName).search(searchRequest)::get);
135135
assertNotNull(ee.getCause());
136136

137137
return getTelemetrySnapshot(nodeName);
@@ -637,56 +637,62 @@ private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
637637
return usage.getCcsUsageHolder().getCCSTelemetrySnapshot();
638638
}
639639

640-
private Map<String, Object> setupClusters() {
640+
private Map<String, Object> setupClusters() throws ExecutionException, InterruptedException {
641641
String localIndex = "demo";
642+
String remoteIndex = "prod";
642643
int numShardsLocal = randomIntBetween(2, 10);
643644
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
644-
assertAcked(
645+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
646+
try (RefCountingListener refCountingListener = new RefCountingListener(future)) {
645647
client(LOCAL_CLUSTER).admin()
646648
.indices()
647649
.prepareCreate(localIndex)
648650
.setSettings(localSettings)
649651
.setMapping("@timestamp", "type=date", "f", "type=text")
650-
);
651-
indexDocs(client(LOCAL_CLUSTER), localIndex);
652-
653-
String remoteIndex = "prod";
654-
int numShardsRemote = randomIntBetween(2, 10);
655-
for (String clusterAlias : remoteClusterAlias()) {
656-
final InternalTestCluster remoteCluster = cluster(clusterAlias);
657-
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
658-
assertAcked(
652+
.execute(refCountingListener.acquire(r -> {
653+
assertAcked(r);
654+
indexDocs(client(LOCAL_CLUSTER), localIndex, refCountingListener.acquire());
655+
}));
656+
657+
int numShardsRemote = randomIntBetween(2, 10);
658+
var remotes = remoteClusterAlias();
659+
runInParallel(remotes.size(), i -> {
660+
final String clusterAlias = remotes.get(i);
661+
final InternalTestCluster remoteCluster = cluster(clusterAlias);
662+
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
659663
client(clusterAlias).admin()
660664
.indices()
661665
.prepareCreate(remoteIndex)
662666
.setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1)))
663667
.setMapping("@timestamp", "type=date", "f", "type=text")
664-
);
665-
assertFalse(
666-
client(clusterAlias).admin()
667-
.cluster()
668-
.prepareHealth(TEST_REQUEST_TIMEOUT, remoteIndex)
669-
.setWaitForYellowStatus()
670-
.setTimeout(TimeValue.timeValueSeconds(10))
671-
.get()
672-
.isTimedOut()
673-
);
674-
indexDocs(client(clusterAlias), remoteIndex);
668+
.execute(refCountingListener.acquire(r -> {
669+
assertAcked(r);
670+
client(clusterAlias).admin()
671+
.cluster()
672+
.prepareHealth(TEST_REQUEST_TIMEOUT, remoteIndex)
673+
.setWaitForYellowStatus()
674+
.setTimeout(TimeValue.timeValueSeconds(10))
675+
.execute(refCountingListener.acquire(healthResponse -> {
676+
assertFalse(healthResponse.isTimedOut());
677+
indexDocs(client(clusterAlias), remoteIndex, refCountingListener.acquire());
678+
}));
679+
}));
680+
});
675681
}
676-
682+
future.get();
677683
Map<String, Object> clusterInfo = new HashMap<>();
678684
clusterInfo.put("local.index", localIndex);
679685
clusterInfo.put("remote.index", remoteIndex);
680686
return clusterInfo;
681687
}
682688

683-
private int indexDocs(Client client, String index) {
689+
private void indexDocs(Client client, String index, ActionListener<Void> listener) {
684690
int numDocs = between(5, 20);
691+
final BulkRequestBuilder bulkRequest = client.prepareBulk();
685692
for (int i = 0; i < numDocs; i++) {
686-
client.prepareIndex(index).setSource("f", "v", "@timestamp", randomNonNegativeLong()).get();
693+
bulkRequest.add(client.prepareIndex(index).setSource("f", "v", "@timestamp", randomNonNegativeLong()));
687694
}
688-
client.admin().indices().prepareRefresh(index).get();
689-
return numDocs;
695+
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute(listener.safeMap(r -> null));
690696
}
691697

692698
/**

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
public class CrossClusterIT extends AbstractMultiClustersTestCase {
8787

8888
@Override
89-
protected Collection<String> remoteClusterAlias() {
89+
protected List<String> remoteClusterAlias() {
9090
return List.of("cluster_a");
9191
}
9292

0 commit comments

Comments
 (0)