diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java index 971a2a7705c18..5406f3f773205 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java @@ -166,6 +166,10 @@ protected final EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas) } protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) { + return run(esqlCommands, pragmas, filter, null); + } + + protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter, Boolean allowPartialResults) { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query(esqlCommands); if (pragmas != null) { @@ -174,6 +178,9 @@ protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, Query if (filter != null) { request.filter(filter); } + if (allowPartialResults != null) { + request.allowPartialResults(allowPartialResults); + } return run(request); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java index 3db862684aca3..44c4599f910a0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java @@ -18,7 +18,6 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase; @@ -30,6 +29,7 @@ import java.util.Map; import java.util.Set; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.hamcrest.Matchers.containsString; @@ -48,7 +48,7 @@ protected Collection> nodePlugins() { * Make sure that we don't send data-node requests to the target shards which won't match the query */ public void testCanMatch() { - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("events_2022") @@ -60,9 +60,7 @@ public void testCanMatch() { .add(new IndexRequest().source("@timestamp", "2022-05-02", "uid", "u1")) .add(new IndexRequest().source("@timestamp", "2022-12-15", "uid", "u1")) .get(); - ElasticsearchAssertions.assertAcked( - client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword") - ); + assertAcked(client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword")); client().prepareBulk("events_2023") .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .add(new IndexRequest().source("@timestamp", "2023-01-15", "uid", "u2")) @@ -72,15 +70,17 @@ public void testCanMatch() { .get(); try { Set queriedIndices = ConcurrentCollections.newConcurrentSet(); - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - MockTransportService transportService = (MockTransportService) ts; - transportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { - DataNodeRequest dataNodeRequest = (DataNodeRequest) request; - for (ShardId shardId : dataNodeRequest.shardIds()) { - queriedIndices.add(shardId.getIndexName()); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).addRequestHandlingBehavior( + ComputeService.DATA_ACTION_NAME, + (handler, request, channel, task) -> { + DataNodeRequest dataNodeRequest = (DataNodeRequest) request; + for (ShardId shardId : dataNodeRequest.shardIds()) { + queriedIndices.add(shardId.getIndexName()); + } + handler.messageReceived(request, channel, task); } - handler.messageReceived(request, channel, task); - }); + ); } try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) { assertThat(getValuesList(resp), hasSize(4)); @@ -118,14 +118,14 @@ public void testCanMatch() { queriedIndices.clear(); } } finally { - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - ((MockTransportService) ts).clearAllRules(); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).clearAllRules(); } } } public void testAliasFilters() { - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("employees") @@ -141,7 +141,7 @@ public void testAliasFilters() { .add(new IndexRequest().source("emp_no", 106, "dept", "sales", "hired", "2012-08-09", "salary", 30.1)) .get(); - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) @@ -209,11 +209,10 @@ public void testAliasFilters() { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103749") public void testFailOnUnavailableShards() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); String logsOnlyNode = internalCluster().startDataOnlyNode(); - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("events") @@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception { .add(new IndexRequest().source("timestamp", 2, "message", "b")) .add(new IndexRequest().source("timestamp", 3, "message", "c")) .get(); - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("logs") @@ -246,12 +245,28 @@ public void testFailOnUnavailableShards() throws Exception { .add(new IndexRequest().source("timestamp", 10, "message", "aa")) .add(new IndexRequest().source("timestamp", 11, "message", "bb")) .get(); + + // when all shards available try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) { assertThat(getValuesList(resp), hasSize(5)); - internalCluster().stopNode(logsOnlyNode); - ensureClusterSizeConsistency(); - Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message")); - assertThat(error.getMessage(), containsString("no shard copies found")); + } + + internalCluster().stopNode(logsOnlyNode); + ensureClusterSizeConsistency(); + + // when one shard is unavailable + expectThrows( + Exception.class, + containsString("index [logs] has no active shard copy"), + () -> run("from events,logs | KEEP timestamp,message") + ); + expectThrows( + Exception.class, + containsString("index [logs] has no active shard copy"), + () -> run("from * | KEEP timestamp,message") + ); + try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message", null, null, true)) { + assertThat(getValuesList(resp), hasSize(3)); } } @@ -261,9 +276,7 @@ public void testSkipOnIndexName() { Map indexToNumDocs = new HashMap<>(); for (int i = 0; i < numIndices; i++) { String index = "events-" + i; - ElasticsearchAssertions.assertAcked( - client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword") - ); + assertAcked(client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")); BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); int docs = between(1, 5); long timestamp = 1; @@ -274,15 +287,17 @@ public void testSkipOnIndexName() { indexToNumDocs.put(index, docs); } Set queriedIndices = ConcurrentCollections.newConcurrentSet(); - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - MockTransportService mockTransportService = as(ts, MockTransportService.class); - mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { - DataNodeRequest dataNodeRequest = (DataNodeRequest) request; - for (ShardId shardId : dataNodeRequest.shardIds()) { - queriedIndices.add(shardId.getIndexName()); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).addRequestHandlingBehavior( + ComputeService.DATA_ACTION_NAME, + (handler, request, channel, task) -> { + DataNodeRequest dataNodeRequest = (DataNodeRequest) request; + for (ShardId shardId : dataNodeRequest.shardIds()) { + queriedIndices.add(shardId.getIndexName()); + } + handler.messageReceived(request, channel, task); } - handler.messageReceived(request, channel, task); - }); + ); } try { for (int i = 0; i < numIndices; i++) { @@ -294,9 +309,8 @@ public void testSkipOnIndexName() { assertThat(queriedIndices, equalTo(Set.of(index))); } } finally { - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - MockTransportService mockTransportService = as(ts, MockTransportService.class); - mockTransportService.clearAllRules(); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).clearAllRules(); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java index 88366bbf9a7c3..b9040d2ef40d6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java @@ -6,10 +6,10 @@ */ package org.elasticsearch.xpack.esql.index; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.core.Nullable; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -19,30 +19,33 @@ public final class IndexResolution { /** * @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps. * @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex). + * @param unavailableShards Set of shards that were unavailable during index resolution * @param unavailableClusters Remote clusters that could not be contacted during planning * @return valid IndexResolution */ public static IndexResolution valid( EsIndex index, Set resolvedIndices, + Set unavailableShards, Map unavailableClusters ) { Objects.requireNonNull(index, "index must not be null if it was found"); Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null"); + Objects.requireNonNull(unavailableShards, "unavailableShards must not be null"); Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null"); - return new IndexResolution(index, null, resolvedIndices, unavailableClusters); + return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters); } /** * Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices(). */ public static IndexResolution valid(EsIndex index) { - return valid(index, index.concreteIndices(), Collections.emptyMap()); + return valid(index, index.concreteIndices(), Set.of(), Map.of()); } public static IndexResolution invalid(String invalid) { Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid"); - return new IndexResolution(null, invalid, Collections.emptySet(), Collections.emptyMap()); + return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of()); } public static IndexResolution notFound(String name) { @@ -56,6 +59,7 @@ public static IndexResolution notFound(String name) { // all indices found by field-caps private final Set resolvedIndices; + private final Set unavailableShards; // remote clusters included in the user's index expression that could not be connected to private final Map unavailableClusters; @@ -63,11 +67,13 @@ private IndexResolution( EsIndex index, @Nullable String invalid, Set resolvedIndices, + Set unavailableShards, Map unavailableClusters ) { this.index = index; this.invalid = invalid; this.resolvedIndices = resolvedIndices; + this.unavailableShards = unavailableShards; this.unavailableClusters = unavailableClusters; } @@ -109,6 +115,13 @@ public Set resolvedIndices() { return resolvedIndices; } + /** + * @return set of unavailable shards during index resolution + */ + public Set getUnavailableShards() { + return unavailableShards; + } + @Override public boolean equals(Object obj) { if (obj == null || obj.getClass() != getClass()) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 2cc78713f89bb..2efed443c9aa9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -482,7 +482,13 @@ private void preAnalyzeIndices( indexExpressionToResolve, result.fieldNames, requestFilter, - listener.map(indexResolution -> result.withIndexResolution(indexResolution)) + listener.delegateFailure((l, indexResolution) -> { + if (configuration.allowPartialResults() == false && indexResolution.getUnavailableShards().isEmpty() == false) { + l.onFailure(indexResolution.getUnavailableShards().iterator().next()); + } else { + l.onResponse(result.withIndexResolution(indexResolution)); + } + }) ); } } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index d0d35374242e7..7fec5a7471c96 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.session; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; @@ -152,6 +153,13 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit fieldCapsResponse.getFailures() ); + Set unavailableShards = new HashSet<>(); + for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) { + if (failure.getException() instanceof NoShardAvailableActionException e) { + unavailableShards.add(e); + } + } + Map concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size()); for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) { concreteIndices.put(ir.getIndexName(), ir.getIndexMode()); @@ -163,7 +171,7 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit } // If all the mappings are empty we return an empty set of resolved indices to line up with QL var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices, partiallyUnmappedFields); - return IndexResolution.valid(index, concreteIndices.keySet(), unavailableRemotes); + return IndexResolution.valid(index, concreteIndices.keySet(), unavailableShards, unavailableRemotes); } private static Map> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 28e884b7f6f39..e151e4c8f3a9b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -254,7 +254,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ) ); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of()); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), Map.of()); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -298,7 +298,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ) ); Map unavailableClusters = Map.of(); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -340,7 +340,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -383,7 +383,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); VerificationException ve = expectThrows( VerificationException.class, () -> EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution) @@ -417,7 +417,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Set.of(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);