Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
Expand Down Expand Up @@ -74,6 +75,9 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
private volatile boolean isPartial; // Does this request have partial results?
private transient volatile boolean isStopped; // Have we received stop command?

// Track failures that occur during the resolution phase on the coordinator. These will be reported in the final response.
private transient Map<String, ResolutionFailure> resolutionFailures = Map.of();

// start time for the ESQL query for calculating time spans relative to the beginning of the query
private final transient TimeSpan.Builder relativeStart;
private transient TimeSpan overallTimeSpan;
Expand Down Expand Up @@ -159,6 +163,14 @@ public boolean includeCCSMetadata() {
return includeCCSMetadata;
}

public void setResolutionFailures(Map<String, ResolutionFailure> resolutionFailures) {
this.resolutionFailures = resolutionFailures;
}

public ResolutionFailure getResolutionFailure(String clusterAlias) {
return resolutionFailures.get(clusterAlias);
}

/**
* Call when ES|QL "planning" phase is complete and query execution (in ComputeService) is about to start.
* Note this is currently only built for a single phase planning/execution model. When INLINESTATS
Expand All @@ -181,6 +193,23 @@ public void markEndQuery() {
assert relativeStart != null : "Relative start time must be set when markEndQuery is called";
overallTimeSpan = relativeStart.stop();
overallTook = overallTimeSpan.toTimeValue();
// combines with the resolution failures.
for (String clusterAlias : clusterInfo.keySet()) {
ResolutionFailure resolutionFailure = resolutionFailures.get(clusterAlias);
if (resolutionFailure == null) {
continue;
}
isPartial = true;
swapCluster(clusterAlias, (k, c) -> {
List<ShardSearchFailure> mergedFailures = new ArrayList<>(c.failures.size() + resolutionFailure.failures().size());
mergedFailures.addAll(c.failures);
for (Exception e : resolutionFailure.failures()) {
mergedFailures.add(new ShardSearchFailure(e));
}
var status = c.status == Cluster.Status.SUCCESSFUL ? Cluster.Status.PARTIAL : c.status;
return new Cluster.Builder(c).setStatus(status).setFailures(mergedFailures).build();
});
}
}

// for testing only - use markEndQuery in production code
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import java.util.List;

/**
* Track failures that occur during the resolution phase on the coordinator. These will be reported in the final response.
*/
public record ResolutionFailure(List<Exception> failures) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
*/
package org.elasticsearch.xpack.esql.index;

import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xpack.esql.action.ResolutionFailure;

import java.util.Map;
import java.util.Objects;
Expand All @@ -17,35 +17,34 @@
public final class IndexResolution {

/**
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
* @param unavailableShards Set of shards that were unavailable during index resolution
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
* @param unavailableClusters Remote clusters that could not be contacted during planning
* @param failuresByCluster the failures that occurred during the index resolution, grouped by cluster.
* @return valid IndexResolution
*/
public static IndexResolution valid(
EsIndex index,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
Map<String, FieldCapabilitiesFailure> unavailableClusters,
Map<String, ResolutionFailure> failuresByCluster
) {
Objects.requireNonNull(index, "index must not be null if it was found");
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
Objects.requireNonNull(unavailableShards, "unavailableShards must not be null");
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters);
return new IndexResolution(index, null, resolvedIndices, unavailableClusters, failuresByCluster);
}

/**
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
*/
public static IndexResolution valid(EsIndex index) {
return valid(index, index.concreteIndices(), Set.of(), Map.of());
return valid(index, index.concreteIndices(), Map.of(), Map.of());
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of());
return new IndexResolution(null, invalid, Set.of(), Map.of(), Map.of());
}

public static IndexResolution notFound(String name) {
Expand All @@ -59,22 +58,22 @@ public static IndexResolution notFound(String name) {

// all indices found by field-caps
private final Set<String> resolvedIndices;
private final Set<NoShardAvailableActionException> unavailableShards;
// remote clusters included in the user's index expression that could not be connected to
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;
private final Map<String, ResolutionFailure> failures;

private IndexResolution(
EsIndex index,
@Nullable String invalid,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
Map<String, FieldCapabilitiesFailure> unavailableClusters,
Map<String, ResolutionFailure> failures
) {
this.index = index;
this.invalid = invalid;
this.resolvedIndices = resolvedIndices;
this.unavailableShards = unavailableShards;
this.unavailableClusters = unavailableClusters;
this.failures = failures;
}

public boolean matches(String indexName) {
Expand Down Expand Up @@ -116,10 +115,10 @@ public Set<String> resolvedIndices() {
}

/**
* @return set of unavailable shards during index resolution
* @return a map from cluster to failures occurred on that cluster
*/
public Set<NoShardAvailableActionException> getUnavailableShards() {
return unavailableShards;
public Map<String, ResolutionFailure> failures() {
return failures;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,12 @@ public void executePlan(
localListener.acquireCompute()
);
// starts computes on data nodes on the main cluster
if (localConcreteIndices != null && localConcreteIndices.indices().length > 0) {
final var resolutionFailure = execInfo.getResolutionFailure(LOCAL_CLUSTER);
if (configuration.allowPartialResults() == false && resolutionFailure != null) {
for (Exception failure : resolutionFailure.failures()) {
localListener.acquireAvoid().onFailure(failure);
}
} else if (localConcreteIndices != null && localConcreteIndices.indices().length > 0) {
final var dataNodesListener = localListener.acquireCompute();
dataNodeComputeHandler.startComputeOnDataNodes(
sessionId,
Expand Down Expand Up @@ -462,10 +467,20 @@ public void executePlan(
// starts computes on remote clusters
final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices);
for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) {
if (execInfo.getCluster(cluster.clusterAlias()).getStatus() != EsqlExecutionInfo.Cluster.Status.RUNNING) {
String clusterAlias = cluster.clusterAlias();
if (execInfo.getCluster(clusterAlias).getStatus() != EsqlExecutionInfo.Cluster.Status.RUNNING) {
// if the cluster is already in the terminal state from the planning stage, no need to call it
continue;
}
final var resolutionFailure = execInfo.getResolutionFailure(clusterAlias);
if (configuration.allowPartialResults() == false
&& execInfo.isSkipUnavailable(clusterAlias) == false
&& resolutionFailure != null) {
for (Exception failure : resolutionFailure.failures()) {
computeListener.acquireAvoid().onFailure(new RemoteException(clusterAlias, failure));
}
continue;
}
clusterComputeHandler.startComputeOnRemoteCluster(
sessionId,
rootTask,
Expand All @@ -486,9 +501,9 @@ public void executePlan(
* wrapped.
*/
if (ex instanceof TransportException te) {
l.onFailure(new RemoteException(cluster.clusterAlias(), FailureCollector.unwrapTransportException(te)));
l.onFailure(new RemoteException(clusterAlias, FailureCollector.unwrapTransportException(te)));
} else {
l.onFailure(new RemoteException(cluster.clusterAlias(), ex));
l.onFailure(new RemoteException(clusterAlias, ex));
}
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,21 @@
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster;
import org.elasticsearch.xpack.esql.action.ResolutionFailure;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class EsqlCCSUtils {

Expand All @@ -61,6 +64,22 @@ static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(
return unavailableRemotes;
}

static Map<String, ResolutionFailure> groupResolutionFailuresByCluster(List<FieldCapabilitiesFailure> failures) {
Map<String, List<FieldCapabilitiesFailure>> groupedByCluster = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
String clusterAlias = RemoteClusterAware.parseClusterAlias(failure.getIndices()[0]);
groupedByCluster.computeIfAbsent(clusterAlias, k -> new ArrayList<>()).add(failure);
}
return groupedByCluster.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e -> new ResolutionFailure(e.getValue().stream().map(FieldCapabilitiesFailure::getException).toList())
)
);
}

/**
* ActionListener that receives LogicalPlan or error from logical planning.
* Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,8 @@ private void preAnalyzeMainIndices(
result.fieldNames,
requestFilter,
listener.delegateFailure((l, indexResolution) -> {
if (configuration.allowPartialResults() == false && indexResolution.getUnavailableShards().isEmpty() == false) {
l.onFailure(indexResolution.getUnavailableShards().iterator().next());
} else {
l.onResponse(result.withIndexResolution(indexResolution));
}
executionInfo.setResolutionFailures(indexResolution.failures());
l.onResponse(result.withIndexResolution(indexResolution));
})
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
Expand Down Expand Up @@ -149,17 +148,12 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
}
}

var failuresGroupedByCluster = EsqlCCSUtils.groupResolutionFailuresByCluster(fieldCapsResponse.getFailures());

Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlCCSUtils.determineUnavailableRemoteClusters(
fieldCapsResponse.getFailures()
);

Set<NoShardAvailableActionException> unavailableShards = new HashSet<>();
for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) {
if (failure.getException() instanceof NoShardAvailableActionException e) {
unavailableShards.add(e);
}
}

Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
Expand All @@ -171,7 +165,7 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
}
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices, partiallyUnmappedFields);
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableShards, unavailableRemotes);
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableRemotes, failuresGroupedByCluster);
}

private static Map<String, List<IndexFieldCapabilities>> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
)
);

IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), Map.of());
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of(), Map.of());

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

Expand Down Expand Up @@ -297,7 +297,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
)
);
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of();
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters, Map.of());

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

Expand Down Expand Up @@ -339,7 +339,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
// remote1 is unavailable
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters, Map.of());

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

Expand Down Expand Up @@ -382,7 +382,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {

var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters, Map.of());
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
Expand Down Expand Up @@ -431,7 +431,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
// remote1 is unavailable
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters, Map.of());

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

Expand Down