Skip to content

Commit 6660a0f

Browse files
committed
Allow skip shards with _tier and _index in ES|QL
1 parent 201a370 commit 6660a0f

File tree

4 files changed

+82
-13
lines changed

4 files changed

+82
-13
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1011
import org.elasticsearch.action.index.IndexRequest;
1112
import org.elasticsearch.action.support.WriteRequest;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -17,16 +18,24 @@
1718
import org.elasticsearch.index.query.RangeQueryBuilder;
1819
import org.elasticsearch.index.shard.ShardId;
1920
import org.elasticsearch.plugins.Plugin;
21+
import org.elasticsearch.tasks.Task;
2022
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
2123
import org.elasticsearch.test.transport.MockTransportService;
24+
import org.elasticsearch.test.transport.StubbableTransport;
25+
import org.elasticsearch.transport.TransportChannel;
26+
import org.elasticsearch.transport.TransportRequest;
27+
import org.elasticsearch.transport.TransportRequestHandler;
2228
import org.elasticsearch.transport.TransportService;
2329
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
2430
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
2531

2632
import java.util.Collection;
33+
import java.util.HashMap;
2734
import java.util.List;
35+
import java.util.Map;
2836
import java.util.Set;
2937

38+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
3039
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
3140
import static org.hamcrest.Matchers.containsString;
3241
import static org.hamcrest.Matchers.empty;
@@ -250,4 +259,61 @@ public void testFailOnUnavailableShards() throws Exception {
250259
assertThat(error.getMessage(), containsString("no shard copies found"));
251260
}
252261
}
262+
263+
public void testSkipOnIndexName() {
264+
internalCluster().ensureAtLeastNumDataNodes(2);
265+
int numIndices = between(2, 10);
266+
Map<String, Integer> indexToNumDocs = new HashMap<>();
267+
for (int i = 0; i < numIndices; i++) {
268+
String index = "events-" + i;
269+
ElasticsearchAssertions.assertAcked(
270+
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
271+
);
272+
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
273+
int docs = between(1, 5);
274+
long timestamp = 1;
275+
for (int d = 0; d < docs; d++) {
276+
bulk.add(new IndexRequest().source("timestamp", ++timestamp, "message", "v-" + d));
277+
}
278+
bulk.get();
279+
indexToNumDocs.put(index, docs);
280+
}
281+
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
282+
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
283+
MockTransportService mockTransportService = as(ts, MockTransportService.class);
284+
mockTransportService.addRequestHandlingBehavior(
285+
ComputeService.DATA_ACTION_NAME,
286+
new StubbableTransport.RequestHandlingBehavior<TransportRequest>() {
287+
@Override
288+
public void messageReceived(
289+
TransportRequestHandler<TransportRequest> handler,
290+
TransportRequest request,
291+
TransportChannel channel,
292+
Task task
293+
) throws Exception {
294+
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
295+
for (ShardId shardId : dataNodeRequest.shardIds()) {
296+
queriedIndices.add(shardId.getIndexName());
297+
}
298+
handler.messageReceived(request, channel, task);
299+
}
300+
}
301+
);
302+
}
303+
try {
304+
for (int i = 0; i < numIndices; i++) {
305+
queriedIndices.clear();
306+
String index = "events-" + i;
307+
try (EsqlQueryResponse resp = run("from events* METADATA _index | WHERE _index == \"" + index + "\" | KEEP timestamp")) {
308+
assertThat(getValuesList(resp), hasSize(indexToNumDocs.get(index)));
309+
}
310+
assertThat(queriedIndices, equalTo(Set.of(index)));
311+
}
312+
} finally {
313+
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
314+
MockTransportService mockTransportService = as(ts, MockTransportService.class);
315+
mockTransportService.clearAllRules();
316+
}
317+
}
318+
}
253319
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.List;
5353
import java.util.Set;
5454
import java.util.function.Consumer;
55+
import java.util.function.Predicate;
5556

5657
import static java.util.Arrays.asList;
5758
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -179,23 +180,25 @@ public static PhysicalPlan localPlan(
179180
}
180181

181182
/**
182-
* Extracts the ES query for the <code>@timestamp</code> field for the passed plan.
183+
* Extracts a filter that can be used to skip unmatched shards on the coordinator.
183184
*/
184-
public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) {
185-
return detectFilter(plan, "@timestamp");
185+
private static final Set<String> CAN_MATCH_FIELDS = Set.of("@timestamp", "event.ingested");
186+
187+
public static QueryBuilder canMatchFilter(PhysicalPlan plan) {
188+
// metadata field like _index, _tier
189+
return detectFilter(plan, f -> CAN_MATCH_FIELDS.contains(f) || f.startsWith("_"));
186190
}
187191

188192
/**
189193
* Note that since this filter does not have access to SearchStats, it cannot detect if the field is a text field with a delegate.
190194
* We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should
191195
* take care to not use it with TEXT fields.
192196
*/
193-
static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
197+
static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName) {
194198
// first position is the REST filter, the second the query filter
195-
var requestFilter = new QueryBuilder[] { null, null };
196-
199+
final List<QueryBuilder> requestFilters = new ArrayList<>();
197200
plan.forEachDown(FragmentExec.class, fe -> {
198-
requestFilter[0] = fe.esFilter();
201+
requestFilters.add(fe.esFilter());
199202
// detect filter inside the query
200203
fe.fragment().forEachUp(Filter.class, f -> {
201204
// the only filter that can be pushed down is that on top of the relation
@@ -208,21 +211,21 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
208211
for (var exp : conjunctions) {
209212
var refs = new AttributeSet(exp.references());
210213
// remove literals or attributes that match by name
211-
boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
214+
boolean matchesField = refs.removeIf(e -> fieldName.test(e.name()));
212215
// the expression only contains the target reference
213216
// and the expression is pushable (functions can be fully translated)
214217
if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
215218
matches.add(exp);
216219
}
217220
}
218221
}
219-
if (matches.size() > 0) {
220-
requestFilter[1] = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder();
222+
if (matches.isEmpty() == false) {
223+
requestFilters.add(TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(matches)).asBuilder());
221224
}
222225
});
223226
});
224227

225-
return Queries.combine(FILTER, asList(requestFilter));
228+
return Queries.combine(FILTER, requestFilters);
226229
}
227230

228231
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ protected void sendRequest(
184184
clusterAlias,
185185
concreteIndices,
186186
originalIndices,
187-
PlannerUtils.requestTimestampFilter(dataNodePlan),
187+
PlannerUtils.canMatchFilter(dataNodePlan),
188188
runOnTaskFailure,
189189
ActionListener.releaseAfter(outListener, exchangeSource.addEmptySink())
190190
);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ private QueryBuilder restFilterQuery(String field) {
318318
}
319319

320320
private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) {
321-
return PlannerUtils.detectFilter(plan, EMP_NO);
321+
return PlannerUtils.detectFilter(plan, EMP_NO::equals);
322322
}
323323

324324
@Override

0 commit comments

Comments
 (0)