Skip to content

Commit b6d2d4b

Browse files
authored
ES|QL CCS uses skip_unavailable setting for handling disconnected remote clusters (#115266)
As part of ES|QL planning of a cross-cluster search, a field-caps call is done to each cluster and, if an ENRICH command is present, the enrich policy-resolve API is called on each remote. If a remote cluster cannot be connected to in these calls, the outcome depends on the skip_unavailable setting. For skip_unavailable=false clusters, the error is fatal and the error will immediately be propagated back to the client with a top-level error message with a 500 HTTP status response code. For skip_unavailable=true clusters, the error is not fatal. The error will be trapped and recorded in the EsqlExecutionInfo object for the query, marking the cluster as SKIPPED. If the user requested CCS metadata to be included, the cluster status and connection failure will be present in the _clusters/details section of the response. If no clusters can be contacted and they are all marked as skip_unavailable=true, no error will be returned. Instead, a 200 HTTP status will be returned with no columns and no values. If the include_ccs_metadata: true setting was included on the query, the errors will be listed in the _clusters metadata section. (Note: this is also how the _search endpoint works for CCS.) Partially addresses #114531
1 parent e5d5c17 commit b6d2d4b

File tree

17 files changed

+1819
-144
lines changed

17 files changed

+1819
-144
lines changed

docs/changelog/115266.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 115266
2+
summary: ES|QL CCS uses `skip_unavailable` setting for handling disconnected remote
3+
clusters
4+
area: ES|QL
5+
type: enhancement
6+
issues: [ 114531 ]

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ static TransportVersion def(int id) {
185185
public static final TransportVersion INDEX_REQUEST_REMOVE_METERING = def(8_780_00_0);
186186
public static final TransportVersion CPU_STAT_STRING_PARSING = def(8_781_00_0);
187187
public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0);
188+
public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0);
188189

189190
/*
190191
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java

Lines changed: 690 additions & 0 deletions
Large diffs are not rendered by default.

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java

Lines changed: 525 additions & 0 deletions
Large diffs are not rendered by default.

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.test.AbstractMultiClustersTestCase;
2626
import org.elasticsearch.test.InternalTestCluster;
2727
import org.elasticsearch.test.XContentTestUtils;
28+
import org.elasticsearch.transport.RemoteClusterAware;
2829
import org.elasticsearch.transport.TransportService;
2930
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
3031
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -246,7 +247,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() {
246247

247248
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
248249
assertThat(localCluster.getIndexExpression(), equalTo("no_such_index"));
249-
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
250+
// TODO: a follow-on PR will change this to throw an Exception when the local cluster requests a concrete index that is missing
251+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
250252
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
251253
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
252254
assertThat(localCluster.getTotalShards(), equalTo(0));
@@ -499,7 +501,7 @@ public void testCCSExecutionOnSearchesWithLimit0() {
499501

500502
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
501503
assertThat(localCluster.getIndexExpression(), equalTo("nomatch*"));
502-
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
504+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
503505
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
504506
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
505507
assertThat(remoteCluster.getTotalShards(), equalTo(0));
@@ -803,6 +805,14 @@ Map<String, Object> setupTwoClusters() {
803805
clusterInfo.put("local.index", localIndex);
804806
clusterInfo.put("remote.num_shards", numShardsRemote);
805807
clusterInfo.put("remote.index", remoteIndex);
808+
809+
String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
810+
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
811+
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
812+
.getClusterSettings()
813+
.get(skipUnavailableSetting);
814+
clusterInfo.put("remote.skip_unavailable", skipUnavailable);
815+
806816
return clusterInfo;
807817
}
808818

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.action;
99

1010
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.action.search.ShardSearchFailure;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.common.io.stream.Writeable;
@@ -281,6 +282,7 @@ public static class Cluster implements ToXContentFragment, Writeable {
281282
private final Integer successfulShards;
282283
private final Integer skippedShards;
283284
private final Integer failedShards;
285+
private final List<ShardSearchFailure> failures;
284286
private final TimeValue took; // search latency for this cluster sub-search
285287

286288
/**
@@ -300,7 +302,7 @@ public String toString() {
300302
}
301303

302304
public Cluster(String clusterAlias, String indexExpression) {
303-
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null);
305+
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null, null);
304306
}
305307

306308
/**
@@ -312,7 +314,7 @@ public Cluster(String clusterAlias, String indexExpression) {
312314
* @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings
313315
*/
314316
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable) {
315-
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null);
317+
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null);
316318
}
317319

318320
/**
@@ -324,7 +326,7 @@ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavaila
324326
* @param status current status of the search on this Cluster
325327
*/
326328
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable, Cluster.Status status) {
327-
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null);
329+
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null, null);
328330
}
329331

330332
public Cluster(
@@ -336,6 +338,7 @@ public Cluster(
336338
Integer successfulShards,
337339
Integer skippedShards,
338340
Integer failedShards,
341+
List<ShardSearchFailure> failures,
339342
TimeValue took
340343
) {
341344
assert clusterAlias != null : "clusterAlias cannot be null";
@@ -349,6 +352,11 @@ public Cluster(
349352
this.successfulShards = successfulShards;
350353
this.skippedShards = skippedShards;
351354
this.failedShards = failedShards;
355+
if (failures == null) {
356+
this.failures = List.of();
357+
} else {
358+
this.failures = failures;
359+
}
352360
this.took = took;
353361
}
354362

@@ -362,6 +370,11 @@ public Cluster(StreamInput in) throws IOException {
362370
this.failedShards = in.readOptionalInt();
363371
this.took = in.readOptionalTimeValue();
364372
this.skipUnavailable = in.readBoolean();
373+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
374+
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure));
375+
} else {
376+
this.failures = List.of();
377+
}
365378
}
366379

367380
@Override
@@ -375,6 +388,9 @@ public void writeTo(StreamOutput out) throws IOException {
375388
out.writeOptionalInt(failedShards);
376389
out.writeOptionalTimeValue(took);
377390
out.writeBoolean(skipUnavailable);
391+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
392+
out.writeCollection(failures);
393+
}
378394
}
379395

380396
/**
@@ -387,12 +403,12 @@ public void writeTo(StreamOutput out) throws IOException {
387403
* All other fields can be set and override the value in the "copyFrom" Cluster.
388404
*/
389405
public static class Builder {
390-
private String indexExpression;
391406
private Cluster.Status status;
392407
private Integer totalShards;
393408
private Integer successfulShards;
394409
private Integer skippedShards;
395410
private Integer failedShards;
411+
private List<ShardSearchFailure> failures;
396412
private TimeValue took;
397413
private final Cluster original;
398414

@@ -408,22 +424,18 @@ public Builder(Cluster copyFrom) {
408424
public Cluster build() {
409425
return new Cluster(
410426
original.getClusterAlias(),
411-
indexExpression == null ? original.getIndexExpression() : indexExpression,
427+
original.getIndexExpression(),
412428
original.isSkipUnavailable(),
413429
status != null ? status : original.getStatus(),
414430
totalShards != null ? totalShards : original.getTotalShards(),
415431
successfulShards != null ? successfulShards : original.getSuccessfulShards(),
416432
skippedShards != null ? skippedShards : original.getSkippedShards(),
417433
failedShards != null ? failedShards : original.getFailedShards(),
434+
failures != null ? failures : original.getFailures(),
418435
took != null ? took : original.getTook()
419436
);
420437
}
421438

422-
public Cluster.Builder setIndexExpression(String indexExpression) {
423-
this.indexExpression = indexExpression;
424-
return this;
425-
}
426-
427439
public Cluster.Builder setStatus(Cluster.Status status) {
428440
this.status = status;
429441
return this;
@@ -449,6 +461,11 @@ public Cluster.Builder setFailedShards(int failedShards) {
449461
return this;
450462
}
451463

464+
public Cluster.Builder setFailures(List<ShardSearchFailure> failures) {
465+
this.failures = failures;
466+
return this;
467+
}
468+
452469
public Cluster.Builder setTook(TimeValue took) {
453470
this.took = took;
454471
return this;
@@ -466,7 +483,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
466483
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
467484
builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
468485
if (took != null) {
469-
// TODO: change this to took_nanos and call took.nanos?
470486
builder.field(TOOK.getPreferredName(), took.millis());
471487
}
472488
if (totalShards != null) {
@@ -483,6 +499,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
483499
}
484500
builder.endObject();
485501
}
502+
if (failures != null && failures.size() > 0) {
503+
builder.startArray(RestActions.FAILURES_FIELD.getPreferredName());
504+
for (ShardSearchFailure failure : failures) {
505+
failure.toXContent(builder, params);
506+
}
507+
builder.endArray();
508+
}
486509
}
487510
builder.endObject();
488511
return builder;
@@ -529,6 +552,10 @@ public Integer getFailedShards() {
529552
return failedShards;
530553
}
531554

555+
public List<ShardSearchFailure> getFailures() {
556+
return failures;
557+
}
558+
532559
@Override
533560
public boolean equals(Object o) {
534561
if (this == o) return true;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public final class EnrichResolution {
2323

2424
private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
2525
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
26+
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();
2627

2728
public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
2829
return resolvedPolicies.get(new Key(policyName, mode));
@@ -51,6 +52,14 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
5152
errors.putIfAbsent(new Key(policyName, mode), reason);
5253
}
5354

55+
public void addUnavailableCluster(String clusterAlias, Exception e) {
56+
unavailableClusters.put(clusterAlias, e);
57+
}
58+
59+
public Map<String, Exception> getUnavailableClusters() {
60+
return unavailableClusters;
61+
}
62+
5463
private record Key(String policyName, Enrich.Mode mode) {
5564

5665
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.enrich;
99

10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.ActionListenerResponseHandler;
1213
import org.elasticsearch.action.search.SearchRequest;
@@ -50,6 +51,7 @@
5051
import java.util.ArrayList;
5152
import java.util.Arrays;
5253
import java.util.Collection;
54+
import java.util.Collections;
5355
import java.util.HashMap;
5456
import java.util.HashSet;
5557
import java.util.List;
@@ -113,12 +115,27 @@ public void resolvePolicies(
113115
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
114116
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
115117
final EnrichResolution enrichResolution = new EnrichResolution();
118+
119+
Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
120+
121+
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
122+
String clusterAlias = entry.getKey();
123+
if (entry.getValue().connectionError != null) {
124+
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
125+
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
126+
remoteClusters.remove(clusterAlias);
127+
} else {
128+
lookupResponsesToProcess.put(clusterAlias, entry.getValue());
129+
}
130+
}
131+
116132
for (UnresolvedPolicy unresolved : unresolvedPolicies) {
117133
Tuple<ResolvedEnrichPolicy, String> resolved = mergeLookupResults(
118134
unresolved,
119135
calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters),
120-
lookupResponses
136+
lookupResponsesToProcess
121137
);
138+
122139
if (resolved.v1() != null) {
123140
enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1());
124141
} else {
@@ -149,13 +166,16 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
149166
Collection<String> targetClusters,
150167
Map<String, LookupResponse> lookupResults
151168
) {
152-
assert targetClusters.isEmpty() == false;
153169
String policyName = unresolved.name;
170+
if (targetClusters.isEmpty()) {
171+
return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable");
172+
}
154173
final Map<String, ResolvedEnrichPolicy> policies = new HashMap<>();
155174
final List<String> failures = new ArrayList<>();
156175
for (String cluster : targetClusters) {
157176
LookupResponse lookupResult = lookupResults.get(cluster);
158177
if (lookupResult != null) {
178+
assert lookupResult.connectionError == null : "Should never have a non-null connectionError here";
159179
ResolvedEnrichPolicy policy = lookupResult.policies.get(policyName);
160180
if (policy != null) {
161181
policies.put(cluster, policy);
@@ -261,22 +281,34 @@ private void lookupPolicies(
261281
if (remotePolicies.isEmpty() == false) {
262282
for (String cluster : remoteClusters) {
263283
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
264-
getRemoteConnection(
265-
cluster,
266-
lookupListener.delegateFailureAndWrap(
267-
(delegate, connection) -> transportService.sendRequest(
284+
getRemoteConnection(cluster, new ActionListener<Transport.Connection>() {
285+
@Override
286+
public void onResponse(Transport.Connection connection) {
287+
transportService.sendRequest(
268288
connection,
269289
RESOLVE_ACTION_NAME,
270290
new LookupRequest(cluster, remotePolicies),
271291
TransportRequestOptions.EMPTY,
272-
new ActionListenerResponseHandler<>(
273-
delegate,
274-
LookupResponse::new,
275-
threadPool.executor(ThreadPool.Names.SEARCH)
276-
)
277-
)
278-
)
279-
);
292+
new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> {
293+
if (ExceptionsHelper.isRemoteUnavailableException(e)
294+
&& remoteClusterService.isSkipUnavailable(cluster)) {
295+
l.onResponse(new LookupResponse(e));
296+
} else {
297+
l.onFailure(e);
298+
}
299+
}), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH))
300+
);
301+
}
302+
303+
@Override
304+
public void onFailure(Exception e) {
305+
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
306+
lookupListener.onResponse(new LookupResponse(e));
307+
} else {
308+
lookupListener.onFailure(e);
309+
}
310+
}
311+
});
280312
}
281313
}
282314
// local cluster
@@ -323,16 +355,30 @@ public void writeTo(StreamOutput out) throws IOException {
323355
private static class LookupResponse extends TransportResponse {
324356
final Map<String, ResolvedEnrichPolicy> policies;
325357
final Map<String, String> failures;
358+
// does not need to be Writeable since this indicates a failure to contact a remote cluster, so only set on querying cluster
359+
final transient Exception connectionError;
326360

327361
LookupResponse(Map<String, ResolvedEnrichPolicy> policies, Map<String, String> failures) {
328362
this.policies = policies;
329363
this.failures = failures;
364+
this.connectionError = null;
365+
}
366+
367+
/**
368+
* Use this constructor when the remote cluster is unavailable to indicate inability to do the enrich policy lookup
369+
* @param connectionError Exception received when trying to connect to a remote cluster
370+
*/
371+
LookupResponse(Exception connectionError) {
372+
this.policies = Collections.emptyMap();
373+
this.failures = Collections.emptyMap();
374+
this.connectionError = connectionError;
330375
}
331376

332377
LookupResponse(StreamInput in) throws IOException {
333378
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
334379
this.policies = planIn.readMap(StreamInput::readString, ResolvedEnrichPolicy::new);
335380
this.failures = planIn.readMap(StreamInput::readString, StreamInput::readString);
381+
this.connectionError = null;
336382
}
337383

338384
@Override

0 commit comments

Comments
 (0)