Skip to content

Commit 9866009

Browse files
committed
CCS tests
1 parent 4f7036e commit 9866009

File tree

7 files changed

+331
-152
lines changed

7 files changed

+331
-152
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.search.ccs;
11+
12+
import org.elasticsearch.client.internal.Client;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.settings.Setting;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.core.TimeValue;
17+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
18+
import org.elasticsearch.test.InternalTestCluster;
19+
import org.elasticsearch.transport.RemoteClusterAware;
20+
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
26+
27+
/**
28+
* Base class for cross-cluster search (CCS) integration tests.
29+
* Presumed 2-node cluster, with remote called REMOTE_CLUSTER.
30+
*/
31+
public abstract class AbstractCrossClusterSearchTestCase extends AbstractMultiClustersTestCase {
32+
33+
protected static final String REMOTE_CLUSTER = "cluster_a";
34+
protected static final long EARLIEST_TIMESTAMP = 1691348810000L;
35+
protected static final long LATEST_TIMESTAMP = 1691348820000L;
36+
37+
@Override
38+
protected List<String> remoteClusterAlias() {
39+
return List.of(REMOTE_CLUSTER);
40+
}
41+
42+
@Override
43+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
44+
return Map.of(REMOTE_CLUSTER, randomBoolean());
45+
}
46+
47+
@Override
48+
protected boolean reuseClusters() {
49+
// Don't reuse by default, most test suites are kinda nasty...
50+
return false;
51+
}
52+
53+
/**
54+
* Set up local and remote indices with the same mapping and timestamped docs.
55+
* Returns a map with "local.num_shards", "remote.num_shards", "remote.skip_unavailable",
56+
* and optionally "local.index" / "remote.index" when using the no-arg overload.
57+
*/
58+
protected Map<String, Object> setupTwoClusters(String[] localIndices, String[] remoteIndices) {
59+
int numShardsLocal = randomIntBetween(2, 10);
60+
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
61+
for (String localIndex : localIndices) {
62+
assertAcked(
63+
client(LOCAL_CLUSTER).admin()
64+
.indices()
65+
.prepareCreate(localIndex)
66+
.setSettings(localSettings)
67+
.setMapping("@timestamp", "type=date", "f", "type=text")
68+
);
69+
indexDocsWithTimestamp(client(LOCAL_CLUSTER), localIndex);
70+
}
71+
72+
int numShardsRemote = randomIntBetween(2, 10);
73+
final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER);
74+
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3));
75+
for (String remoteIndex : remoteIndices) {
76+
assertAcked(
77+
client(REMOTE_CLUSTER).admin()
78+
.indices()
79+
.prepareCreate(remoteIndex)
80+
.setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1)))
81+
.setMapping("@timestamp", "type=date", "f", "type=text")
82+
);
83+
assertFalse(
84+
client(REMOTE_CLUSTER).admin()
85+
.cluster()
86+
.prepareHealth(TEST_REQUEST_TIMEOUT, remoteIndex)
87+
.setWaitForYellowStatus()
88+
.setTimeout(TimeValue.timeValueSeconds(10))
89+
.get()
90+
.isTimedOut()
91+
);
92+
indexDocsWithTimestamp(client(REMOTE_CLUSTER), remoteIndex);
93+
}
94+
95+
String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
96+
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
97+
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
98+
.getClusterSettings()
99+
.get(skipUnavailableSetting);
100+
101+
Map<String, Object> clusterInfo = new HashMap<>();
102+
clusterInfo.put("local.num_shards", numShardsLocal);
103+
clusterInfo.put("remote.num_shards", numShardsRemote);
104+
clusterInfo.put("remote.skip_unavailable", skipUnavailable);
105+
return clusterInfo;
106+
}
107+
108+
/**
109+
* Set up two clusters with default indices "demo" (local) and "prod" (remote).
110+
* Adds "local.index" and "remote.index" to the returned map.
111+
*/
112+
protected Map<String, Object> setupTwoClusters() {
113+
Map<String, Object> clusterInfo = setupTwoClusters(new String[] { "demo" }, new String[] { "prod" });
114+
clusterInfo.put("local.index", "demo");
115+
clusterInfo.put("remote.index", "prod");
116+
return clusterInfo;
117+
}
118+
119+
/**
120+
* Index documents with {@code f} and {@code @timestamp} fields for tests that rely on
121+
* a time range.
122+
*/
123+
protected int indexDocsWithTimestamp(Client client, String index) {
124+
int numDocs = between(500, 1200);
125+
for (int i = 0; i < numDocs; i++) {
126+
long ts = EARLIEST_TIMESTAMP + i;
127+
if (i == numDocs - 1) {
128+
ts = LATEST_TIMESTAMP;
129+
}
130+
client.prepareIndex(index).setSource("f", "v", "@timestamp", ts).get();
131+
}
132+
client.admin().indices().prepareRefresh(index).get();
133+
return numDocs;
134+
}
135+
136+
protected int indexDocs(Client client, String field, String index) {
137+
int numDocs = between(1, 200);
138+
for (int i = 0; i < numDocs; i++) {
139+
client.prepareIndex(index).setSource(field, "v" + i).get();
140+
}
141+
client.admin().indices().prepareRefresh(index).get();
142+
return numDocs;
143+
}
144+
}

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSCanMatchIT.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,7 @@
5151
import static org.hamcrest.Matchers.equalTo;
5252
import static org.hamcrest.Matchers.in;
5353

54-
public class CCSCanMatchIT extends AbstractMultiClustersTestCase {
55-
static final String REMOTE_CLUSTER = "cluster_a";
56-
57-
@Override
58-
protected List<String> remoteClusterAlias() {
59-
return List.of("cluster_a");
60-
}
54+
public class CCSCanMatchIT extends AbstractCrossClusterSearchTestCase {
6155

6256
private static class EngineWithExposingTimestamp extends InternalEngine {
6357
EngineWithExposingTimestamp(EngineConfig engineConfig) {

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterIT.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.tasks.CancellableTask;
4848
import org.elasticsearch.tasks.TaskCancelledException;
4949
import org.elasticsearch.tasks.TaskInfo;
50-
import org.elasticsearch.test.AbstractMultiClustersTestCase;
5150
import org.elasticsearch.test.InternalTestCluster;
5251
import org.elasticsearch.test.NodeRoles;
5352
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
@@ -83,25 +82,10 @@
8382
// formerly called CrossClusterSearchIT, but since this one mostly does
8483
// actions besides searching, it was renamed so a new CrossClusterSearchIT
8584
// can focus on several cross cluster search scenarios.
86-
public class CrossClusterIT extends AbstractMultiClustersTestCase {
85+
public class CrossClusterIT extends AbstractCrossClusterSearchTestCase {
8786

88-
@Override
89-
protected List<String> remoteClusterAlias() {
90-
return List.of("cluster_a");
91-
}
92-
93-
@Override
94-
protected boolean reuseClusters() {
95-
return false;
96-
}
97-
98-
private int indexDocs(Client client, String index) {
99-
int numDocs = between(1, 10);
100-
for (int i = 0; i < numDocs; i++) {
101-
client.prepareIndex(index).setSource("f", "v").get();
102-
}
103-
client.admin().indices().prepareRefresh(index).get();
104-
return numDocs;
87+
protected int indexDocs(Client client, String index) {
88+
return indexDocs(client, "f", index);
10589
}
10690

10791
public void testRemoteClusterClientRole() throws Exception {

server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CrossClusterSearchIT.java

Lines changed: 3 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@
1818
import org.elasticsearch.action.search.ShardSearchFailure;
1919
import org.elasticsearch.action.search.TransportSearchAction;
2020
import org.elasticsearch.action.support.PlainActionFuture;
21-
import org.elasticsearch.client.internal.Client;
22-
import org.elasticsearch.common.Strings;
23-
import org.elasticsearch.common.settings.Setting;
24-
import org.elasticsearch.common.settings.Settings;
2521
import org.elasticsearch.common.util.CollectionUtils;
2622
import org.elasticsearch.core.TimeValue;
2723
import org.elasticsearch.index.query.MatchAllQueryBuilder;
@@ -31,20 +27,16 @@
3127
import org.elasticsearch.search.builder.SearchSourceBuilder;
3228
import org.elasticsearch.search.query.SlowRunningQueryBuilder;
3329
import org.elasticsearch.search.query.ThrowingQueryBuilder;
34-
import org.elasticsearch.test.AbstractMultiClustersTestCase;
35-
import org.elasticsearch.test.InternalTestCluster;
3630
import org.elasticsearch.transport.RemoteClusterAware;
3731
import org.elasticsearch.transport.RemoteTransportException;
3832

3933
import java.util.Collection;
40-
import java.util.HashMap;
4134
import java.util.List;
4235
import java.util.Map;
4336
import java.util.Objects;
4437
import java.util.concurrent.ExecutionException;
4538
import java.util.concurrent.TimeUnit;
4639

47-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4840
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
4941
import static org.hamcrest.Matchers.anyOf;
5042
import static org.hamcrest.Matchers.containsString;
@@ -53,32 +45,18 @@
5345
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5446
import static org.hamcrest.Matchers.instanceOf;
5547

56-
public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
57-
58-
private static final String REMOTE_CLUSTER = "cluster_a";
59-
private static long EARLIEST_TIMESTAMP = 1691348810000L;
60-
private static long LATEST_TIMESTAMP = 1691348820000L;
61-
62-
@Override
63-
protected List<String> remoteClusterAlias() {
64-
return List.of(REMOTE_CLUSTER);
65-
}
48+
public class CrossClusterSearchIT extends AbstractCrossClusterSearchTestCase {
6649

6750
@Override
68-
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
69-
return Map.of(REMOTE_CLUSTER, randomBoolean());
51+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
52+
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), TestQueryBuilderPlugin.class);
7053
}
7154

7255
@Override
7356
protected boolean reuseClusters() {
7457
return false;
7558
}
7659

77-
@Override
78-
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
79-
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CrossClusterSearchIT.TestQueryBuilderPlugin.class);
80-
}
81-
8260
public static class TestQueryBuilderPlugin extends Plugin implements SearchPlugin {
8361
public TestQueryBuilderPlugin() {}
8462

@@ -827,73 +805,4 @@ private static void assertOneFailedShard(Cluster cluster, int totalShards) {
827805
assertThat(remoteShardSearchFailure.reason(), containsString("index corrupted"));
828806
}
829807

830-
private Map<String, Object> setupTwoClusters(String[] localIndices, String[] remoteIndices) {
831-
int numShardsLocal = randomIntBetween(2, 10);
832-
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
833-
for (String localIndex : localIndices) {
834-
assertAcked(
835-
client(LOCAL_CLUSTER).admin()
836-
.indices()
837-
.prepareCreate(localIndex)
838-
.setSettings(localSettings)
839-
.setMapping("@timestamp", "type=date", "f", "type=text")
840-
);
841-
indexDocs(client(LOCAL_CLUSTER), localIndex);
842-
}
843-
844-
int numShardsRemote = randomIntBetween(2, 10);
845-
final InternalTestCluster remoteCluster = cluster(REMOTE_CLUSTER);
846-
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3));
847-
for (String remoteIndex : remoteIndices) {
848-
assertAcked(
849-
client(REMOTE_CLUSTER).admin()
850-
.indices()
851-
.prepareCreate(remoteIndex)
852-
.setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1)))
853-
.setMapping("@timestamp", "type=date", "f", "type=text")
854-
);
855-
assertFalse(
856-
client(REMOTE_CLUSTER).admin()
857-
.cluster()
858-
.prepareHealth(TEST_REQUEST_TIMEOUT, remoteIndex)
859-
.setWaitForYellowStatus()
860-
.setTimeout(TimeValue.timeValueSeconds(10))
861-
.get()
862-
.isTimedOut()
863-
);
864-
indexDocs(client(REMOTE_CLUSTER), remoteIndex);
865-
}
866-
867-
String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
868-
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
869-
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
870-
.getClusterSettings()
871-
.get(skipUnavailableSetting);
872-
873-
Map<String, Object> clusterInfo = new HashMap<>();
874-
clusterInfo.put("local.num_shards", numShardsLocal);
875-
clusterInfo.put("remote.num_shards", numShardsRemote);
876-
clusterInfo.put("remote.skip_unavailable", skipUnavailable);
877-
return clusterInfo;
878-
}
879-
880-
private Map<String, Object> setupTwoClusters() {
881-
var clusterInfo = setupTwoClusters(new String[] { "demo" }, new String[] { "prod" });
882-
clusterInfo.put("local.index", "demo");
883-
clusterInfo.put("remote.index", "prod");
884-
return clusterInfo;
885-
}
886-
887-
private int indexDocs(Client client, String index) {
888-
int numDocs = between(500, 1200);
889-
for (int i = 0; i < numDocs; i++) {
890-
long ts = EARLIEST_TIMESTAMP + i;
891-
if (i == numDocs - 1) {
892-
ts = LATEST_TIMESTAMP;
893-
}
894-
client.prepareIndex(index).setSource("f", "v", "@timestamp", ts).get();
895-
}
896-
client.admin().indices().prepareRefresh(index).get();
897-
return numDocs;
898-
}
899808
}

0 commit comments

Comments
 (0)