Skip to content

Commit f590711

Browse files
pawankartik-elasticelasticsearchmachine
andauthored
Port: add RCS unavailable remotes test to 8.x (elastic#120802)
* Port: add RCS 1 unavailable remotes test * Add RCS 2 test * [CI] Auto commit changes from spotless * Fix imports --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 2ed9121 commit f590711

File tree

2 files changed

+621
-0
lines changed

2 files changed

+621
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.remotecluster;
9+
10+
import org.elasticsearch.client.Request;
11+
import org.elasticsearch.client.Response;
12+
import org.elasticsearch.client.ResponseException;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
15+
import org.elasticsearch.xcontent.XContentBuilder;
16+
import org.elasticsearch.xcontent.json.JsonXContent;
17+
import org.junit.Before;
18+
import org.junit.ClassRule;
19+
import org.junit.rules.RuleChain;
20+
import org.junit.rules.TestRule;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.Map;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import static org.hamcrest.CoreMatchers.containsString;
28+
import static org.hamcrest.CoreMatchers.is;
29+
import static org.hamcrest.Matchers.anyOf;
30+
import static org.hamcrest.Matchers.greaterThan;
31+
32+
public class CrossClusterEsqlRCS1UnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase {
33+
private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean();
34+
35+
static {
36+
fulfillingCluster = ElasticsearchCluster.local()
37+
.name("fulfilling-cluster")
38+
.nodes(1)
39+
.module("x-pack-esql")
40+
.module("x-pack-enrich")
41+
.apply(commonClusterConfig)
42+
.setting("remote_cluster.port", "0")
43+
.setting("xpack.ml.enabled", "false")
44+
.setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
45+
.setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
46+
.setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
47+
.setting("xpack.security.authc.token.enabled", "true")
48+
.keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
49+
.node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
50+
.build();
51+
52+
queryCluster = ElasticsearchCluster.local()
53+
.name("query-cluster")
54+
.module("x-pack-esql")
55+
.module("x-pack-enrich")
56+
.apply(commonClusterConfig)
57+
.setting("xpack.ml.enabled", "false")
58+
.setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
59+
.build();
60+
}
61+
62+
@ClassRule
63+
public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
64+
65+
@Before
66+
public void setupPreRequisites() throws IOException {
67+
setupRolesAndPrivileges();
68+
loadData();
69+
}
70+
71+
public void testEsqlRcs1UnavailableRemoteScenarios() throws Exception {
72+
clusterShutDownWithRandomSkipUnavailable();
73+
remoteClusterShutdownWithSkipUnavailableTrue();
74+
remoteClusterShutdownWithSkipUnavailableFalse();
75+
}
76+
77+
private void clusterShutDownWithRandomSkipUnavailable() throws Exception {
78+
// skip_unavailable is set to a random boolean value.
79+
// However, no clusters are stopped. Hence, we do not expect any other behaviour
80+
// other than a 200-OK.
81+
82+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), randomBoolean());
83+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
84+
Response response = client().performRequest(esqlRequest(query));
85+
86+
Map<String, Object> map = responseAsMap(response);
87+
ArrayList<?> columns = (ArrayList<?>) map.get("columns");
88+
ArrayList<?> values = (ArrayList<?>) map.get("values");
89+
Map<?, ?> clusters = (Map<?, ?>) map.get("_clusters");
90+
Map<?, ?> clusterDetails = (Map<?, ?>) clusters.get("details");
91+
Map<?, ?> localClusterDetails = (Map<?, ?>) clusterDetails.get("(local)");
92+
Map<?, ?> remoteClusterDetails = (Map<?, ?>) clusterDetails.get("my_remote_cluster");
93+
94+
assertOK(response);
95+
assertThat((int) map.get("took"), greaterThan(0));
96+
assertThat(columns.size(), is(4));
97+
assertThat(values.size(), is(9));
98+
99+
assertThat((int) clusters.get("total"), is(2));
100+
assertThat((int) clusters.get("successful"), is(2));
101+
assertThat((int) clusters.get("running"), is(0));
102+
assertThat((int) clusters.get("skipped"), is(0));
103+
assertThat((int) clusters.get("partial"), is(0));
104+
assertThat((int) clusters.get("failed"), is(0));
105+
106+
assertThat(clusterDetails.size(), is(2));
107+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
108+
assertThat(localClusterDetails.get("status"), is("successful"));
109+
110+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
111+
assertThat(remoteClusterDetails.get("status"), is("successful"));
112+
}
113+
114+
@SuppressWarnings("unchecked")
115+
private void remoteClusterShutdownWithSkipUnavailableTrue() throws Exception {
116+
// Remote cluster is stopped and skip unavailable is set to true.
117+
// We expect no exception and partial results from the remaining open cluster.
118+
119+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), true);
120+
121+
try {
122+
// Stop remote cluster.
123+
fulfillingCluster.stop(true);
124+
125+
// A simple query that targets our remote cluster.
126+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
127+
Response response = client().performRequest(esqlRequest(query));
128+
129+
Map<String, Object> map = responseAsMap(response);
130+
ArrayList<String> columns = (ArrayList<String>) map.get("columns");
131+
ArrayList<String> values = (ArrayList<String>) map.get("values");
132+
Map<String, ?> clusters = (Map<String, ?>) map.get("_clusters");
133+
Map<String, ?> clusterDetails = (Map<String, ?>) clusters.get("details");
134+
Map<String, ?> localClusterDetails = (Map<String, ?>) clusterDetails.get("(local)");
135+
Map<String, ?> remoteClusterDetails = (Map<String, ?>) clusterDetails.get("my_remote_cluster");
136+
137+
// Assert results obtained from the local cluster and that remote cluster was
138+
// skipped.
139+
assertOK(response);
140+
assertThat((int) map.get("took"), greaterThan(0));
141+
142+
assertThat(columns.size(), is(2));
143+
assertThat(values.size(), is(5));
144+
145+
assertThat((int) clusters.get("total"), is(2));
146+
assertThat((int) clusters.get("successful"), is(1));
147+
assertThat((int) clusters.get("skipped"), is(1));
148+
assertThat((int) clusters.get("running"), is(0));
149+
assertThat((int) clusters.get("partial"), is(0));
150+
assertThat((int) clusters.get("failed"), is(0));
151+
152+
assertThat(clusterDetails.size(), is(2));
153+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
154+
assertThat(localClusterDetails.get("status"), is("successful"));
155+
156+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
157+
assertThat(remoteClusterDetails.get("status"), is("skipped"));
158+
159+
} catch (ResponseException r) {
160+
throw new AssertionError(r);
161+
} finally {
162+
fulfillingCluster.start();
163+
closeFulfillingClusterClient();
164+
initFulfillingClusterClient();
165+
}
166+
}
167+
168+
private void remoteClusterShutdownWithSkipUnavailableFalse() throws Exception {
169+
// Remote cluster is stopped and skip_unavailable is set to false.
170+
// Although the other cluster is open, we expect an Exception.
171+
172+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), false);
173+
174+
try {
175+
// Stop remote cluster.
176+
fulfillingCluster.stop(true);
177+
178+
// A simple query that targets our remote cluster.
179+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
180+
ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(query)));
181+
assertThat(
182+
ex.getMessage(),
183+
anyOf(
184+
containsString("connect_transport_exception"),
185+
containsString("node_disconnected_exception"),
186+
containsString("node_not_connected_exception")
187+
)
188+
);
189+
} finally {
190+
fulfillingCluster.start();
191+
closeFulfillingClusterClient();
192+
initFulfillingClusterClient();
193+
}
194+
}
195+
196+
private void setupRolesAndPrivileges() throws IOException {
197+
var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
198+
putUserRequest.setJsonEntity("""
199+
{
200+
"password": "x-pack-test-password",
201+
"roles" : ["remote_search"]
202+
}""");
203+
assertOK(adminClient().performRequest(putUserRequest));
204+
205+
var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
206+
putRoleOnRemoteClusterRequest.setJsonEntity("""
207+
{
208+
"indices": [
209+
{
210+
"names": ["points", "squares"],
211+
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"]
212+
}
213+
],
214+
"remote_indices": [
215+
{
216+
"names": ["points", "squares"],
217+
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"],
218+
"clusters": ["my_remote_cluster"]
219+
}
220+
]
221+
}""");
222+
assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest));
223+
}
224+
225+
private void loadData() throws IOException {
226+
Request createIndex = new Request("PUT", "points");
227+
createIndex.setJsonEntity("""
228+
{
229+
"mappings": {
230+
"properties": {
231+
"id": { "type": "integer" },
232+
"score": { "type": "integer" }
233+
}
234+
}
235+
}
236+
""");
237+
assertOK(client().performRequest(createIndex));
238+
239+
Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
240+
bulkRequest.setJsonEntity("""
241+
{ "index": { "_index": "points" } }
242+
{ "id": 1, "score": 75}
243+
{ "index": { "_index": "points" } }
244+
{ "id": 2, "score": 125}
245+
{ "index": { "_index": "points" } }
246+
{ "id": 3, "score": 100}
247+
{ "index": { "_index": "points" } }
248+
{ "id": 4, "score": 50}
249+
{ "index": { "_index": "points" } }
250+
{ "id": 5, "score": 150}
251+
""");
252+
assertOK(client().performRequest(bulkRequest));
253+
254+
createIndex = new Request("PUT", "squares");
255+
createIndex.setJsonEntity("""
256+
{
257+
"mappings": {
258+
"properties": {
259+
"num": { "type": "integer" },
260+
"square": { "type": "integer" }
261+
}
262+
}
263+
}
264+
""");
265+
assertOK(performRequestAgainstFulfillingCluster(createIndex));
266+
267+
bulkRequest = new Request("POST", "/_bulk?refresh=true");
268+
bulkRequest.setJsonEntity("""
269+
{ "index": {"_index": "squares"}}
270+
{ "num": 1, "square": 1 }
271+
{ "index": {"_index": "squares"}}
272+
{ "num": 2, "square": 4 }
273+
{ "index": {"_index": "squares"}}
274+
{ "num": 3, "square": 9 }
275+
{ "index": {"_index": "squares"}}
276+
{ "num": 4, "square": 16 }
277+
""");
278+
assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
279+
}
280+
281+
private Request esqlRequest(String query) throws IOException {
282+
XContentBuilder body = JsonXContent.contentBuilder();
283+
284+
body.startObject();
285+
body.field("query", query);
286+
body.field("include_ccs_metadata", true);
287+
body.endObject();
288+
289+
Request request = new Request("POST", "_query");
290+
request.setJsonEntity(Strings.toString(body));
291+
292+
return request;
293+
}
294+
}

0 commit comments

Comments
 (0)