Skip to content

Commit 0c107c5

Browse files
committed
Use a "must" instead of "filter" when building the pushed down filter
AND when scoring is needed; wrap the request filter with a bool filter in case the query itself doesn't need scoring.
1 parent 4e00998 commit 0c107c5

File tree

6 files changed

+70
-23
lines changed

6 files changed

+70
-23
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,13 @@ private static PhysicalPlan rewrite(
101101
if (newPushable.size() > 0) { // update the executable with pushable conditions
102102
Query queryDSL = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(newPushable));
103103
QueryBuilder planQuery = queryDSL.asBuilder();
104-
var query = Queries.combine(Queries.Clause.FILTER, asList(queryExec.query(), planQuery));
104+
Queries.Clause combiningQueryClauseType;
105+
if (queryExec.hasScoring()) {
106+
combiningQueryClauseType = Queries.Clause.MUST;
107+
} else {
108+
combiningQueryClauseType = Queries.Clause.FILTER;
109+
}
110+
var query = Queries.combine(combiningQueryClauseType, asList(queryExec.query(), planQuery));
105111
queryExec = new EsQueryExec(
106112
queryExec.source(),
107113
queryExec.indexPattern(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,27 +32,38 @@ protected PhysicalPlan rule(EsSourceExec plan) {
3232
var docId = new FieldAttribute(plan.source(), EsQueryExec.DOC_ID_FIELD.getName(), EsQueryExec.DOC_ID_FIELD);
3333
final List<Attribute> attributes = new ArrayList<>();
3434
attributes.add(docId);
35-
if (plan.indexMode() == IndexMode.TIME_SERIES) {
36-
Attribute tsid = null, timestamp = null;
37-
for (Attribute attr : plan.output()) {
38-
String name = attr.name();
39-
if (name.equals(MetadataAttribute.TSID_FIELD)) {
40-
tsid = attr;
41-
} else if (name.equals(MetadataAttribute.TIMESTAMP_FIELD)) {
42-
timestamp = attr;
35+
36+
var outputIterator = plan.output().iterator();
37+
var isTimeSeries = plan.indexMode() == IndexMode.TIME_SERIES;
38+
var keepIterating = true;
39+
Attribute tsid = null, timestamp = null, score = null;
40+
41+
while (keepIterating && outputIterator.hasNext()) {
42+
Attribute attr = outputIterator.next();
43+
if (attr instanceof MetadataAttribute ma) {
44+
if (ma.name().equals(MetadataAttribute.SCORE)) {
45+
score = attr;
46+
} else if (isTimeSeries) {
47+
if (ma.name().equals(MetadataAttribute.TSID_FIELD)) {
48+
tsid = attr;
49+
} else if (ma.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
50+
timestamp = attr;
51+
}
4352
}
4453
}
54+
keepIterating = score == null || (isTimeSeries && (tsid == null || timestamp == null));
55+
}
56+
if (isTimeSeries) {
4557
if (tsid == null || timestamp == null) {
4658
throw new IllegalStateException("_tsid or @timestamp are missing from the time-series source");
4759
}
4860
attributes.add(tsid);
4961
attributes.add(timestamp);
5062
}
51-
plan.output().forEach(attr -> {
52-
if (attr instanceof MetadataAttribute ma && ma.name().equals(MetadataAttribute.SCORE)) {
53-
attributes.add(ma);
54-
}
55-
});
63+
if (score != null) {
64+
attributes.add(score);
65+
}
66+
5667
return new EsQueryExec(plan.source(), plan.indexPattern(), plan.indexMode(), plan.indexNameWithModes(), attributes, plan.query());
5768
}
5869
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.esql.core.expression.Attribute;
2222
import org.elasticsearch.xpack.esql.core.expression.Expression;
2323
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
24+
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
2425
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2526
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
2627
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -204,6 +205,10 @@ public static boolean isSourceAttribute(Attribute attr) {
204205
return DOC_ID_FIELD.getName().equals(attr.name());
205206
}
206207

208+
public boolean hasScoring() {
209+
return attrs().stream().anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE));
210+
}
211+
207212
@Override
208213
protected NodeInfo<EsQueryExec> info() {
209214
return NodeInfo.create(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsStatsQueryExec.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public enum StatsType {
3737
}
3838

3939
public record Stat(String name, StatsType type, QueryBuilder query) {
40-
4140
public QueryBuilder filter(QueryBuilder sourceQuery) {
4241
return query == null ? sourceQuery : Queries.combine(Queries.Clause.FILTER, asList(sourceQuery, query));
4342
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.elasticsearch.xpack.esql.core.expression.Expression;
5050
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
5151
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
52-
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
5352
import org.elasticsearch.xpack.esql.core.type.DataType;
5453
import org.elasticsearch.xpack.esql.core.type.KeywordEsField;
5554
import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
@@ -186,9 +185,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
186185
assert esQueryExec.estimatedRowSize() != null : "estimated row size not initialized";
187186
int rowEstimatedSize = esQueryExec.estimatedRowSize();
188187
int limit = esQueryExec.limit() != null ? (Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT;
189-
boolean scoring = esQueryExec.attrs()
190-
.stream()
191-
.anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE));
188+
boolean scoring = esQueryExec.hasScoring();
192189
if ((sorts != null && sorts.isEmpty() == false)) {
193190
List<SortBuilder<?>> sortBuilders = new ArrayList<>(sorts.size());
194191
for (Sort sort : sorts) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.elasticsearch.xpack.esql.plan.IndexPattern;
6060
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
6161
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
62+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
6263
import org.elasticsearch.xpack.esql.plan.logical.Fork;
6364
import org.elasticsearch.xpack.esql.plan.logical.Keep;
6465
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -70,6 +71,7 @@
7071
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
7172
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
7273
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
74+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
7375
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
7476
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
7577
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
@@ -712,16 +714,43 @@ private static Set<String> subfields(Set<String> names) {
712714

713715
private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {
714716
PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan);
717+
final QueryBuilder filter = request.filter();
718+
final Holder<Boolean> hasScoring = new Holder<>(false);
719+
720+
if (filter != null) {
721+
physicalPlan.forEachDown(EsQueryExec.class, esQueryExec -> {
722+
if (hasScoring.get() == false && esQueryExec.hasScoring()) {
723+
hasScoring.set(true);
724+
}
725+
});
726+
if (hasScoring.get() == false) {
727+
physicalPlan.forEachDown(FragmentExec.class, fragmentExec -> {
728+
fragmentExec.fragment().forEachDown(EsRelation.class, esRelation -> {
729+
if (esRelation.output()
730+
.stream()
731+
.anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE))) {
732+
hasScoring.set(true);
733+
}
734+
});
735+
});
736+
}
737+
}
715738
physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> {
716-
QueryBuilder filter = request.filter();
717739
if (filter != null) {
718740
var fragmentFilter = f.esFilter();
719741
// TODO: have an ESFilter and push down to EsQueryExec / EsSource
720742
// This is an ugly hack to push the filter parameter to Lucene
721743
// TODO: filter integration testing
722-
filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(filter) : filter;
723-
LOGGER.debug("Fold filter {} to EsQueryExec", filter);
724-
f = f.withFilter(filter);
744+
QueryBuilder newFilter;
745+
if (fragmentFilter != null) {
746+
newFilter = hasScoring.get()
747+
? boolQuery().filter(fragmentFilter).must(filter)
748+
: boolQuery().filter(fragmentFilter).filter(filter);
749+
} else {
750+
newFilter = hasScoring.get() ? filter : boolQuery().filter(filter);
751+
}
752+
LOGGER.debug("Fold filter {} to EsQueryExec", newFilter);
753+
f = f.withFilter(newFilter);
725754
}
726755
return f;
727756
});

0 commit comments

Comments
 (0)