Skip to content

Commit 5124aef

Browse files
Add tests for RCS2:ES|QL to verify behaviour for disconnected clusters (#116847)
* Add tests for RCS2:ES|QL to verify behaviour for disconnected clusters * Address some review comments
1 parent c699af2 commit 5124aef

File tree

1 file changed

+316
-0
lines changed

1 file changed

+316
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.remotecluster;
9+
10+
import org.elasticsearch.client.Request;
11+
import org.elasticsearch.client.RequestOptions;
12+
import org.elasticsearch.client.Response;
13+
import org.elasticsearch.client.ResponseException;
14+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
15+
import org.elasticsearch.test.cluster.util.resource.Resource;
16+
import org.elasticsearch.xcontent.XContentBuilder;
17+
import org.elasticsearch.xcontent.json.JsonXContent;
18+
import org.junit.Before;
19+
import org.junit.ClassRule;
20+
import org.junit.rules.RuleChain;
21+
import org.junit.rules.TestRule;
22+
23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.Map;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
28+
import static org.hamcrest.Matchers.containsString;
29+
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.greaterThan;
31+
import static org.hamcrest.Matchers.is;
32+
33+
public class CrossClusterEsqlRCS2UnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase {
34+
private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
35+
36+
static {
37+
fulfillingCluster = ElasticsearchCluster.local()
38+
.name("fulfilling-cluster")
39+
.nodes(1)
40+
.module("x-pack-esql")
41+
.apply(commonClusterConfig)
42+
.setting("remote_cluster.port", "0")
43+
.setting("xpack.ml.enabled", "false")
44+
.setting("xpack.security.remote_cluster_server.ssl.enabled", "true")
45+
.setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
46+
.setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
47+
.setting("xpack.security.authc.token.enabled", "true")
48+
.keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
49+
.node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
50+
.build();
51+
52+
queryCluster = ElasticsearchCluster.local()
53+
.name("query-cluster")
54+
.module("x-pack-esql")
55+
.apply(commonClusterConfig)
56+
.setting("xpack.ml.enabled", "false")
57+
.setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
58+
.setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
59+
.setting("xpack.security.authc.token.enabled", "true")
60+
.keystore("cluster.remote.my_remote_cluster.credentials", () -> {
61+
if (API_KEY_MAP_REF.get() == null) {
62+
final Map<String, Object> apiKeyMap = createCrossClusterAccessApiKey("""
63+
{
64+
"search": [
65+
{
66+
"names": ["*"]
67+
}
68+
]
69+
}""");
70+
API_KEY_MAP_REF.set(apiKeyMap);
71+
}
72+
return (String) API_KEY_MAP_REF.get().get("encoded");
73+
})
74+
.rolesFile(Resource.fromClasspath("roles.yml"))
75+
.user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics", false)
76+
.build();
77+
}
78+
79+
@ClassRule
80+
public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
81+
82+
@Before
83+
public void setupPreRequisites() throws Exception {
84+
setupRolesAndPrivileges();
85+
loadData();
86+
}
87+
88+
public void testEsqlRcs2UnavailableRemoteScenarios() throws Exception {
89+
clusterShutDownWithRandomSkipUnavailable();
90+
remoteClusterShutdownWithSkipUnavailableTrue();
91+
remoteClusterShutdownWithSkipUnavailableFalse();
92+
}
93+
94+
private void clusterShutDownWithRandomSkipUnavailable() throws Exception {
95+
// skip_unavailable is set to a random boolean value.
96+
// However, no clusters are stopped. Hence, we do not expect any other behaviour
97+
// other than a 200-OK.
98+
99+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), randomBoolean());
100+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
101+
Response response = performRequestWithRemoteSearchUser(esqlRequest(query));
102+
103+
Map<String, Object> map = responseAsMap(response);
104+
ArrayList<?> columns = (ArrayList<?>) map.get("columns");
105+
ArrayList<?> values = (ArrayList<?>) map.get("values");
106+
Map<?, ?> clusters = (Map<?, ?>) map.get("_clusters");
107+
Map<?, ?> clusterDetails = (Map<?, ?>) clusters.get("details");
108+
Map<?, ?> localClusterDetails = (Map<?, ?>) clusterDetails.get("(local)");
109+
Map<?, ?> remoteClusterDetails = (Map<?, ?>) clusterDetails.get("my_remote_cluster");
110+
111+
assertOK(response);
112+
assertThat((int) map.get("took"), greaterThan(0));
113+
assertThat(columns.size(), is(4));
114+
assertThat(values.size(), is(9));
115+
116+
assertThat((int) clusters.get("total"), is(2));
117+
assertThat((int) clusters.get("successful"), is(2));
118+
assertThat((int) clusters.get("running"), is(0));
119+
assertThat((int) clusters.get("skipped"), is(0));
120+
assertThat((int) clusters.get("partial"), is(0));
121+
assertThat((int) clusters.get("failed"), is(0));
122+
123+
assertThat(clusterDetails.size(), is(2));
124+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
125+
assertThat(localClusterDetails.get("status"), is("successful"));
126+
127+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
128+
assertThat(remoteClusterDetails.get("status"), is("successful"));
129+
}
130+
131+
@SuppressWarnings("unchecked")
132+
private void remoteClusterShutdownWithSkipUnavailableTrue() throws Exception {
133+
// Remote cluster is stopped and skip unavailable is set to true.
134+
// We expect no exception and partial results from the remaining open cluster.
135+
136+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), true);
137+
138+
try {
139+
// Stop remote cluster.
140+
fulfillingCluster.stop(true);
141+
142+
// A simple query that targets our remote cluster.
143+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
144+
Response response = performRequestWithRemoteSearchUser(esqlRequest(query));
145+
146+
Map<String, Object> map = responseAsMap(response);
147+
ArrayList<String> columns = (ArrayList<String>) map.get("columns");
148+
ArrayList<String> values = (ArrayList<String>) map.get("values");
149+
Map<String, ?> clusters = (Map<String, ?>) map.get("_clusters");
150+
Map<String, ?> clusterDetails = (Map<String, ?>) clusters.get("details");
151+
Map<String, ?> localClusterDetails = (Map<String, ?>) clusterDetails.get("(local)");
152+
Map<String, ?> remoteClusterDetails = (Map<String, ?>) clusterDetails.get("my_remote_cluster");
153+
154+
// Assert results obtained from the local cluster and that remote cluster was
155+
// skipped.
156+
assertOK(response);
157+
assertThat((int) map.get("took"), greaterThan(0));
158+
159+
assertThat(columns.size(), is(2));
160+
assertThat(values.size(), is(5));
161+
162+
assertThat((int) clusters.get("total"), is(2));
163+
assertThat((int) clusters.get("successful"), is(1));
164+
assertThat((int) clusters.get("skipped"), is(1));
165+
assertThat((int) clusters.get("running"), is(0));
166+
assertThat((int) clusters.get("partial"), is(0));
167+
assertThat((int) clusters.get("failed"), is(0));
168+
169+
assertThat(clusterDetails.size(), is(2));
170+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
171+
assertThat(localClusterDetails.get("status"), is("successful"));
172+
173+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
174+
assertThat(remoteClusterDetails.get("status"), is("skipped"));
175+
176+
ArrayList<?> remoteClusterFailures = (ArrayList<?>) remoteClusterDetails.get("failures");
177+
assertThat(remoteClusterFailures.size(), equalTo(1));
178+
Map<String, ?> failuresMap = (Map<String, ?>) remoteClusterFailures.get(0);
179+
180+
Map<String, ?> reason = (Map<String, ?>) failuresMap.get("reason");
181+
assertThat(reason.get("type").toString(), equalTo("connect_transport_exception"));
182+
assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]"));
183+
} finally {
184+
fulfillingCluster.start();
185+
closeFulfillingClusterClient();
186+
initFulfillingClusterClient();
187+
}
188+
}
189+
190+
private void remoteClusterShutdownWithSkipUnavailableFalse() throws Exception {
191+
// Remote cluster is stopped and skip_unavailable is set to false.
192+
// Although the other cluster is open, we expect an Exception.
193+
194+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), false);
195+
196+
try {
197+
// Stop remote cluster.
198+
fulfillingCluster.stop(true);
199+
200+
// A simple query that targets our remote cluster.
201+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
202+
ResponseException ex = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(esqlRequest(query)));
203+
assertThat(ex.getMessage(), containsString("connect_transport_exception"));
204+
} finally {
205+
fulfillingCluster.start();
206+
closeFulfillingClusterClient();
207+
initFulfillingClusterClient();
208+
}
209+
}
210+
211+
private void setupRolesAndPrivileges() throws IOException {
212+
var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
213+
putUserRequest.setJsonEntity("""
214+
{
215+
"password": "x-pack-test-password",
216+
"roles" : ["remote_search"]
217+
}""");
218+
assertOK(adminClient().performRequest(putUserRequest));
219+
220+
var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
221+
putRoleOnRemoteClusterRequest.setJsonEntity("""
222+
{
223+
"indices": [
224+
{
225+
"names": ["task", "hits"],
226+
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"]
227+
}
228+
],
229+
"remote_indices": [
230+
{
231+
"names": ["task", "hits"],
232+
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"],
233+
"clusters": ["*"]
234+
}
235+
]
236+
}""");
237+
assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest));
238+
}
239+
240+
private void loadData() throws IOException {
241+
Request createIndex = new Request("PUT", "task");
242+
createIndex.setJsonEntity("""
243+
{
244+
"mappings": {
245+
"properties": {
246+
"id": { "type": "integer" },
247+
"time_taken_millis": { "type": "integer" }
248+
}
249+
}
250+
}
251+
""");
252+
assertOK(client().performRequest(createIndex));
253+
254+
Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
255+
bulkRequest.setJsonEntity("""
256+
{ "index": { "_index": "task" } }
257+
{ "id": 1, "time_taken_millis": 39}
258+
{ "index": { "_index": "task" } }
259+
{ "id": 2, "time_taken_millis": 25}
260+
{ "index": { "_index": "task" } }
261+
{ "id": 3, "time_taken_millis": 42}
262+
{ "index": { "_index": "task" } }
263+
{ "id": 4, "time_taken_millis": 16}
264+
{ "index": { "_index": "task" } }
265+
{ "id": 5, "time_taken_millis": 62}
266+
""");
267+
assertOK(client().performRequest(bulkRequest));
268+
269+
createIndex = new Request("PUT", "hits");
270+
createIndex.setJsonEntity("""
271+
{
272+
"mappings": {
273+
"properties": {
274+
"endpoint_id": { "type": "integer" },
275+
"t_hits": { "type": "integer" }
276+
}
277+
}
278+
}
279+
""");
280+
assertOK(performRequestAgainstFulfillingCluster(createIndex));
281+
282+
bulkRequest = new Request("POST", "/_bulk?refresh=true");
283+
bulkRequest.setJsonEntity("""
284+
{ "index": {"_index": "hits"}}
285+
{ "endpoint_id": 1, "t_hits": 1267 }
286+
{ "index": {"_index": "hits"}}
287+
{ "endpoint_id": 2, "t_hits": 1389 }
288+
{ "index": {"_index": "hits"}}
289+
{ "endpoint_id": 3, "t_hits": 1922 }
290+
{ "index": {"_index": "hits"}}
291+
{ "endpoint_id": 4, "t_hits": 1547 }
292+
""");
293+
assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
294+
}
295+
296+
private Response performRequestWithRemoteSearchUser(final Request request) throws IOException {
297+
request.setOptions(
298+
RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_SEARCH_USER, PASS))
299+
);
300+
return client().performRequest(request);
301+
}
302+
303+
private Request esqlRequest(String query) throws IOException {
304+
XContentBuilder body = JsonXContent.contentBuilder();
305+
306+
body.startObject();
307+
body.field("query", query);
308+
body.field("include_ccs_metadata", true);
309+
body.endObject();
310+
311+
Request request = new Request("POST", "_query");
312+
request.setJsonEntity(org.elasticsearch.common.Strings.toString(body));
313+
314+
return request;
315+
}
316+
}

0 commit comments

Comments
 (0)