Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123728.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123728
summary: Allow skip shards with `_tier` and `_index` in ES|QL
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.ConstantFieldType;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.ValueFetcher;
Expand All @@ -26,6 +27,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

/**
Expand All @@ -39,6 +41,13 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {

public static final String TIER_FIELD_NAME = "_tier";

public static final Set<String> SUPPORTED_FIELDS = Set.of(
DataStream.TIMESTAMP_FIELD_NAME,
IndexMetadata.EVENT_INGESTED_FIELD_NAME,
TIER_FIELD_NAME,
IndexFieldMapper.NAME
);

static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) {
@Override
public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -24,9 +25,12 @@
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -250,4 +254,50 @@ public void testFailOnUnavailableShards() throws Exception {
assertThat(error.getMessage(), containsString("no shard copies found"));
}
}

public void testSkipOnIndexName() {
internalCluster().ensureAtLeastNumDataNodes(2);
int numIndices = between(2, 10);
Map<String, Integer> indexToNumDocs = new HashMap<>();
for (int i = 0; i < numIndices; i++) {
String index = "events-" + i;
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
);
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int docs = between(1, 5);
long timestamp = 1;
for (int d = 0; d < docs; d++) {
bulk.add(new IndexRequest().source("timestamp", ++timestamp, "message", "v-" + d));
}
bulk.get();
indexToNumDocs.put(index, docs);
}
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
});
}
try {
for (int i = 0; i < numIndices; i++) {
queriedIndices.clear();
String index = "events-" + i;
try (EsqlQueryResponse resp = run("from events* METADATA _index | WHERE _index == \"" + index + "\" | KEEP timestamp")) {
assertThat(getValuesList(resp), hasSize(indexToNumDocs.get(index)));
}
assertThat(queriedIndices, equalTo(Set.of(index)));
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.clearAllRules();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static java.util.Arrays.asList;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
Expand Down Expand Up @@ -179,23 +181,22 @@ public static PhysicalPlan localPlan(
}

/**
* Extracts the ES query for the <code>@timestamp</code> field for the passed plan.
* Extracts a filter that can be used to skip unmatched shards on the coordinator.
*/
public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) {
return detectFilter(plan, "@timestamp");
public static QueryBuilder canMatchFilter(PhysicalPlan plan) {
return detectFilter(plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains);
}

/**
* Note that since this filter does not have access to SearchStats, it cannot detect if the field is a text field with a delegate.
* We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should
* take care to not use it with TEXT fields.
*/
static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName) {
// first position is the REST filter, the second the query filter
var requestFilter = new QueryBuilder[] { null, null };

final List<QueryBuilder> requestFilters = new ArrayList<>();
plan.forEachDown(FragmentExec.class, fe -> {
requestFilter[0] = fe.esFilter();
requestFilters.add(fe.esFilter());
// detect filter inside the query
fe.fragment().forEachUp(Filter.class, f -> {
// the only filter that can be pushed down is that on top of the relation
Expand All @@ -208,21 +209,21 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
for (var exp : conjunctions) {
var refs = new AttributeSet(exp.references());
// remove literals or attributes that match by name
boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
boolean matchesField = refs.removeIf(e -> fieldName.test(e.name()));
// the expression only contains the target reference
// and the expression is pushable (functions can be fully translated)
if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
matches.add(exp);
}
}
}
if (matches.size() > 0) {
requestFilter[1] = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder();
if (matches.isEmpty() == false) {
requestFilters.add(TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder());
}
});
});

return Queries.combine(FILTER, asList(requestFilter));
return Queries.combine(FILTER, requestFilters);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void startComputeOnDataNodes(
Runnable runOnTaskFailure,
ActionListener<ComputeResponse> outListener
) {
QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(dataNodePlan);
QueryBuilder requestFilter = PlannerUtils.canMatchFilter(dataNodePlan);
var listener = ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close);
final long startTimeInNanos = System.nanoTime();
lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private QueryBuilder restFilterQuery(String field) {
}

private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) {
return PlannerUtils.detectFilter(plan, EMP_NO);
return PlannerUtils.detectFilter(plan, EMP_NO::equals);
}

@Override
Expand Down