Skip to content

Commit c71c008

Browse files
ESQL: connect_transport_exception should be thrown instead of verification_exception when ENRICH-ing if remote is disconnected (#119750) (#119881)
* fix: `verification_exception` is thrown instead of `connect_transport_exception` in ENRICH In the context of ENRICH, if a remote is disconnected and skip unavailable is set to `true`, then `verification_exception` is thrown instead of `connect_transport_exception`. This PR fixes this and adds the IT tests for ENRICH for RCS 1 and RCS 2. * Update docs/changelog/119750.yaml * Update 119750.yaml --------- Co-authored-by: Michael Peterson <[email protected]>
1 parent 20cc47d commit c71c008

File tree

4 files changed

+743
-0
lines changed

4 files changed

+743
-0
lines changed

docs/changelog/119750.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 119750
2+
summary: "ESQL: `connect_transport_exception` should be thrown instead of `verification_exception`\
3+
\ when ENRICH-ing if remote is disconnected"
4+
area: Search
5+
type: bug
6+
issues: []

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,10 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn
204204
* Mark it as SKIPPED with 0 shards searched and took=0.
205205
*/
206206
for (String c : clustersWithNoMatchingIndices) {
207+
if (executionInfo.getCluster(c).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
208+
// if cluster was already marked SKIPPED during enrich policy resolution, do not overwrite
209+
continue;
210+
}
207211
final String indexExpression = executionInfo.getCluster(c).getIndexExpression();
208212
if (missingIndicesIsFatal(c, executionInfo)) {
209213
String error = Strings.format(
Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.remotecluster;
9+
10+
import org.elasticsearch.client.Request;
11+
import org.elasticsearch.client.Response;
12+
import org.elasticsearch.client.ResponseException;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
15+
import org.elasticsearch.xcontent.XContentBuilder;
16+
import org.elasticsearch.xcontent.json.JsonXContent;
17+
import org.junit.Before;
18+
import org.junit.ClassRule;
19+
import org.junit.rules.RuleChain;
20+
import org.junit.rules.TestRule;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.Map;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import static org.hamcrest.Matchers.containsString;
28+
import static org.hamcrest.Matchers.endsWith;
29+
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.greaterThan;
31+
import static org.hamcrest.Matchers.is;
32+
33+
public class CrossClusterEsqlRCS1EnrichUnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase {
34+
private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean();
35+
36+
static {
37+
fulfillingCluster = ElasticsearchCluster.local()
38+
.name("fulfilling-cluster")
39+
.nodes(1)
40+
.module("x-pack-autoscaling")
41+
.module("x-pack-esql")
42+
.module("x-pack-enrich")
43+
.module("x-pack-ml")
44+
.module("ingest-common")
45+
.apply(commonClusterConfig)
46+
.setting("remote_cluster.port", "0")
47+
.setting("xpack.ml.enabled", "false")
48+
.setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
49+
.setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
50+
.setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
51+
.setting("xpack.security.authc.token.enabled", "true")
52+
.keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
53+
.node(0, spec -> spec.setting("remote_cluster_server.enabled", "true"))
54+
.build();
55+
56+
queryCluster = ElasticsearchCluster.local()
57+
.name("query-cluster")
58+
.module("x-pack-autoscaling")
59+
.module("x-pack-esql")
60+
.module("x-pack-enrich")
61+
.module("x-pack-ml")
62+
.module("ingest-common")
63+
.apply(commonClusterConfig)
64+
.setting("xpack.ml.enabled", "false")
65+
.setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get()))
66+
.build();
67+
}
68+
69+
@ClassRule
70+
public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
71+
72+
private final String[] modes = { "_coordinator", "_remote" };
73+
74+
@Before
75+
public void setupPreRequisites() throws IOException {
76+
setupRolesAndPrivileges();
77+
setSourceData();
78+
79+
var policy = createPolicy("employees-policy", "employees", "email", new String[] { "id", "designation" });
80+
// Create the enrich policy on both clusters.
81+
assertOK(client().performRequest(policy));
82+
assertOK(performRequestAgainstFulfillingCluster(policy));
83+
84+
// Execute the enrich policy on both clusters.
85+
var exec = executePolicy("employees-policy");
86+
assertOK(client().performRequest(exec));
87+
assertOK(performRequestAgainstFulfillingCluster(exec));
88+
}
89+
90+
public void testEsqlEnrichWithSkipUnavailable() throws Exception {
91+
esqlEnrichWithRandomSkipUnavailable();
92+
esqlEnrichWithSkipUnavailableTrue();
93+
esqlEnrichWithSkipUnavailableFalse();
94+
}
95+
96+
private void esqlEnrichWithRandomSkipUnavailable() throws Exception {
97+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), randomBoolean());
98+
99+
String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10";
100+
Response response = client().performRequest(esqlRequest(query));
101+
102+
Map<String, Object> map = responseAsMap(response);
103+
ArrayList<?> values = (ArrayList<?>) map.get("values");
104+
Map<?, ?> clusters = (Map<?, ?>) map.get("_clusters");
105+
Map<?, ?> clusterDetails = (Map<?, ?>) clusters.get("details");
106+
Map<?, ?> localClusterDetails = (Map<?, ?>) clusterDetails.get("(local)");
107+
Map<?, ?> remoteClusterDetails = (Map<?, ?>) clusterDetails.get("my_remote_cluster");
108+
109+
assertOK(response);
110+
assertThat((int) map.get("took"), greaterThan(0));
111+
assertThat(values.size(), is(6));
112+
for (int i = 0; i < 6; i++) {
113+
ArrayList<?> value = (ArrayList<?>) values.get(i);
114+
// Size is 3: ID, Email, Designation.
115+
assertThat(value.size(), is(3));
116+
// Email
117+
assertThat((String) value.get(0), endsWith("@corp.co"));
118+
// ID
119+
assertThat(value.get(1), is(i + 1));
120+
}
121+
122+
assertThat((int) clusters.get("total"), is(2));
123+
assertThat((int) clusters.get("successful"), is(2));
124+
assertThat((int) clusters.get("running"), is(0));
125+
assertThat((int) clusters.get("skipped"), is(0));
126+
assertThat((int) clusters.get("partial"), is(0));
127+
assertThat((int) clusters.get("failed"), is(0));
128+
129+
assertThat(clusterDetails.size(), is(2));
130+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
131+
assertThat(localClusterDetails.get("status"), is("successful"));
132+
133+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
134+
assertThat(remoteClusterDetails.get("status"), is("successful"));
135+
}
136+
137+
@SuppressWarnings("unchecked")
138+
private void esqlEnrichWithSkipUnavailableTrue() throws Exception {
139+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), true);
140+
141+
try {
142+
fulfillingCluster.stop(true);
143+
144+
String query = "FROM to-be-enriched,my_remote_cluster:to-be-enriched | ENRICH employees-policy | LIMIT 10";
145+
Response response = client().performRequest(esqlRequest(query));
146+
147+
Map<String, Object> map = responseAsMap(response);
148+
ArrayList<?> values = (ArrayList<?>) map.get("values");
149+
Map<?, ?> clusters = (Map<?, ?>) map.get("_clusters");
150+
Map<?, ?> clusterDetails = (Map<?, ?>) clusters.get("details");
151+
Map<?, ?> localClusterDetails = (Map<?, ?>) clusterDetails.get("(local)");
152+
Map<?, ?> remoteClusterDetails = (Map<?, ?>) clusterDetails.get("my_remote_cluster");
153+
154+
assertOK(response);
155+
assertThat((int) map.get("took"), greaterThan(0));
156+
assertThat(values.size(), is(3));
157+
158+
// We only have 3 values since the remote cluster is turned off.
159+
for (int i = 0; i < 3; i++) {
160+
ArrayList<?> value = (ArrayList<?>) values.get(i);
161+
// Size is 3: ID, Email, Designation.
162+
assertThat(value.size(), is(3));
163+
// Email
164+
assertThat((String) value.get(0), endsWith("@corp.co"));
165+
// ID
166+
assertThat(value.get(1), is(i + 1));
167+
}
168+
169+
assertThat((int) clusters.get("total"), is(2));
170+
assertThat((int) clusters.get("successful"), is(1));
171+
assertThat((int) clusters.get("running"), is(0));
172+
assertThat((int) clusters.get("skipped"), is(1));
173+
assertThat((int) clusters.get("partial"), is(0));
174+
assertThat((int) clusters.get("failed"), is(0));
175+
176+
assertThat(clusterDetails.size(), is(2));
177+
assertThat((int) localClusterDetails.get("took"), greaterThan(0));
178+
assertThat(localClusterDetails.get("status"), is("successful"));
179+
180+
assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
181+
assertThat(remoteClusterDetails.get("status"), is("skipped"));
182+
183+
ArrayList<?> remoteClusterFailures = (ArrayList<?>) remoteClusterDetails.get("failures");
184+
assertThat(remoteClusterFailures.size(), equalTo(1));
185+
Map<String, ?> failuresMap = (Map<String, ?>) remoteClusterFailures.get(0);
186+
187+
Map<String, ?> reason = (Map<String, ?>) failuresMap.get("reason");
188+
assertThat(reason.get("type").toString(), equalTo("connect_transport_exception"));
189+
assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]"));
190+
} finally {
191+
fulfillingCluster.start();
192+
closeFulfillingClusterClient();
193+
initFulfillingClusterClient();
194+
}
195+
}
196+
197+
private void esqlEnrichWithSkipUnavailableFalse() throws Exception {
198+
configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), false);
199+
200+
try {
201+
fulfillingCluster.stop(true);
202+
203+
String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10";
204+
ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(query)));
205+
assertThat(ex.getMessage(), containsString("connect_transport_exception"));
206+
} finally {
207+
fulfillingCluster.start();
208+
closeFulfillingClusterClient();
209+
initFulfillingClusterClient();
210+
}
211+
}
212+
213+
private void setupRolesAndPrivileges() throws IOException {
214+
var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
215+
putUserRequest.setJsonEntity("""
216+
{
217+
"password": "x-pack-test-password",
218+
"roles" : ["remote_search"]
219+
}""");
220+
assertOK(adminClient().performRequest(putUserRequest));
221+
222+
var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
223+
putRoleOnRemoteClusterRequest.setJsonEntity("""
224+
{
225+
"indices": [
226+
{
227+
"names": ["*"],
228+
"privileges": ["read"]
229+
}
230+
],
231+
"cluster": [ "monitor_enrich", "manage_own_api_key" ],
232+
"remote_indices": [
233+
{
234+
"names": ["*"],
235+
"privileges": ["read"],
236+
"clusters": ["my_remote_cluster"]
237+
}
238+
],
239+
"remote_cluster": [
240+
{
241+
"privileges": ["monitor_enrich"],
242+
"clusters": ["my_remote_cluster"]
243+
}
244+
]
245+
}""");
246+
assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest));
247+
}
248+
249+
private void setSourceData() throws IOException {
250+
Request createIndex = new Request("PUT", "employees");
251+
createIndex.setJsonEntity("""
252+
{
253+
"mappings": {
254+
"properties": {
255+
"id": { "type": "integer" },
256+
"email": { "type": "text" },
257+
"designation": { "type": "text" }
258+
}
259+
}
260+
}
261+
""");
262+
assertOK(client().performRequest(createIndex));
263+
assertOK(performRequestAgainstFulfillingCluster(createIndex));
264+
265+
Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
266+
bulkRequest.setJsonEntity("""
267+
{ "index": { "_index": "employees" } }
268+
{ "id": 1, "email": "[email protected]", "designation": "SDE intern"}
269+
{ "index": { "_index": "employees" } }
270+
{ "id": 2, "email": "[email protected]", "designation": "SDE 1"}
271+
{ "index": { "_index": "employees" } }
272+
{ "id": 3, "email": "[email protected]", "designation": "SDE 2"}
273+
{ "index": { "_index": "employees" } }
274+
{ "id": 4, "email": "[email protected]", "designation": "SSE"}
275+
{ "index": { "_index": "employees" } }
276+
{ "id": 5, "email": "[email protected]", "designation": "PSE 1"}
277+
{ "index": { "_index": "employees" } }
278+
{ "id": 6, "email": "[email protected]", "designation": "PSE 2"}
279+
""");
280+
assertOK(client().performRequest(bulkRequest));
281+
assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
282+
283+
createIndex = new Request("PUT", "to-be-enriched");
284+
createIndex.setJsonEntity("""
285+
{
286+
"mappings": {
287+
"properties": {
288+
"email": { "type": "text" }
289+
}
290+
}
291+
}
292+
""");
293+
assertOK(client().performRequest(createIndex));
294+
assertOK(performRequestAgainstFulfillingCluster(createIndex));
295+
296+
bulkRequest = new Request("POST", "/_bulk?refresh=true");
297+
bulkRequest.setJsonEntity("""
298+
{ "index": { "_index": "to-be-enriched" } }
299+
{ "email": "[email protected]"}
300+
{ "index": { "_index": "to-be-enriched" } }
301+
{ "email": "[email protected]"}
302+
{ "index": { "_index": "to-be-enriched" } }
303+
{ "email": "[email protected]"}
304+
""");
305+
assertOK(client().performRequest(bulkRequest));
306+
307+
bulkRequest = new Request("POST", "/_bulk?refresh=true");
308+
bulkRequest.setJsonEntity("""
309+
{ "index": { "_index": "to-be-enriched" } }
310+
{ "email": "[email protected]"}
311+
{ "index": { "_index": "to-be-enriched" } }
312+
{ "email": "[email protected]"}
313+
{ "index": { "_index": "to-be-enriched" } }
314+
{ "email": "[email protected]"}
315+
""");
316+
assertOK(performRequestAgainstFulfillingCluster(bulkRequest));
317+
}
318+
319+
private Request createPolicy(String policyName, String matchIndex, String matchField, String[] enrichFields) throws IOException {
320+
XContentBuilder body = JsonXContent.contentBuilder();
321+
body.startObject();
322+
body.startObject("match");
323+
body.field("indices", matchIndex);
324+
body.field("match_field", matchField);
325+
body.field("enrich_fields", enrichFields);
326+
327+
body.endObject();
328+
body.endObject();
329+
330+
return makeRequest("PUT", "_enrich/policy/" + policyName, body);
331+
}
332+
333+
private Request executePolicy(String policyName) {
334+
return new Request("PUT", "_enrich/policy/employees-policy/_execute");
335+
}
336+
337+
private Request esqlRequest(String query) throws IOException {
338+
XContentBuilder body = JsonXContent.contentBuilder();
339+
340+
body.startObject();
341+
body.field("query", query);
342+
body.field("include_ccs_metadata", true);
343+
body.endObject();
344+
345+
return makeRequest("POST", "_query", body);
346+
}
347+
348+
private Request makeRequest(String method, String endpoint, XContentBuilder requestBody) {
349+
Request request = new Request(method, endpoint);
350+
request.setJsonEntity(Strings.toString(requestBody));
351+
return request;
352+
}
353+
}

0 commit comments

Comments
 (0)