@@ -101,7 +101,13 @@ private static PhysicalPlan rewrite(
if (newPushable.size() > 0) { // update the executable with pushable conditions
Query queryDSL = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(newPushable));
QueryBuilder planQuery = queryDSL.asBuilder();
var query = Queries.combine(Queries.Clause.FILTER, asList(queryExec.query(), planQuery));
Queries.Clause combiningQueryClauseType;
Contributor Author: This is the actual fix for the bug. Need to add tests.

if (queryExec.hasScoring()) {
Member: clauseType = queryExec.hasScoring() ? MUST : FILTER

combiningQueryClauseType = Queries.Clause.MUST;
} else {
combiningQueryClauseType = Queries.Clause.FILTER;
}
var query = Queries.combine(combiningQueryClauseType, asList(queryExec.query(), planQuery));
queryExec = new EsQueryExec(
queryExec.source(),
queryExec.indexPattern(),
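For context on the clause choice above: in an Elasticsearch bool query, must clauses run in query context and contribute to _score, while filter clauses run in filter context and are ignored for scoring, so combining a pushed-down predicate under FILTER silently drops its score contribution. The reviewer's ternary would collapse the new if/else into one line. Below is a stand-alone sketch of the intended behaviour, not the PR's code: it only approximates what Queries.combine(MUST, ...) versus Queries.combine(FILTER, ...) builds, and the field names are placeholders.

```java
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

final class ClauseChoiceSketch {
    // Approximates Queries.combine(MUST, ...) vs Queries.combine(FILTER, ...): MUST keeps the
    // pushed-down predicate in query context so it still contributes to _score, FILTER demotes
    // it to non-scoring (and cacheable) filter context.
    static QueryBuilder combine(QueryBuilder existing, QueryBuilder pushed, boolean hasScoring) {
        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        return hasScoring ? bool.must(existing).must(pushed) : bool.filter(existing).filter(pushed);
    }

    public static void main(String[] args) {
        QueryBuilder existing = QueryBuilders.matchQuery("title", "quick fox"); // placeholder query
        QueryBuilder pushed = QueryBuilders.rangeQuery("year").gte(2020);       // placeholder pushable predicate
        System.out.println(combine(existing, pushed, true));  // bool with two must clauses, scored
        System.out.println(combine(existing, pushed, false)); // bool with two filter clauses, unscored
    }
}
```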
@@ -32,27 +32,38 @@ protected PhysicalPlan rule(EsSourceExec plan) {
var docId = new FieldAttribute(plan.source(), EsQueryExec.DOC_ID_FIELD.getName(), EsQueryExec.DOC_ID_FIELD);
final List<Attribute> attributes = new ArrayList<>();
attributes.add(docId);
if (plan.indexMode() == IndexMode.TIME_SERIES) {
Attribute tsid = null, timestamp = null;
for (Attribute attr : plan.output()) {
String name = attr.name();
if (name.equals(MetadataAttribute.TSID_FIELD)) {
tsid = attr;
} else if (name.equals(MetadataAttribute.TIMESTAMP_FIELD)) {
timestamp = attr;

Contributor Author: Just refactoring to eliminate double iteration over plan.output() attributes.

var outputIterator = plan.output().iterator();
var isTimeSeries = plan.indexMode() == IndexMode.TIME_SERIES;
var keepIterating = true;
Attribute tsid = null, timestamp = null, score = null;

while (keepIterating && outputIterator.hasNext()) {
Attribute attr = outputIterator.next();
if (attr instanceof MetadataAttribute ma) {
if (ma.name().equals(MetadataAttribute.SCORE)) {
score = attr;
} else if (isTimeSeries) {
if (ma.name().equals(MetadataAttribute.TSID_FIELD)) {
tsid = attr;
} else if (ma.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
timestamp = attr;
}
}
}
keepIterating = score == null || (isTimeSeries && (tsid == null || timestamp == null));
}
if (isTimeSeries) {
if (tsid == null || timestamp == null) {
throw new IllegalStateException("_tsid or @timestamp are missing from the time-series source");
}
attributes.add(tsid);
attributes.add(timestamp);
}
plan.output().forEach(attr -> {
if (attr instanceof MetadataAttribute ma && ma.name().equals(MetadataAttribute.SCORE)) {
attributes.add(ma);
}
});
if (score != null) {
attributes.add(score);
}

return new EsQueryExec(plan.source(), plan.indexPattern(), plan.indexMode(), plan.indexNameWithModes(), attributes, plan.query());
}
}
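The only subtle piece of the single-pass loop is the early-exit predicate. Below is a hypothetical restatement of that condition, pulled out so it can be sanity-checked in isolation; it is not code from the PR.

```java
final class KeepIteratingSketch {
    // Hypothetical restatement of the keepIterating condition: continue while _score has not
    // been seen, or, for time-series sources, while _tsid or @timestamp is still missing.
    static boolean keepIterating(boolean isTimeSeries, boolean haveTsid, boolean haveTimestamp, boolean haveScore) {
        return haveScore == false || (isTimeSeries && (haveTsid == false || haveTimestamp == false));
    }

    public static void main(String[] args) {
        assert keepIterating(false, false, false, true) == false; // non-TS source: stop once _score is found
        assert keepIterating(true, true, false, true);            // TS source: @timestamp still missing, keep going
        assert keepIterating(false, false, false, false);         // nothing found yet: keep going in either mode
    }
}
```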
@@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -204,6 +205,10 @@ public static boolean isSourceAttribute(Attribute attr) {
return DOC_ID_FIELD.getName().equals(attr.name());
}

public boolean hasScoring() {
return attrs().stream().anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE));
Member: Expressions.anyMatch(a -> ...) is not just a one-liner that avoids the stream stack pollution; it also handles the expression tree navigation. Not needed here, but it doesn't hurt.

}

@Override
protected NodeInfo<EsQueryExec> info() {
return NodeInfo.create(
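A sketch of the Expressions.anyMatch variant suggested above, as a drop-in replacement for hasScoring() in EsQueryExec. The exact signature of Expressions.anyMatch is an assumption here (a list of expressions plus a predicate); the point from the review comment is that it also walks into sub-expressions rather than only streaming over the top-level attributes.

```java
// Hypothetical variant of hasScoring() following the review comment above.
// Assumes org.elasticsearch.xpack.esql.core.expression.Expressions exposes
// anyMatch(List<? extends Expression> exps, Predicate<? super Expression> predicate).
public boolean hasScoring() {
    return Expressions.anyMatch(
        attrs(),
        a -> a instanceof MetadataAttribute ma && ma.name().equals(MetadataAttribute.SCORE)
    );
}
```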
@@ -37,7 +37,6 @@ public enum StatsType {
}

public record Stat(String name, StatsType type, QueryBuilder query) {

public QueryBuilder filter(QueryBuilder sourceQuery) {
return query == null ? sourceQuery : Queries.combine(Queries.Clause.FILTER, asList(sourceQuery, query));
}
@@ -49,7 +49,6 @@
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.KeywordEsField;
import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
@@ -186,9 +185,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
assert esQueryExec.estimatedRowSize() != null : "estimated row size not initialized";
int rowEstimatedSize = esQueryExec.estimatedRowSize();
int limit = esQueryExec.limit() != null ? (Integer) esQueryExec.limit().fold(context.foldCtx()) : NO_LIMIT;
boolean scoring = esQueryExec.attrs()
Contributor Author: Small refactoring.

.stream()
.anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE));
boolean scoring = esQueryExec.hasScoring();
if ((sorts != null && sorts.isEmpty() == false)) {
List<SortBuilder<?>> sortBuilders = new ArrayList<>(sorts.size());
for (Sort sort : sorts) {
@@ -59,6 +59,7 @@
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.logical.Keep;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -70,6 +71,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
@@ -712,16 +714,43 @@ private static Set<String> subfields(Set<String> names) {

private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {
PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan);
final QueryBuilder filter = request.filter();
final Holder<Boolean> hasScoring = new Holder<>(false);

if (filter != null) {
physicalPlan.forEachDown(EsQueryExec.class, esQueryExec -> {
if (hasScoring.get() == false && esQueryExec.hasScoring()) {
hasScoring.set(true);
}
});
if (hasScoring.get() == false) {
physicalPlan.forEachDown(FragmentExec.class, fragmentExec -> {
fragmentExec.fragment().forEachDown(EsRelation.class, esRelation -> {
if (esRelation.output()
.stream()
.anyMatch(a -> a instanceof MetadataAttribute && a.name().equals(MetadataAttribute.SCORE))) {
hasScoring.set(true);
}
});
});
}
}
physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> {
QueryBuilder filter = request.filter();
if (filter != null) {
var fragmentFilter = f.esFilter();
// TODO: have an ESFilter and push down to EsQueryExec / EsSource
// This is an ugly hack to push the filter parameter to Lucene
// TODO: filter integration testing
filter = fragmentFilter != null ? boolQuery().filter(fragmentFilter).must(filter) : filter;
LOGGER.debug("Fold filter {} to EsQueryExec", filter);
f = f.withFilter(filter);
QueryBuilder newFilter;
if (fragmentFilter != null) {
newFilter = hasScoring.get()
? boolQuery().filter(fragmentFilter).must(filter)
: boolQuery().filter(fragmentFilter).filter(filter);
} else {
newFilter = hasScoring.get() ? filter : boolQuery().filter(filter);
}
LOGGER.debug("Fold filter {} to EsQueryExec", newFilter);
f = f.withFilter(newFilter);
}
return f;
});
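To summarise the new combination logic in one place, here is a minimal restatement (not the PR's code; requestFilter is the request-level filter and fragmentFilter the one already attached to the fragment): the request filter is demoted to a non-scoring filter clause unless the plan needs scores, in which case the previous must combination is kept.

```java
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;

import org.elasticsearch.index.query.QueryBuilder;

final class RequestFilterCombinationSketch {
    // Mirrors the branch in logicalPlanToPhysicalPlan: demote the request-level filter to a
    // non-scoring FILTER clause unless scoring is required, in which case keep the old MUST shape.
    static QueryBuilder combine(QueryBuilder fragmentFilter, QueryBuilder requestFilter, boolean hasScoring) {
        if (fragmentFilter != null) {
            return hasScoring
                ? boolQuery().filter(fragmentFilter).must(requestFilter)
                : boolQuery().filter(fragmentFilter).filter(requestFilter);
        }
        return hasScoring ? requestFilter : boolQuery().filter(requestFilter);
    }
}
```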