Skip to content

Commit f7321e5

Browse files
authored
Allow skip shards with _tier and _index in ES|QL (#123728) (#123788)
This change adds support for skipping shards with event.ingested fields and metadata fields (_tier, _index). This should allow ES|QL to skip unmatched shards and avoid sending requests to the data nodes.
1 parent 9b100c2 commit f7321e5

File tree

6 files changed

+78
-13
lines changed

6 files changed

+78
-13
lines changed

docs/changelog/123728.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 123728
2+
summary: Allow skip shards with `_tier` and `_index` in ES|QL
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.regex.Regex;
1818
import org.elasticsearch.core.Nullable;
1919
import org.elasticsearch.index.mapper.ConstantFieldType;
20+
import org.elasticsearch.index.mapper.IndexFieldMapper;
2021
import org.elasticsearch.index.mapper.MappedFieldType;
2122
import org.elasticsearch.index.mapper.MappingLookup;
2223
import org.elasticsearch.index.mapper.ValueFetcher;
@@ -26,6 +27,7 @@
2627

2728
import java.util.Collections;
2829
import java.util.Map;
30+
import java.util.Set;
2931
import java.util.function.LongSupplier;
3032

3133
/**
@@ -39,6 +41,13 @@ public class CoordinatorRewriteContext extends QueryRewriteContext {
3941

4042
public static final String TIER_FIELD_NAME = "_tier";
4143

44+
public static final Set<String> SUPPORTED_FIELDS = Set.of(
45+
DataStream.TIMESTAMP_FIELD_NAME,
46+
IndexMetadata.EVENT_INGESTED_FIELD_NAME,
47+
TIER_FIELD_NAME,
48+
IndexFieldMapper.NAME
49+
);
50+
4251
static final ConstantFieldType TIER_FIELD_TYPE = new ConstantFieldType(TIER_FIELD_NAME, Map.of()) {
4352
@Override
4453
public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1011
import org.elasticsearch.action.index.IndexRequest;
1112
import org.elasticsearch.action.support.WriteRequest;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -24,9 +25,12 @@
2425
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
2526

2627
import java.util.Collection;
28+
import java.util.HashMap;
2729
import java.util.List;
30+
import java.util.Map;
2831
import java.util.Set;
2932

33+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
3034
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
3135
import static org.hamcrest.Matchers.containsString;
3236
import static org.hamcrest.Matchers.empty;
@@ -250,4 +254,50 @@ public void testFailOnUnavailableShards() throws Exception {
250254
assertThat(error.getMessage(), containsString("no shard copies found"));
251255
}
252256
}
257+
258+
public void testSkipOnIndexName() {
259+
internalCluster().ensureAtLeastNumDataNodes(2);
260+
int numIndices = between(2, 10);
261+
Map<String, Integer> indexToNumDocs = new HashMap<>();
262+
for (int i = 0; i < numIndices; i++) {
263+
String index = "events-" + i;
264+
ElasticsearchAssertions.assertAcked(
265+
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
266+
);
267+
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
268+
int docs = between(1, 5);
269+
long timestamp = 1;
270+
for (int d = 0; d < docs; d++) {
271+
bulk.add(new IndexRequest().source("timestamp", ++timestamp, "message", "v-" + d));
272+
}
273+
bulk.get();
274+
indexToNumDocs.put(index, docs);
275+
}
276+
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
277+
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
278+
MockTransportService mockTransportService = as(ts, MockTransportService.class);
279+
mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
280+
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
281+
for (ShardId shardId : dataNodeRequest.shardIds()) {
282+
queriedIndices.add(shardId.getIndexName());
283+
}
284+
handler.messageReceived(request, channel, task);
285+
});
286+
}
287+
try {
288+
for (int i = 0; i < numIndices; i++) {
289+
queriedIndices.clear();
290+
String index = "events-" + i;
291+
try (EsqlQueryResponse resp = run("from events* METADATA _index | WHERE _index == \"" + index + "\" | KEEP timestamp")) {
292+
assertThat(getValuesList(resp), hasSize(indexToNumDocs.get(index)));
293+
}
294+
assertThat(queriedIndices, equalTo(Set.of(index)));
295+
}
296+
} finally {
297+
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
298+
MockTransportService mockTransportService = as(ts, MockTransportService.class);
299+
mockTransportService.clearAllRules();
300+
}
301+
}
302+
}
253303
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.core.Tuple;
1717
import org.elasticsearch.index.IndexMode;
1818
import org.elasticsearch.index.mapper.MappedFieldType;
19+
import org.elasticsearch.index.query.CoordinatorRewriteContext;
1920
import org.elasticsearch.index.query.QueryBuilder;
2021
import org.elasticsearch.index.query.SearchExecutionContext;
2122
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
@@ -52,6 +53,7 @@
5253
import java.util.List;
5354
import java.util.Set;
5455
import java.util.function.Consumer;
56+
import java.util.function.Predicate;
5557

5658
import static java.util.Arrays.asList;
5759
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -179,23 +181,22 @@ public static PhysicalPlan localPlan(
179181
}
180182

181183
/**
182-
* Extracts the ES query for the <code>@timestamp</code> field for the passed plan.
184+
* Extracts a filter that can be used to skip unmatched shards on the coordinator.
183185
*/
184-
public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) {
185-
return detectFilter(plan, "@timestamp");
186+
public static QueryBuilder canMatchFilter(PhysicalPlan plan) {
187+
return detectFilter(plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains);
186188
}
187189

188190
/**
189191
* Note that since this filter does not have access to SearchStats, it cannot detect if the field is a text field with a delegate.
190192
* We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should
191193
* take care to not use it with TEXT fields.
192194
*/
193-
static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
195+
static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName) {
194196
// first position is the REST filter, the second the query filter
195-
var requestFilter = new QueryBuilder[] { null, null };
196-
197+
final List<QueryBuilder> requestFilters = new ArrayList<>();
197198
plan.forEachDown(FragmentExec.class, fe -> {
198-
requestFilter[0] = fe.esFilter();
199+
requestFilters.add(fe.esFilter());
199200
// detect filter inside the query
200201
fe.fragment().forEachUp(Filter.class, f -> {
201202
// the only filter that can be pushed down is that on top of the relation
@@ -208,21 +209,21 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
208209
for (var exp : conjunctions) {
209210
var refs = new AttributeSet(exp.references());
210211
// remove literals or attributes that match by name
211-
boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
212+
boolean matchesField = refs.removeIf(e -> fieldName.test(e.name()));
212213
// the expression only contains the target reference
213214
// and the expression is pushable (functions can be fully translated)
214215
if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
215216
matches.add(exp);
216217
}
217218
}
218219
}
219-
if (matches.size() > 0) {
220-
requestFilter[1] = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder();
220+
if (matches.isEmpty() == false) {
221+
requestFilters.add(TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder());
221222
}
222223
});
223224
});
224225

225-
return Queries.combine(FILTER, asList(requestFilter));
226+
return Queries.combine(FILTER, requestFilters);
226227
}
227228

228229
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ protected void sendRequest(
184184
clusterAlias,
185185
concreteIndices,
186186
originalIndices,
187-
PlannerUtils.requestTimestampFilter(dataNodePlan),
187+
PlannerUtils.canMatchFilter(dataNodePlan),
188188
runOnTaskFailure,
189189
ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close)
190190
);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ private QueryBuilder restFilterQuery(String field) {
318318
}
319319

320320
private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) {
321-
return PlannerUtils.detectFilter(plan, EMP_NO);
321+
return PlannerUtils.detectFilter(plan, EMP_NO::equals);
322322
}
323323

324324
@Override

0 commit comments

Comments
 (0)