Skip to content

Commit 400cf61

Browse files
ES|QL: do not use skip_unavailable in CPS (#135419)
1 parent e000389 commit 400cf61

File tree

4 files changed

+114
-14
lines changed

4 files changed

+114
-14
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.common.settings.Setting;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.util.CollectionUtils;
13+
import org.elasticsearch.plugins.ClusterPlugin;
14+
import org.elasticsearch.plugins.Plugin;
15+
import org.elasticsearch.transport.ConnectTransportException;
16+
17+
import java.util.Collection;
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
22+
import static org.hamcrest.Matchers.instanceOf;
23+
import static org.hamcrest.Matchers.is;
24+
25+
// TODO: Move this test to the Serverless repo once the IT framework is ready there.
26+
public class EsqlCpsDoesNotUseSkipUnavailableIT extends AbstractCrossClusterTestCase {
27+
28+
public static class CpsPlugin extends Plugin implements ClusterPlugin {
29+
@Override
30+
public List<Setting<?>> getSettings() {
31+
return List.of(CpsEnableSetting);
32+
}
33+
}
34+
35+
private static final Setting<String> CpsEnableSetting = Setting.simpleString(
36+
"serverless.cross_project.enabled",
37+
Setting.Property.NodeScope
38+
);
39+
40+
@Override
41+
protected List<String> remoteClusterAlias() {
42+
return List.of(REMOTE_CLUSTER_1);
43+
}
44+
45+
@Override
46+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
47+
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class);
48+
}
49+
50+
@Override
51+
protected Settings nodeSettings() {
52+
return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build();
53+
}
54+
55+
@Override
56+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
57+
// Setting skip_unavailable=false results in a fatal error when the linked cluster is not available.
58+
return Map.of(REMOTE_CLUSTER_1, false);
59+
}
60+
61+
public void testCpsShouldNotUseSkipUnavailable() throws Exception {
62+
// Add some dummy data to prove we are communicating fine with the remote.
63+
assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index"));
64+
client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
65+
client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();
66+
67+
// Shut down the linked cluster we'd be targeting in the search.
68+
try {
69+
cluster(REMOTE_CLUSTER_1).close();
70+
} catch (Exception e) {
71+
throw new AssertionError(e);
72+
}
73+
74+
/*
75+
* Under normal circumstances, we get a fatal error when skip_unavailable=false for a linked cluster
76+
* and that cluster is targeted in a search op. However, in a CPS environment, setting allow_partial_results=true
77+
* should not result in a fatal error.
78+
*/
79+
80+
EsqlQueryRequest request = new EsqlQueryRequest();
81+
request.query("FROM *,*:* | limit 10");
82+
request.allowPartialResults(true);
83+
try (EsqlQueryResponse response = runQuery(request)) {
84+
assertThat(response.isPartial(), is(true));
85+
EsqlExecutionInfo info = response.getExecutionInfo();
86+
assertThat(info.getCluster(REMOTE_CLUSTER_1).getStatus(), is(EsqlExecutionInfo.Cluster.Status.SKIPPED));
87+
}
88+
89+
request = new EsqlQueryRequest().query("FROM *,*:* | limit 10");
90+
try (EsqlQueryResponse response = runQuery(request)) {
91+
fail("a fatal error should be thrown since allow_partial_results=false");
92+
} catch (Exception e) {
93+
assertThat(e, instanceOf(ConnectTransportException.class));
94+
}
95+
96+
}
97+
98+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ protected void doResolvePolicies(
141141
}
142142

143143
final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
144-
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
144+
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, listener.map(lookupResponses -> {
145145
final EnrichResolution enrichResolution = new EnrichResolution();
146146
final Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
147147
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
@@ -304,6 +304,7 @@ private void lookupPolicies(
304304
Collection<String> remoteClusters,
305305
boolean includeLocal,
306306
Collection<UnresolvedPolicy> unresolvedPolicies,
307+
EsqlExecutionInfo executionInfo,
307308
ActionListener<Map<String, LookupResponse>> listener
308309
) {
309310
final Map<String, LookupResponse> lookupResponses = ConcurrentCollections.newConcurrentMap();
@@ -315,8 +316,9 @@ private void lookupPolicies(
315316
// remote clusters
316317
if (remotePolicies.isEmpty() == false) {
317318
for (String cluster : remoteClusters) {
319+
boolean skipOnFailure = executionInfo.shouldSkipOnFailure(cluster);
318320
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
319-
getRemoteConnection(cluster, new ActionListener<Transport.Connection>() {
321+
getRemoteConnection(cluster, skipOnFailure == false, new ActionListener<Transport.Connection>() {
320322
@Override
321323
public void onResponse(Transport.Connection connection) {
322324
transportService.sendRequest(
@@ -325,7 +327,7 @@ public void onResponse(Transport.Connection connection) {
325327
new LookupRequest(cluster, remotePolicies),
326328
TransportRequestOptions.EMPTY,
327329
new ActionListenerResponseHandler<>(
328-
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)),
330+
lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, skipOnFailure, l)),
329331
LookupResponse::new,
330332
threadPool.executor(ThreadPool.Names.SEARCH)
331333
)
@@ -334,7 +336,7 @@ public void onResponse(Transport.Connection connection) {
334336

335337
@Override
336338
public void onFailure(Exception e) {
337-
failIfSkipUnavailableFalse(e, cluster, lookupListener);
339+
failIfSkipUnavailableFalse(e, skipOnFailure, lookupListener);
338340
}
339341
});
340342
}
@@ -359,8 +361,8 @@ public void onFailure(Exception e) {
359361
}
360362
}
361363

362-
private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener<LookupResponse> lookupListener) {
363-
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster).orElse(true)) {
364+
private void failIfSkipUnavailableFalse(Exception e, boolean skipOnFailure, ActionListener<LookupResponse> lookupListener) {
365+
if (ExceptionsHelper.isRemoteUnavailableException(e) && skipOnFailure) {
364366
lookupListener.onResponse(new LookupResponse(e));
365367
} else {
366368
lookupListener.onFailure(e);
@@ -480,11 +482,7 @@ protected Map<String, EnrichPolicy> availablePolicies() {
480482
return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies();
481483
}
482484

483-
protected void getRemoteConnection(String cluster, ActionListener<Transport.Connection> listener) {
484-
remoteClusterService.maybeEnsureConnectedAndGetConnection(
485-
cluster,
486-
remoteClusterService.isSkipUnavailable(cluster).orElse(true) == false,
487-
listener
488-
);
485+
protected void getRemoteConnection(String cluster, boolean ensureConnected, ActionListener<Transport.Connection> listener) {
486+
remoteClusterService.maybeEnsureConnectedAndGetConnection(cluster, ensureConnected, listener);
489487
}
490488
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,11 @@ private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
378378
// include_ccs_metadata is considered only if include_execution_metadata is not set
379379
includeCcsMetadata = Boolean.TRUE.equals(request.includeCCSMetadata());
380380
}
381-
return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true), includeCcsMetadata);
381+
Boolean allowPartialResults = request.allowPartialResults() != null ? request.allowPartialResults() : defaultAllowPartialResults;
382+
return new EsqlExecutionInfo(
383+
clusterAlias -> remoteClusterService.shouldSkipOnFailure(clusterAlias, allowPartialResults),
384+
includeCcsMetadata
385+
);
382386
}
383387

384388
private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
458458
}
459459

460460
@Override
461-
protected void getRemoteConnection(String remoteCluster, ActionListener<Transport.Connection> listener) {
461+
protected void getRemoteConnection(String remoteCluster, boolean ensureConnected, ActionListener<Transport.Connection> listener) {
462462
assertThat("Must only called on the local cluster", cluster, equalTo(LOCAL_CLUSTER_GROUP_KEY));
463463
listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode()));
464464
}

0 commit comments

Comments
 (0)