Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/123728.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 123728
summary: Allow skipping shards with `_tier` and `_index` in ES|QL
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -24,9 +25,12 @@
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -250,4 +254,50 @@ public void testFailOnUnavailableShards() throws Exception {
assertThat(error.getMessage(), containsString("no shard copies found"));
}
}

/**
 * Checks that a query filtering on the {@code _index} metadata field only sends data-node
 * requests for shards that belong to the matching index.
 * <p>
 * The test creates a random number of {@code events-N} indices with a few documents each,
 * records the index name of every shard id carried by incoming {@link DataNodeRequest}s on all
 * nodes, and then, for each index in turn, asserts both the number of returned rows and that
 * the recorded index names consist of exactly the index named in the filter.
 */
public void testSkipOnIndexName() {
    internalCluster().ensureAtLeastNumDataNodes(2);

    // Set up the indices and remember how many documents went into each of them.
    final int indexCount = between(2, 10);
    final Map<String, Integer> docsPerIndex = new HashMap<>();
    for (int idx = 0; idx < indexCount; idx++) {
        final String indexName = "events-" + idx;
        ElasticsearchAssertions.assertAcked(
            client().admin().indices().prepareCreate(indexName).setMapping("timestamp", "type=long", "message", "type=keyword")
        );
        final BulkRequestBuilder bulkRequest = client().prepareBulk(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        final int docCount = between(1, 5);
        // the first document of every index gets timestamp 2, the next one 3, and so on
        long nextTimestamp = 2;
        for (int doc = 0; doc < docCount; doc++) {
            bulkRequest.add(new IndexRequest().source("timestamp", nextTimestamp, "message", "v-" + doc));
            nextTimestamp++;
        }
        bulkRequest.get();
        docsPerIndex.put(indexName, docCount);
    }

    // Record the index name of every shard targeted by a data-node request, then let the real handler run.
    final Set<String> seenIndices = ConcurrentCollections.newConcurrentSet();
    for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
        as(transportService, MockTransportService.class).addRequestHandlingBehavior(
            ComputeService.DATA_ACTION_NAME,
            (handler, request, channel, task) -> {
                for (ShardId shard : ((DataNodeRequest) request).shardIds()) {
                    seenIndices.add(shard.getIndexName());
                }
                handler.messageReceived(request, channel, task);
            }
        );
    }

    try {
        for (int idx = 0; idx < indexCount; idx++) {
            final String target = "events-" + idx;
            final String query = "from events* METADATA _index | WHERE _index == \"" + target + "\" | KEEP timestamp";
            // start every query with a clean record so earlier iterations cannot leak into the assertion
            seenIndices.clear();
            try (EsqlQueryResponse response = run(query)) {
                assertThat(getValuesList(response), hasSize(docsPerIndex.get(target)));
            }
            assertThat(seenIndices, equalTo(Set.of(target)));
        }
    } finally {
        // always remove the recording behavior, even when an assertion above fails
        for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
            as(transportService, MockTransportService.class).clearAllRules();
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static java.util.Arrays.asList;
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
Expand Down Expand Up @@ -179,23 +180,25 @@ public static PhysicalPlan localPlan(
}

/**
* Extracts the ES query for the <code>@timestamp</code> field for the passed plan.
* Extracts a filter that can be used to skip unmatched shards on the coordinator.
*/
public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) {
return detectFilter(plan, "@timestamp");
private static final Set<String> CAN_MATCH_FIELDS = Set.of("@timestamp", "event.ingested");

public static QueryBuilder canMatchFilter(PhysicalPlan plan) {
// metadata field like _index, _tier
return detectFilter(plan, f -> CAN_MATCH_FIELDS.contains(f) || f.startsWith("_"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might pull in other fields starting with `_`, such as `_score` or other user-defined functions.

}

/**
* Note that since this filter does not have access to SearchStats, it cannot detect if the field is a text field with a delegate.
* We currently use this filter for the @timestamp and event.ingested fields and for metadata fields starting with an underscore
* (such as _index and _tier). Any tests that wish to use this should take care to not use it with TEXT fields.
*/
static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName) {
// first position is the REST filter, the second the query filter
var requestFilter = new QueryBuilder[] { null, null };

final List<QueryBuilder> requestFilters = new ArrayList<>();
plan.forEachDown(FragmentExec.class, fe -> {
requestFilter[0] = fe.esFilter();
requestFilters.add(fe.esFilter());
// detect filter inside the query
fe.fragment().forEachUp(Filter.class, f -> {
// the only filter that can be pushed down is that on top of the relation
Expand All @@ -208,21 +211,21 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
for (var exp : conjunctions) {
var refs = new AttributeSet(exp.references());
// remove literals or attributes that match by name
boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
boolean matchesField = refs.removeIf(e -> fieldName.test(e.name()));
// the expression only contains the target reference
// and the expression is pushable (functions can be fully translated)
if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
matches.add(exp);
}
}
}
if (matches.size() > 0) {
requestFilter[1] = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder();
if (matches.isEmpty() == false) {
requestFilters.add(TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder());
}
});
});

return Queries.combine(FILTER, asList(requestFilter));
return Queries.combine(FILTER, requestFilters);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ protected void sendRequest(
clusterAlias,
concreteIndices,
originalIndices,
PlannerUtils.requestTimestampFilter(dataNodePlan),
PlannerUtils.canMatchFilter(dataNodePlan),
runOnTaskFailure,
ActionListener.releaseAfter(outListener, exchangeSource.addEmptySink())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private QueryBuilder restFilterQuery(String field) {
}

private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) {
return PlannerUtils.detectFilter(plan, EMP_NO);
return PlannerUtils.detectFilter(plan, EMP_NO::equals);
}

@Override
Expand Down
Loading