Skip to content

Commit a9deb81

Browse files
committed
Handle failures that occur during field-caps
1 parent a27784b commit a9deb81

File tree

11 files changed

+187
-94
lines changed

11 files changed

+187
-94
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.ResourceNotFoundException;
1212
import org.elasticsearch.action.search.ShardSearchFailure;
13+
import org.elasticsearch.action.support.ActiveShardCount;
1314
import org.elasticsearch.client.internal.Client;
1415
import org.elasticsearch.common.breaker.CircuitBreaker;
1516
import org.elasticsearch.common.breaker.CircuitBreakingException;
@@ -41,6 +42,7 @@
4142
import static org.hamcrest.Matchers.empty;
4243
import static org.hamcrest.Matchers.equalTo;
4344
import static org.hamcrest.Matchers.greaterThan;
45+
import static org.hamcrest.Matchers.hasSize;
4446
import static org.hamcrest.Matchers.in;
4547
import static org.hamcrest.Matchers.instanceOf;
4648
import static org.hamcrest.Matchers.is;
@@ -62,12 +64,15 @@ private static class ClusterSetup {
6264
void populateIndices() throws Exception {
6365
local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards, between(1, 100));
6466
populateIndexWithFailingFields(LOCAL_CLUSTER, "fail-local", local.failingShards);
67+
createUnavailableIndex(LOCAL_CLUSTER, "unavailable-local");
6568

6669
remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards, between(1, 100));
6770
populateIndexWithFailingFields(REMOTE_CLUSTER_1, "fail-cluster1", remote1.failingShards);
71+
createUnavailableIndex(REMOTE_CLUSTER_1, "unavailable-cluster1");
6872

6973
remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards, between(1, 100));
7074
populateIndexWithFailingFields(REMOTE_CLUSTER_2, "fail-cluster2", remote2.failingShards);
75+
createUnavailableIndex(REMOTE_CLUSTER_2, "unavailable-cluster2");
7176
}
7277

7378
private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, ClusterSetup cluster) {
@@ -346,6 +351,42 @@ public void testFailSearchShardsOnLocalCluster() throws Exception {
346351
}
347352
}
348353

354+
public void testResolutionFailures() throws Exception {
355+
populateIndices();
356+
EsqlQueryRequest request = new EsqlQueryRequest();
357+
request.allowPartialResults(true);
358+
request.query("FROM ok*,unavailable* | LIMIT 1000");
359+
try (var resp = runQuery(request)) {
360+
assertThat(EsqlTestUtils.getValuesList(resp), hasSize(local.okIds.size()));
361+
assertTrue(resp.isPartial());
362+
assertThat(resp.getExecutionInfo().getCluster(LOCAL_CLUSTER).getFailures(), not(empty()));
363+
assertThat(
364+
resp.getExecutionInfo().getCluster(LOCAL_CLUSTER).getFailures().get(0).reason(),
365+
containsString("index [unavailable-local] has no active shard copy")
366+
);
367+
}
368+
request.query("FROM *:ok*,unavailable* | LIMIT 1000");
369+
try (var resp = runQuery(request)) {
370+
assertThat(EsqlTestUtils.getValuesList(resp), hasSize(remote1.okIds.size() + remote2.okIds.size()));
371+
assertTrue(resp.isPartial());
372+
assertThat(resp.getExecutionInfo().getCluster(LOCAL_CLUSTER).getFailures(), not(empty()));
373+
assertThat(
374+
resp.getExecutionInfo().getCluster(LOCAL_CLUSTER).getFailures().get(0).reason(),
375+
containsString("index [unavailable-local] has no active shard copy")
376+
);
377+
}
378+
request.query("FROM ok*,cluster-a:unavailable* | LIMIT 1000");
379+
try (var resp = runQuery(request)) {
380+
assertThat(EsqlTestUtils.getValuesList(resp), hasSize(local.okIds.size()));
381+
assertTrue(resp.isPartial());
382+
assertThat(resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1).getFailures(), not(empty()));
383+
assertThat(
384+
resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1).getFailures().get(0).reason(),
385+
containsString("index [unavailable-cluster1] has no active shard copy")
386+
);
387+
}
388+
}
389+
349390
private static Exception randomFailure() {
350391
return randomFrom(
351392
new IllegalStateException("driver was closed already"),
@@ -398,4 +439,15 @@ private Set<String> populateIndexWithFailingFields(String clusterAlias, String i
398439
}
399440
return ids;
400441
}
442+
443+
private void createUnavailableIndex(String clusterAlias, String indexName) throws IOException {
444+
Client client = client(clusterAlias);
445+
assertAcked(
446+
client.admin()
447+
.indices()
448+
.prepareCreate(indexName)
449+
.setSettings(Settings.builder().put("index.routing.allocation.include._name", "no_such_node"))
450+
.setWaitForActiveShards(ActiveShardCount.NONE)
451+
);
452+
}
401453
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import org.elasticsearch.index.shard.ShardId;
2020
import org.elasticsearch.plugins.Plugin;
2121
import org.elasticsearch.test.transport.MockTransportService;
22+
import org.elasticsearch.transport.RemoteClusterAware;
2223
import org.elasticsearch.transport.TransportService;
2324
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
25+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
2426
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
2527

2628
import java.util.Collection;
@@ -363,6 +365,10 @@ public void testFailOnUnavailableShards() throws Exception {
363365
syncEsqlQueryRequest().query("from events,logs | KEEP timestamp,message").allowPartialResults(true)
364366
)
365367
) {
368+
assertTrue(resp.isPartial());
369+
EsqlExecutionInfo.Cluster local = resp.getExecutionInfo().getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
370+
assertThat(local.getFailures(), hasSize(1));
371+
assertThat(local.getFailures().get(0).reason(), containsString("index [logs] has no active shard copy"));
366372
assertThat(getValuesList(resp), hasSize(3));
367373
}
368374
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.xcontent.XContentBuilder;
2929

3030
import java.io.IOException;
31+
import java.util.ArrayList;
3132
import java.util.Collections;
3233
import java.util.EnumMap;
3334
import java.util.Iterator;
@@ -562,8 +563,14 @@ public Cluster.Builder setFailedShards(int failedShards) {
562563
return this;
563564
}
564565

565-
public Cluster.Builder setFailures(List<ShardSearchFailure> failures) {
566-
this.failures = failures;
566+
public Cluster.Builder addFailures(List<ShardSearchFailure> failures) {
567+
if (failures.isEmpty()) {
568+
return this;
569+
}
570+
if (this.failures == null) {
571+
this.failures = new ArrayList<>(original.failures);
572+
}
573+
this.failures.addAll(failures);
567574
return this;
568575
}
569576

@@ -658,7 +665,10 @@ public List<ShardSearchFailure> getFailures() {
658665
}
659666

660667
boolean isPartial() {
661-
return status == Status.PARTIAL || status == Status.SKIPPED || (failedShards != null && failedShards > 0);
668+
return status == Status.PARTIAL
669+
|| status == Status.SKIPPED
670+
|| (failedShards != null && failedShards > 0)
671+
|| failures.isEmpty() == false;
662672
}
663673

664674
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
*/
77
package org.elasticsearch.xpack.esql.index;
88

9-
import org.elasticsearch.action.NoShardAvailableActionException;
109
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
1110
import org.elasticsearch.core.Nullable;
1211

12+
import java.util.List;
1313
import java.util.Map;
1414
import java.util.Objects;
1515
import java.util.Set;
@@ -19,33 +19,26 @@ public final class IndexResolution {
1919
/**
2020
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
2121
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
22-
* @param unavailableShards Set of shards that were unavailable during index resolution
23-
* @param unavailableClusters Remote clusters that could not be contacted during planning
22+
* @param failures failures occurred during field-caps.
2423
* @return valid IndexResolution
2524
*/
26-
public static IndexResolution valid(
27-
EsIndex index,
28-
Set<String> resolvedIndices,
29-
Set<NoShardAvailableActionException> unavailableShards,
30-
Map<String, FieldCapabilitiesFailure> unavailableClusters
31-
) {
25+
public static IndexResolution valid(EsIndex index, Set<String> resolvedIndices, Map<String, List<FieldCapabilitiesFailure>> failures) {
3226
Objects.requireNonNull(index, "index must not be null if it was found");
3327
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
34-
Objects.requireNonNull(unavailableShards, "unavailableShards must not be null");
35-
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
36-
return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters);
28+
Objects.requireNonNull(failures, "failures must not be null");
29+
return new IndexResolution(index, null, resolvedIndices, failures);
3730
}
3831

3932
/**
4033
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
4134
*/
4235
public static IndexResolution valid(EsIndex index) {
43-
return valid(index, index.concreteIndices(), Set.of(), Map.of());
36+
return valid(index, index.concreteIndices(), Map.of());
4437
}
4538

4639
public static IndexResolution invalid(String invalid) {
4740
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
48-
return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of());
41+
return new IndexResolution(null, invalid, Set.of(), Map.of());
4942
}
5043

5144
public static IndexResolution notFound(String name) {
@@ -59,22 +52,19 @@ public static IndexResolution notFound(String name) {
5952

6053
// all indices found by field-caps
6154
private final Set<String> resolvedIndices;
62-
private final Set<NoShardAvailableActionException> unavailableShards;
63-
// remote clusters included in the user's index expression that could not be connected to
64-
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;
55+
// map from cluster alias to failures that occurred during field-caps.
56+
private final Map<String, List<FieldCapabilitiesFailure>> failures;
6557

6658
private IndexResolution(
6759
EsIndex index,
6860
@Nullable String invalid,
6961
Set<String> resolvedIndices,
70-
Set<NoShardAvailableActionException> unavailableShards,
71-
Map<String, FieldCapabilitiesFailure> unavailableClusters
62+
Map<String, List<FieldCapabilitiesFailure>> failures
7263
) {
7364
this.index = index;
7465
this.invalid = invalid;
7566
this.resolvedIndices = resolvedIndices;
76-
this.unavailableShards = unavailableShards;
77-
this.unavailableClusters = unavailableClusters;
67+
this.failures = failures;
7868
}
7969

8070
public boolean matches(String indexName) {
@@ -101,11 +91,10 @@ public boolean isValid() {
10191
}
10292

10393
/**
104-
* @return Map of unavailable clusters (could not be connected to during field-caps query). Key of map is cluster alias,
105-
* value is the {@link FieldCapabilitiesFailure} describing the issue.
94+
* @return Map from cluster alias to failures that occurred during field-caps.
10695
*/
107-
public Map<String, FieldCapabilitiesFailure> unavailableClusters() {
108-
return unavailableClusters;
96+
public Map<String, List<FieldCapabilitiesFailure>> failures() {
97+
return failures;
10998
}
11099

111100
/**
@@ -115,13 +104,6 @@ public Set<String> resolvedIndices() {
115104
return resolvedIndices;
116105
}
117106

118-
/**
119-
* @return set of unavailable shards during index resolution
120-
*/
121-
public Set<NoShardAvailableActionException> getUnavailableShards() {
122-
return unavailableShards;
123-
}
124-
125107
@Override
126108
public boolean equals(Object obj) {
127109
if (obj == null || obj.getClass() != getClass()) {
@@ -131,12 +113,12 @@ public boolean equals(Object obj) {
131113
return Objects.equals(index, other.index)
132114
&& Objects.equals(invalid, other.invalid)
133115
&& Objects.equals(resolvedIndices, other.resolvedIndices)
134-
&& Objects.equals(unavailableClusters, other.unavailableClusters);
116+
&& Objects.equals(failures, other.failures);
135117
}
136118

137119
@Override
138120
public int hashCode() {
139-
return Objects.hash(index, invalid, resolvedIndices, unavailableClusters);
121+
return Objects.hash(index, invalid, resolvedIndices, failures);
140122
}
141123

142124
@Override
@@ -152,7 +134,7 @@ public String toString() {
152134
+ ", resolvedIndices="
153135
+ resolvedIndices
154136
+ ", unavailableClusters="
155-
+ unavailableClusters
137+
+ failures
156138
+ '}';
157139
}
158140
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
166166
builder.setTook(executionInfo.tookSoFar());
167167
}
168168
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
169-
builder.setFailures(resp.failures);
169+
builder.addFailures(resp.failures);
170170
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
171171
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
172172
} else {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,11 @@ public void executePlan(
390390
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime);
391391
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
392392
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
393-
var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0)
394-
? EsqlExecutionInfo.Cluster.Status.PARTIAL
395-
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
393+
var status = localClusterWasInterrupted.get()
394+
|| (failedShards != null && failedShards > 0)
395+
|| v.getFailures().isEmpty() == false
396+
? EsqlExecutionInfo.Cluster.Status.PARTIAL
397+
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
396398
builder.setStatus(status);
397399
}
398400
return builder.build();
@@ -438,7 +440,7 @@ public void executePlan(
438440
.setSuccessfulShards(r.getSuccessfulShards())
439441
.setSkippedShards(r.getSkippedShards())
440442
.setFailedShards(r.getFailedShards())
441-
.setFailures(r.failures)
443+
.addFailures(r.failures)
442444
.build()
443445
);
444446
dataNodesListener.onResponse(r.getCompletionInfo());
@@ -448,7 +450,7 @@ public void executePlan(
448450
LOCAL_CLUSTER,
449451
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(
450452
EsqlExecutionInfo.Cluster.Status.PARTIAL
451-
).setFailures(List.of(new ShardSearchFailure(e))).build()
453+
).addFailures(List.of(new ShardSearchFailure(e))).build()
452454
);
453455
dataNodesListener.onResponse(DriverCompletionInfo.EMPTY);
454456
} else {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.xpack.esql.plan.IndexPattern;
3636
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3737

38+
import java.util.ArrayList;
3839
import java.util.Collections;
3940
import java.util.HashMap;
4041
import java.util.HashSet;
@@ -47,14 +48,25 @@ public class EsqlCCSUtils {
4748

4849
private EsqlCCSUtils() {}
4950

50-
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
51-
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
51+
static Map<String, List<FieldCapabilitiesFailure>> groupFailuresPerCluster(List<FieldCapabilitiesFailure> failures) {
52+
Map<String, List<FieldCapabilitiesFailure>> perCluster = new HashMap<>();
5253
for (FieldCapabilitiesFailure failure : failures) {
53-
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
54-
for (String indexExpression : failure.getIndices()) {
55-
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
56-
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
57-
}
54+
String cluster = RemoteClusterAware.parseClusterAlias(failure.getIndices()[0]);
55+
perCluster.computeIfAbsent(cluster, k -> new ArrayList<>()).add(failure);
56+
}
57+
return perCluster;
58+
}
59+
60+
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(Map<String, List<FieldCapabilitiesFailure>> failures) {
61+
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
62+
for (var e : failures.entrySet()) {
63+
if (Strings.isEmpty(e.getKey())) {
64+
continue;
65+
}
66+
for (FieldCapabilitiesFailure failure : e.getValue()) {
67+
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
68+
unavailableRemotes.put(e.getKey(), failure);
69+
break;
5870
}
5971
}
6072
}
@@ -136,8 +148,8 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn
136148
} else {
137149
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED);
138150
// add this exception to the failures list only if there is no failure already recorded there
139-
if (v.getFailures() == null || v.getFailures().size() == 0) {
140-
builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
151+
if (v.getFailures().isEmpty()) {
152+
builder.addFailures(List.of(new ShardSearchFailure(exceptionForResponse)));
141153
}
142154
}
143155
return builder.build();
@@ -197,7 +209,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
197209
}
198210
Set<String> clustersRequested = executionInfo.clusterAliases();
199211
Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
200-
clustersWithNoMatchingIndices.removeAll(indexResolution.unavailableClusters().keySet());
212+
clustersWithNoMatchingIndices.removeAll(indexResolution.failures().keySet());
201213

202214
/**
203215
* Rules enforced at planning time around non-matching indices
@@ -360,7 +372,7 @@ public static void markClusterWithFinalStateAndNoShards(
360372
.setSkippedShards(Objects.requireNonNullElse(v.getSkippedShards(), 0))
361373
.setFailedShards(Objects.requireNonNullElse(v.getFailedShards(), 0));
362374
if (ex != null) {
363-
builder.setFailures(List.of(new ShardSearchFailure(ex)));
375+
builder.addFailures(List.of(new ShardSearchFailure(ex)));
364376
}
365377
return builder.build();
366378
});

0 commit comments

Comments
 (0)