diff --git a/docs/changelog/123728.yaml b/docs/changelog/123728.yaml new file mode 100644 index 0000000000000..4760fa3b49f19 --- /dev/null +++ b/docs/changelog/123728.yaml @@ -0,0 +1,5 @@ +pr: 123728 +summary: Allow skip shards with `_tier` and `_index` in ES|QL +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java index 5bc477a25317d..823679276edac 100644 --- a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java +++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.mapper.ConstantFieldType; +import org.elasticsearch.index.mapper.IndexFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.mapper.ValueFetcher; @@ -26,6 +27,7 @@ import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.function.LongSupplier; /** @@ -39,6 +41,13 @@ public class CoordinatorRewriteContext extends QueryRewriteContext { public static final String TIER_FIELD_NAME = "_tier"; + public static final Set SUPPORTED_FIELDS = Set.of( + DataStream.TIMESTAMP_FIELD_NAME, + IndexMetadata.EVENT_INGESTED_FIELD_NAME, + TIER_FIELD_NAME, + IndexFieldMapper.NAME + ); + static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) { @Override public ValueFetcher valueFetcher(SearchExecutionContext context, String format) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java index 0a2e42db4078d..3e34ea0acf982 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -24,9 +25,12 @@ import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -250,4 +254,50 @@ public void testFailOnUnavailableShards() throws Exception { assertThat(error.getMessage(), containsString("no shard copies found")); } } + + public void testSkipOnIndexName() { + internalCluster().ensureAtLeastNumDataNodes(2); + int numIndices = between(2, 10); + Map indexToNumDocs = new HashMap<>(); + for (int i = 0; i < numIndices; i++) { + String index = "events-" + i; + ElasticsearchAssertions.assertAcked( + client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword") + ); + BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + int docs = between(1, 5); + long timestamp = 1; + for (int d = 0; d < docs; d++) { + bulk.add(new IndexRequest().source("timestamp", ++timestamp, "message", "v-" + d)); + } + bulk.get(); + indexToNumDocs.put(index, docs); + } + Set queriedIndices = ConcurrentCollections.newConcurrentSet(); + for (TransportService ts : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = as(ts, MockTransportService.class); + mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { + DataNodeRequest dataNodeRequest = (DataNodeRequest) request; + for (ShardId shardId : dataNodeRequest.shardIds()) { + queriedIndices.add(shardId.getIndexName()); + } + handler.messageReceived(request, channel, task); + }); + } + try { + for (int i = 0; i < numIndices; i++) { + queriedIndices.clear(); + String index = "events-" + i; + try (EsqlQueryResponse resp = run("from events* METADATA _index | WHERE _index == \"" + index + "\" | KEEP timestamp")) { + assertThat(getValuesList(resp), hasSize(indexToNumDocs.get(index))); + } + assertThat(queriedIndices, equalTo(Set.of(index))); + } + } finally { + for (TransportService ts : internalCluster().getInstances(TransportService.class)) { + MockTransportService mockTransportService = as(ts, MockTransportService.class); + mockTransportService.clearAllRules(); + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 00adfd67dfba0..d4264140547e9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -16,6 +16,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.CoordinatorRewriteContext; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Predicate; import static java.util.Arrays.asList; import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES; @@ -179,10 +181,10 @@ public static PhysicalPlan localPlan( } /** - * Extracts the ES query for the @timestamp field for the passed plan. + * Extracts a filter that can be used to skip unmatched shards on the coordinator. */ - public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) { - return detectFilter(plan, "@timestamp"); + public static QueryBuilder canMatchFilter(PhysicalPlan plan) { + return detectFilter(plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains); } /** @@ -190,12 +192,11 @@ public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) { * We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should * take care to not use it with TEXT fields. */ - static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) { + static QueryBuilder detectFilter(PhysicalPlan plan, Predicate fieldName) { // first position is the REST filter, the second the query filter - var requestFilter = new QueryBuilder[] { null, null }; - + final List requestFilters = new ArrayList<>(); plan.forEachDown(FragmentExec.class, fe -> { - requestFilter[0] = fe.esFilter(); + requestFilters.add(fe.esFilter()); // detect filter inside the query fe.fragment().forEachUp(Filter.class, f -> { // the only filter that can be pushed down is that on top of the relation @@ -208,7 +209,7 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) { for (var exp : conjunctions) { var refs = new AttributeSet(exp.references()); // remove literals or attributes that match by name - boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name())); + boolean matchesField = refs.removeIf(e -> fieldName.test(e.name())); // the expression only contains the target reference // and the expression is pushable (functions can be fully translated) if (matchesField && refs.isEmpty() && canPushToSource(exp)) { @@ -216,13 +217,13 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) { } } } - if (matches.size() > 0) { - requestFilter[1] = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder(); + if (matches.isEmpty() == false) { + requestFilters.add(TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder()); } }); }); - return Queries.combine(FILTER, asList(requestFilter)); + return Queries.combine(FILTER, requestFilters); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 2ea2c7d1d6f46..4457636be101c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -184,7 +184,7 @@ protected void sendRequest( clusterAlias, concreteIndices, originalIndices, - PlannerUtils.requestTimestampFilter(dataNodePlan), + PlannerUtils.canMatchFilter(dataNodePlan), runOnTaskFailure, ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close) ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java index 4191f42f08237..90f25db232ec7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java @@ -318,7 +318,7 @@ private QueryBuilder restFilterQuery(String field) { } private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) { - return PlannerUtils.detectFilter(plan, EMP_NO); + return PlannerUtils.detectFilter(plan, EMP_NO::equals); } @Override