Skip to content
7 changes: 7 additions & 0 deletions docs/changelog/124001.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 124001
summary: Use a must boolean statement when pushing down to Lucene when scoring is
also needed
area: ES|QL
type: bug
issues:
- 123967
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ private static PhysicalPlan rewrite(
if (newPushable.size() > 0) { // update the executable with pushable conditions
Query queryDSL = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(newPushable));
QueryBuilder planQuery = queryDSL.asBuilder();
var query = Queries.combine(Queries.Clause.FILTER, asList(queryExec.query(), planQuery));
Queries.Clause combiningQueryClauseType;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix for the bug. Need to add tests.

if (queryExec.hasScoring()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clauseType = queryExec.hasScoring() ? MUST : FILTER

combiningQueryClauseType = Queries.Clause.MUST;
} else {
combiningQueryClauseType = Queries.Clause.FILTER;
}
var query = Queries.combine(combiningQueryClauseType, asList(queryExec.query(), planQuery));
queryExec = new EsQueryExec(
queryExec.source(),
queryExec.indexPattern(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,36 @@ protected PhysicalPlan rule(EsSourceExec plan) {
var docId = new FieldAttribute(plan.source(), EsQueryExec.DOC_ID_FIELD.getName(), EsQueryExec.DOC_ID_FIELD);
final List<Attribute> attributes = new ArrayList<>();
attributes.add(docId);
if (plan.indexMode() == IndexMode.TIME_SERIES) {
Attribute tsid = null, timestamp = null;
for (Attribute attr : plan.output()) {
String name = attr.name();
if (name.equals(MetadataAttribute.TSID_FIELD)) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just refactoring to eliminate double iteration over plan.output() attributes.

var outputIterator = plan.output().iterator();
var isTimeSeries = plan.indexMode() == IndexMode.TIME_SERIES;
var keepIterating = true;
Attribute tsid = null, timestamp = null, score = null;

while (keepIterating && outputIterator.hasNext()) {
Attribute attr = outputIterator.next();
if (attr instanceof MetadataAttribute ma) {
if (ma.name().equals(MetadataAttribute.SCORE)) {
score = attr;
} else if (isTimeSeries && ma.name().equals(MetadataAttribute.TSID_FIELD)) {
tsid = attr;
} else if (name.equals(MetadataAttribute.TIMESTAMP_FIELD)) {
timestamp = attr;
}
} else if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
timestamp = attr;
}
keepIterating = score == null || (isTimeSeries && (tsid == null || timestamp == null));
}
if (isTimeSeries) {
if (tsid == null || timestamp == null) {
throw new IllegalStateException("_tsid or @timestamp are missing from the time-series source");
}
attributes.add(tsid);
attributes.add(timestamp);
}
plan.output().forEach(attr -> {
if (attr instanceof MetadataAttribute ma && ma.name().equals(MetadataAttribute.SCORE)) {
attributes.add(ma);
}
});
if (score != null) {
attributes.add(score);
}

return new EsQueryExec(plan.source(), plan.indexPattern(), plan.indexMode(), plan.indexNameWithModes(), attributes, plan.query());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.core.tree.Source;
Expand Down Expand Up @@ -204,6 +205,10 @@ public static boolean isSourceAttribute(Attribute attr) {
return DOC_ID_FIELD.getName().equals(attr.name());
}

public boolean hasScoring() {
return attrs().stream().anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expressions.anyMatch(a -> a ..) -> not just a one liner which avoids the stream stack pollution but it also handles the expression tree navigation.
Not needed here but doesn't hurt.

}

@Override
protected NodeInfo<EsQueryExec> info() {
return NodeInfo.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public enum StatsType {
}

public record Stat(String name, StatsType type, QueryBuilder query) {

public QueryBuilder filter(QueryBuilder sourceQuery) {
return query == null ? sourceQuery : Queries.combine(Queries.Clause.FILTER, asList(sourceQuery, query));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.KeywordEsField;
import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
Expand Down Expand Up @@ -186,9 +185,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
assert esQueryExec.estimatedRowSize() != null : "estimated row size not initialized";
int rowEstimatedSize = esQueryExec.estimatedRowSize();
int limit = esQueryExec.limit() != null ? (Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT;
boolean scoring = esQueryExec.attrs()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small refactoring.

.stream()
.anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE));
boolean scoring = esQueryExec.hasScoring();
if ((sorts != null && sorts.isEmpty() == false)) {
List<SortBuilder<?>> sortBuilders = new ArrayList<>(sorts.size());
for (Sort sort : sorts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.logical.Keep;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
Expand All @@ -70,6 +71,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
Expand Down Expand Up @@ -712,16 +714,46 @@ private static Set<String> subfields(Set<String> names) {

private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {
PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan);
final QueryBuilder filter = request.filter();
final Holder<Boolean> hasScoring = new Holder<>(false);

if (filter != null) {
// a PhysicalPlan completely transformed (no fragments) will have an EsQueryExec in it if the data comes from ES
physicalPlan.forEachDown(EsQueryExec.class, esQueryExec -> {
if (hasScoring.get() == false && esQueryExec.hasScoring()) {
hasScoring.set(true);
}
});
// if there is no EsQueryExec and still scoring is required, search for fragments as well where EsRelations should
// know if there is scoring needed or not
if (hasScoring.get() == false) {
physicalPlan.forEachDown(FragmentExec.class, fragmentExec -> {
fragmentExec.fragment().forEachDown(EsRelation.class, esRelation -> {
if (esRelation.output()
.stream()
.anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE))) {
hasScoring.set(true);
}
});
});
}
}
physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> {
QueryBuilder filter = request.filter();
if (filter != null) {
var fragmentFilter = f.esFilter();
// TODO: have an ESFilter and push down to EsQueryExec / EsSource
// This is an ugly hack to push the filter parameter to Lucene
// TODO: filter integration testing
filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(filter) : filter;
LOGGER.debug("Fold filter {} to EsQueryExec", filter);
f = f.withFilter(filter);
QueryBuilder newFilter;
if (fragmentFilter != null) {
newFilter = hasScoring.get()
? boolQuery().filter(fragmentFilter).must(filter) // a "bool" "must" does influence scoring
: boolQuery().filter(fragmentFilter).filter(filter);// no scoring? then "filter" to completely disable scoring
} else {
newFilter = hasScoring.get() ? filter : boolQuery().filter(filter);
}
LOGGER.debug("Fold filter {} to EsQueryExec", newFilter);
f = f.withFilter(newFilter);
}
return f;
});
Expand Down