Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
Expand Down Expand Up @@ -41,6 +42,7 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand All @@ -62,12 +64,15 @@ private static class ClusterSetup {
void populateIndices() throws Exception {
local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards, between(1, 100));
populateIndexWithFailingFields(LOCAL_CLUSTER, "fail-local", local.failingShards);
createUnavailableIndex(LOCAL_CLUSTER, "unavailable-local");

remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards, between(1, 100));
populateIndexWithFailingFields(REMOTE_CLUSTER_1, "fail-cluster1", remote1.failingShards);
createUnavailableIndex(REMOTE_CLUSTER_1, "unavailable-cluster1");

remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards, between(1, 100));
populateIndexWithFailingFields(REMOTE_CLUSTER_2, "fail-cluster2", remote2.failingShards);
createUnavailableIndex(REMOTE_CLUSTER_2, "unavailable-cluster2");
}

private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, ClusterSetup cluster) {
Expand Down Expand Up @@ -356,6 +361,42 @@ private static Exception randomFailure() {
);
}

public void testResolutionFailures() throws Exception {
populateIndices();
EsqlQueryRequest request = new EsqlQueryRequest();
request.allowPartialResults(true);
request.query("FROM ok*,unavailable* | LIMIT 1000");
try (var resp = runQuery(request)) {
assertThat(EsqlTestUtils.getValuesList(resp), hasSize(local.okIds.size()));
assertTrue(resp.isPartial());
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getFailures(), not(empty()));
assertThat(localCluster.getFailures().get(0).reason(), containsString("index [unavailable-local] has no active shard copy"));
}
request.query("FROM *:ok*,unavailable* | LIMIT 1000");
try (var resp = runQuery(request)) {
assertThat(EsqlTestUtils.getValuesList(resp), hasSize(remote1.okIds.size() + remote2.okIds.size()));
assertTrue(resp.isPartial());
var executionInfo = resp.getExecutionInfo();
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
assertThat(localCluster.getFailures(), not(empty()));
assertThat(localCluster.getFailures().get(0).reason(), containsString("index [unavailable-local] has no active shard copy"));
assertThat(executionInfo.getCluster(REMOTE_CLUSTER_1).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(executionInfo.getCluster(REMOTE_CLUSTER_2).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
}
request.query("FROM ok*,cluster-a:unavailable* | LIMIT 1000");
try (var resp = runQuery(request)) {
assertThat(EsqlTestUtils.getValuesList(resp), hasSize(local.okIds.size()));
assertTrue(resp.isPartial());
var remote1 = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
assertThat(remote1.getFailures(), not(empty()));
assertThat(remote1.getFailures().get(0).reason(), containsString("index [unavailable-cluster1] has no active shard copy"));
assertThat(resp.getExecutionInfo().getCluster(LOCAL_CLUSTER).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
}
}

private Set<String> populateIndexWithFailingFields(String clusterAlias, String indexName, int numShards) throws IOException {
Client client = client(clusterAlias);
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
Expand Down Expand Up @@ -398,4 +439,15 @@ private Set<String> populateIndexWithFailingFields(String clusterAlias, String i
}
return ids;
}

private void createUnavailableIndex(String clusterAlias, String indexName) throws IOException {
Client client = client(clusterAlias);
assertAcked(
client.admin()
.indices()
.prepareCreate(indexName)
.setSettings(Settings.builder().put("index.routing.allocation.include._name", "no_such_node"))
.setWaitForActiveShards(ActiveShardCount.NONE)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.util.Collection;
Expand Down Expand Up @@ -363,6 +365,10 @@ public void testFailOnUnavailableShards() throws Exception {
syncEsqlQueryRequest().query("from events,logs | KEEP timestamp,message").allowPartialResults(true)
)
) {
assertTrue(resp.isPartial());
EsqlExecutionInfo.Cluster local = resp.getExecutionInfo().getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
assertThat(local.getFailures(), hasSize(1));
assertThat(local.getFailures().get(0).reason(), containsString("index [logs] has no active shard copy"));
assertThat(getValuesList(resp), hasSize(3));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
Expand Down Expand Up @@ -562,8 +563,14 @@ public Cluster.Builder setFailedShards(int failedShards) {
return this;
}

public Cluster.Builder setFailures(List<ShardSearchFailure> failures) {
this.failures = failures;
public Cluster.Builder addFailures(List<ShardSearchFailure> failures) {
if (failures.isEmpty()) {
return this;
}
if (this.failures == null) {
this.failures = new ArrayList<>(original.failures);
}
this.failures.addAll(failures);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
*/
package org.elasticsearch.xpack.esql.index;

import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.core.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -19,33 +19,26 @@ public final class IndexResolution {
/**
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
* @param unavailableShards Set of shards that were unavailable during index resolution
* @param unavailableClusters Remote clusters that could not be contacted during planning
* @param failures failures occurred during field-caps.
* @return valid IndexResolution
*/
public static IndexResolution valid(
EsIndex index,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
public static IndexResolution valid(EsIndex index, Set<String> resolvedIndices, Map<String, List<FieldCapabilitiesFailure>> failures) {
Objects.requireNonNull(index, "index must not be null if it was found");
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
Objects.requireNonNull(unavailableShards, "unavailableShards must not be null");
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters);
Objects.requireNonNull(failures, "failures must not be null");
return new IndexResolution(index, null, resolvedIndices, failures);
}

/**
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
*/
public static IndexResolution valid(EsIndex index) {
return valid(index, index.concreteIndices(), Set.of(), Map.of());
return valid(index, index.concreteIndices(), Map.of());
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of());
return new IndexResolution(null, invalid, Set.of(), Map.of());
}

public static IndexResolution notFound(String name) {
Expand All @@ -59,22 +52,19 @@ public static IndexResolution notFound(String name) {

// all indices found by field-caps
private final Set<String> resolvedIndices;
private final Set<NoShardAvailableActionException> unavailableShards;
// remote clusters included in the user's index expression that could not be connected to
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;
// map from cluster alias to failures that occurred during field-caps.
private final Map<String, List<FieldCapabilitiesFailure>> failures;

private IndexResolution(
EsIndex index,
@Nullable String invalid,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
Map<String, List<FieldCapabilitiesFailure>> failures
) {
this.index = index;
this.invalid = invalid;
this.resolvedIndices = resolvedIndices;
this.unavailableShards = unavailableShards;
this.unavailableClusters = unavailableClusters;
this.failures = failures;
}

public boolean matches(String indexName) {
Expand All @@ -101,11 +91,10 @@ public boolean isValid() {
}

/**
* @return Map of unavailable clusters (could not be connected to during field-caps query). Key of map is cluster alias,
* value is the {@link FieldCapabilitiesFailure} describing the issue.
* @return Map from cluster alias to failures that occurred during field-caps.
*/
public Map<String, FieldCapabilitiesFailure> unavailableClusters() {
return unavailableClusters;
public Map<String, List<FieldCapabilitiesFailure>> failures() {
return failures;
}

/**
Expand All @@ -115,13 +104,6 @@ public Set<String> resolvedIndices() {
return resolvedIndices;
}

/**
* @return set of unavailable shards during index resolution
*/
public Set<NoShardAvailableActionException> getUnavailableShards() {
return unavailableShards;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
Expand All @@ -131,12 +113,12 @@ public boolean equals(Object obj) {
return Objects.equals(index, other.index)
&& Objects.equals(invalid, other.invalid)
&& Objects.equals(resolvedIndices, other.resolvedIndices)
&& Objects.equals(unavailableClusters, other.unavailableClusters);
&& Objects.equals(failures, other.failures);
}

@Override
public int hashCode() {
return Objects.hash(index, invalid, resolvedIndices, unavailableClusters);
return Objects.hash(index, invalid, resolvedIndices, failures);
}

@Override
Expand All @@ -152,7 +134,7 @@ public String toString() {
+ ", resolvedIndices="
+ resolvedIndices
+ ", unavailableClusters="
+ unavailableClusters
+ failures
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
builder.setTook(executionInfo.tookSoFar());
}
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
builder.setFailures(resp.failures);
builder.addFailures(resp.failures);
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,13 @@ public void executePlan(
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime);
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0)
? EsqlExecutionInfo.Cluster.Status.PARTIAL
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
// Set the local cluster status (including the final driver) to partial if the query was stopped
// or encountered resolution or execution failures.
var status = localClusterWasInterrupted.get()
|| (failedShards != null && failedShards > 0)
|| v.getFailures().isEmpty() == false
? EsqlExecutionInfo.Cluster.Status.PARTIAL
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
builder.setStatus(status);
}
return builder.build();
Expand Down Expand Up @@ -445,7 +449,7 @@ public void executePlan(
.setSuccessfulShards(r.getSuccessfulShards())
.setSkippedShards(r.getSkippedShards())
.setFailedShards(r.getFailedShards())
.setFailures(r.failures)
.addFailures(r.failures)
.build()
);
dataNodesListener.onResponse(r.getCompletionInfo());
Expand All @@ -455,7 +459,7 @@ public void executePlan(
LOCAL_CLUSTER,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(
EsqlExecutionInfo.Cluster.Status.PARTIAL
).setFailures(List.of(new ShardSearchFailure(e))).build()
).addFailures(List.of(new ShardSearchFailure(e))).build()
);
dataNodesListener.onResponse(DriverCompletionInfo.EMPTY);
} else {
Expand Down
Loading