Skip to content

Commit 94ab1a6

Browse files
Add tests for RCS1:ES|QL to verify behaviour for disconnected clusters (#116449)
* Add tests for RCS1:ES|QL to verify behaviour for disconnected clusters * fix: build * Add missing assertions for ccs metadata * Address review comments
1 parent ade29fb commit 94ab1a6

File tree

1 file changed

+286
-0
lines changed

1 file changed

+286
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.remotecluster;
9+
10+
import org.elasticsearch.client.Request;
11+
import org.elasticsearch.client.Response;
12+
import org.elasticsearch.client.ResponseException;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
15+
import org.elasticsearch.xcontent.XContentBuilder;
16+
import org.elasticsearch.xcontent.json.JsonXContent;
17+
import org.junit.Before;
18+
import org.junit.ClassRule;
19+
import org.junit.rules.RuleChain;
20+
import org.junit.rules.TestRule;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.Map;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import static org.hamcrest.CoreMatchers.containsString;
28+
import static org.hamcrest.CoreMatchers.is;
29+
import static org.hamcrest.Matchers.greaterThan;
30+
31+
public class CrossClusterEsqlRCS1UnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase {
32+
private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean();
33+
34+
static {
35+
fulfillingCluster = ElasticsearchCluster.local()
36+
.name("fulfilling-cluster")
37+
.nodes(1)
38+
.module("x-pack-esql")
39+
.module("x-pack-enrich")
40+
.apply(commonClusterConfig)
41+
.setting("remote_cluster.port", "0")
42+
.setting("xpack.ml.enabled", "false")
43+
.setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
44+
.setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
45+
.setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
46+
.setting("xpack.security.authc.token.enabled", "true")
47+
.keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
48+
.node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
49+
.build();
50+
51+
queryCluster = ElasticsearchCluster.local()
52+
.name("query-cluster")
53+
.module("x-pack-esql")
54+
.module("x-pack-enrich")
55+
.apply(commonClusterConfig)
56+
.setting("xpack.ml.enabled", "false")
57+
.setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
58+
.build();
59+
}
60+
61+
@ClassRule
62+
public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
63+
64+
@Before
65+
public void setupPreRequisites() throws IOException {
66+
setupRolesAndPrivileges();
67+
loadData();
68+
}
69+
70+
public void testEsqlRcs1UnavailableRemoteScenarios() throws Exception {
71+
clusterShutDownWithRandomSkipUnavailable();
72+
remoteClusterShutdownWithSkipUnavailableTrue();
73+
remoteClusterShutdownWithSkipUnavailableFalse();
74+
}
75+
76+
private void clusterShutDownWithRandomSkipUnavailable() throws Exception {
77+
// skip_unavailable is set to a random boolean value.
78+
// However, no clusters are stopped. Hence, we do not expect any other behaviour
79+
// other than a 200-OK.
80+
81+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), randomBoolean());
82+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
83+
Response response = client().performRequest(esqlRequest(query));
84+
85+
Map<String, Object> map = responseAsMap(response);
86+
ArrayList<?> columns = (ArrayList<?>) map.get("columns");
87+
ArrayList<?> values = (ArrayList<?>) map.get("values");
88+
Map<?, ?> clusters = (Map<?, ?>) map.get("_clusters");
89+
Map<?, ?> clusterDetails = (Map<?, ?>) clusters.get("details");
90+
Map<?, ?> localClusterDetails = (Map<?, ?>) clusterDetails.get("(local)");
91+
Map<?, ?> remoteClusterDetails = (Map<?, ?>) clusterDetails.get("my_remote_cluster");
92+
93+
assertOK(response);
94+
assertThat((int) map.get("took"), greaterThan(0));
95+
assertThat(columns.size(), is(4));
96+
assertThat(values.size(), is(9));
97+
98+
assertThat((int) clusters.get("total"), is(2));
99+
assertThat((int) clusters.get("successful"), is(2));
100+
assertThat((int) clusters.get("running"), is(0));
101+
assertThat((int) clusters.get("skipped"), is(0));
102+
assertThat((int) clusters.get("partial"), is(0));
103+
assertThat((int) clusters.get("failed"), is(0));
104+
105+
assertThat(clusterDetails.size(), is(2));
106+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
107+
assertThat(localClusterDetails.get("status"), is("successful"));
108+
109+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
110+
assertThat(remoteClusterDetails.get("status"), is("successful"));
111+
}
112+
113+
@SuppressWarnings("unchecked")
114+
private void remoteClusterShutdownWithSkipUnavailableTrue() throws Exception {
115+
// Remote cluster is stopped and skip unavailable is set to true.
116+
// We expect no exception and partial results from the remaining open cluster.
117+
118+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), true);
119+
120+
try {
121+
// Stop remote cluster.
122+
fulfillingCluster.stop(true);
123+
124+
// A simple query that targets our remote cluster.
125+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
126+
Response response = client().performRequest(esqlRequest(query));
127+
128+
Map<String, Object> map = responseAsMap(response);
129+
ArrayList<String> columns = (ArrayList<String>) map.get("columns");
130+
ArrayList<String> values = (ArrayList<String>) map.get("values");
131+
Map<String, ?> clusters = (Map<String, ?>) map.get("_clusters");
132+
Map<String, ?> clusterDetails = (Map<String, ?>) clusters.get("details");
133+
Map<String, ?> localClusterDetails = (Map<String, ?>) clusterDetails.get("(local)");
134+
Map<String, ?> remoteClusterDetails = (Map<String, ?>) clusterDetails.get("my_remote_cluster");
135+
136+
// Assert results obtained from the local cluster and that remote cluster was
137+
// skipped.
138+
assertOK(response);
139+
assertThat((int) map.get("took"), greaterThan(0));
140+
141+
assertThat(columns.size(), is(2));
142+
assertThat(values.size(), is(5));
143+
144+
assertThat((int) clusters.get("total"), is(2));
145+
assertThat((int) clusters.get("successful"), is(1));
146+
assertThat((int) clusters.get("skipped"), is(1));
147+
assertThat((int) clusters.get("running"), is(0));
148+
assertThat((int) clusters.get("partial"), is(0));
149+
assertThat((int) clusters.get("failed"), is(0));
150+
151+
assertThat(clusterDetails.size(), is(2));
152+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
153+
assertThat(localClusterDetails.get("status"), is("successful"));
154+
155+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
156+
assertThat(remoteClusterDetails.get("status"), is("skipped"));
157+
158+
} catch (ResponseException r) {
159+
throw new AssertionError(r);
160+
} finally {
161+
fulfillingCluster.start();
162+
closeFulfillingClusterClient();
163+
initFulfillingClusterClient();
164+
}
165+
}
166+
167+
private void remoteClusterShutdownWithSkipUnavailableFalse() throws Exception {
168+
// Remote cluster is stopped and skip_unavailable is set to false.
169+
// Although the other cluster is open, we expect an Exception.
170+
171+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), false);
172+
173+
try {
174+
// Stop remote cluster.
175+
fulfillingCluster.stop(true);
176+
177+
// A simple query that targets our remote cluster.
178+
String query = "FROM *,my_remote_cluster:* | LIMIT 10";
179+
ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(query)));
180+
assertThat(ex.getMessage(), containsString("connect_transport_exception"));
181+
} finally {
182+
fulfillingCluster.start();
183+
closeFulfillingClusterClient();
184+
initFulfillingClusterClient();
185+
}
186+
}
187+
188+
private void setupRolesAndPrivileges() throws IOException {
189+
var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
190+
putUserRequest.setJsonEntity("""
191+
{
192+
"password": "x-pack-test-password",
193+
"roles" : ["remote_search"]
194+
}""");
195+
assertOK(adminClient().performRequest(putUserRequest));
196+
197+
var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
198+
putRoleOnRemoteClusterRequest.setJsonEntity("""
199+
{
200+
"indices": [
201+
{
202+
"names": ["points", "squares"],
203+
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"]
204+
}
205+
],
206+
"remote_indices": [
207+
{
208+
"names": ["points", "squares"],
209+
"privileges": ["read", "read_cross_cluster", "create_index", "monitor"],
210+
"clusters": ["my_remote_cluster"]
211+
}
212+
]
213+
}""");
214+
assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest));
215+
}
216+
217+
private void loadData() throws IOException {
218+
Request createIndex = new Request("PUT", "points");
219+
createIndex.setJsonEntity("""
220+
{
221+
"mappings": {
222+
"properties": {
223+
"id": { "type": "integer" },
224+
"score": { "type": "integer" }
225+
}
226+
}
227+
}
228+
""");
229+
assertOK(client().performRequest(createIndex));
230+
231+
Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
232+
bulkRequest.setJsonEntity("""
233+
{ "index": { "_index": "points" } }
234+
{ "id": 1, "score": 75}
235+
{ "index": { "_index": "points" } }
236+
{ "id": 2, "score": 125}
237+
{ "index": { "_index": "points" } }
238+
{ "id": 3, "score": 100}
239+
{ "index": { "_index": "points" } }
240+
{ "id": 4, "score": 50}
241+
{ "index": { "_index": "points" } }
242+
{ "id": 5, "score": 150}
243+
""");
244+
assertOK(client().performRequest(bulkRequest));
245+
246+
createIndex = new Request("PUT", "squares");
247+
createIndex.setJsonEntity("""
248+
{
249+
"mappings": {
250+
"properties": {
251+
"num": { "type": "integer" },
252+
"square": { "type": "integer" }
253+
}
254+
}
255+
}
256+
""");
257+
assertOK(performRequestAgainstFulfillingCluster(createIndex));
258+
259+
bulkRequest = new Request("POST", "/_bulk?refresh=true");
260+
bulkRequest.setJsonEntity("""
261+
{ "index": {"_index": "squares"}}
262+
{ "num": 1, "square": 1 }
263+
{ "index": {"_index": "squares"}}
264+
{ "num": 2, "square": 4 }
265+
{ "index": {"_index": "squares"}}
266+
{ "num": 3, "square": 9 }
267+
{ "index": {"_index": "squares"}}
268+
{ "num": 4, "square": 16 }
269+
""");
270+
assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
271+
}
272+
273+
private Request esqlRequest(String query) throws IOException {
274+
XContentBuilder body = JsonXContent.contentBuilder();
275+
276+
body.startObject();
277+
body.field("query", query);
278+
body.field("include_ccs_metadata", true);
279+
body.endObject();
280+
281+
Request request = new Request("POST", "_query");
282+
request.setJsonEntity(Strings.toString(body));
283+
284+
return request;
285+
}
286+
}

0 commit comments

Comments
 (0)