Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferNonNullAggConstraint;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.LocalPropagateEmptyRelation;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceConstantKeywords;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceMissingFieldWithNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceFieldWithConstant;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNWithLimitAndSort;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
Expand All @@ -39,9 +38,8 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<Logical
new Batch<>(
"Local rewrite",
Limiter.ONCE,
new ReplaceConstantKeywords(),
new ReplaceTopNWithLimitAndSort(),
new ReplaceMissingFieldWithNull(),
new ReplaceFieldWithConstant(),
new InferIsNotNull(),
new InferNonNullAggConstraint()
),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
Expand All @@ -29,19 +30,21 @@
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
* Look for any fields used in the plan that are missing locally and replace them with null.
* Look for any fields used in the plan that are missing or that are constant locally and replace them with null or with the value.
* This should minimize the plan execution, in the best scenario skipping its execution altogether.
*/
public class ReplaceMissingFieldWithNull extends ParameterizedRule<LogicalPlan, LogicalPlan, LocalLogicalOptimizerContext> {
public class ReplaceFieldWithConstant extends ParameterizedRule<LogicalPlan, LogicalPlan, LocalLogicalOptimizerContext> {

@Override
public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLogicalOptimizerContext) {
var lookupFieldsBuilder = AttributeSet.builder();
Map<Attribute, Expression> attrToValue = new HashMap<>();
plan.forEachUp(EsRelation.class, esRelation -> {
// Looking only for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
// is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD.
Expand All @@ -52,6 +55,18 @@ public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLog
if (esRelation.indexMode() == IndexMode.LOOKUP) {
lookupFieldsBuilder.addAll(esRelation.output());
}
// find constant values only in the main indices
else if (esRelation.indexMode() == IndexMode.STANDARD) {
for (Attribute attribute : esRelation.output()) {
if (attribute instanceof FieldAttribute fa) {
// Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead.
var val = localLogicalOptimizerContext.searchStats().constantValue(fa.fieldName());
if (val != null) {
attrToValue.put(attribute, Literal.of(attribute, val));
}
}
}
}
});
AttributeSet lookupFields = lookupFieldsBuilder.build();

Expand All @@ -61,10 +76,10 @@ public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLog
|| localLogicalOptimizerContext.searchStats().exists(f.fieldName())
|| lookupFields.contains(f);

return plan.transformUp(p -> missingToNull(p, shouldBeRetained));
return plan.transformUp(p -> missingToNull(p, shouldBeRetained, attrToValue));
}

private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> shouldBeRetained) {
private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> shouldBeRetained, Map<Attribute, Expression> constants) {
if (plan instanceof EsRelation relation) {
// For any missing field, place an Eval right after the EsRelation to assign null values to that attribute (using the same name
// id!), thus avoiding that InsertFieldExtraction inserts a field extraction later.
Expand Down Expand Up @@ -118,7 +133,10 @@ private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> sh
|| plan instanceof OrderBy
|| plan instanceof RegexExtract
|| plan instanceof TopN) {
return plan.transformExpressionsOnlyUp(FieldAttribute.class, f -> shouldBeRetained.test(f) ? f : Literal.of(f, null));
return plan.transformExpressionsOnlyUp(
FieldAttribute.class,
f -> constants.getOrDefault(f, shouldBeRetained.test(f) ? f : Literal.of(f, null))
);
}

return plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,64 @@ public void testConstantKeywordDissectFilter() {
assertNull(query.query());
}

/**
 * Plans a single {@code count(*)} filtered by {@code match(last_name, "Smith")} and checks the physical plan shape
 * (Limit, Aggregate, Exchange, EsStatsQuery) as well as the query carried by the stats source, which must equal
 * a lenient match query on {@code last_name}.
 */
public void testMatchFunctionWithStatsWherePushable() {
    var physicalPlan = plannerOptimizer.plan("""
        from test
        | stats c = count(*) where match(last_name, "Smith")
        """);

    // Walk the plan top-down; every step asserts the node type of the child.
    var topLimit = as(physicalPlan, LimitExec.class);
    var finalAgg = as(topLimit.child(), AggregateExec.class);
    var exchangeNode = as(finalAgg.child(), ExchangeExec.class);
    var statsSource = as(exchangeNode.child(), EsStatsQueryExec.class);

    // The query on the stats source is compared through its string form.
    QueryBuilder matchOnLastName = new MatchQueryBuilder("last_name", "Smith").lenient(true);
    String actualQuery = statsSource.query().toString();
    assertThat(actualQuery, equalTo(matchOnLastName.toString()));
}

/**
 * Plans a {@code length(first_name) > 10} filter followed by a {@code count(*)} filtered by
 * {@code match(last_name, "Smith")}. The expected plan keeps a {@code FilterExec} with a {@code GreaterThan}
 * condition above the field extraction, while the {@code EsQueryExec} at the bottom carries a lenient match query
 * on {@code last_name}.
 */
public void testMatchFunctionWithStatsPushableAndNonPushableCondition() {
    var physicalPlan = plannerOptimizer.plan("""
        from test
        | where length(first_name) > 10
        | stats c = count(*) where match(last_name, "Smith")
        """);

    // Coordinator side: Limit -> Aggregate -> Exchange
    var topLimit = as(physicalPlan, LimitExec.class);
    var finalAgg = as(topLimit.child(), AggregateExec.class);
    var exchangeNode = as(finalAgg.child(), ExchangeExec.class);

    // Below the exchange: Aggregate -> Filter -> FieldExtract -> EsQuery
    var partialAgg = as(exchangeNode.child(), AggregateExec.class);
    var lengthFilter = as(partialAgg.child(), FilterExec.class);
    assertTrue(lengthFilter.condition() instanceof GreaterThan);
    var extraction = as(lengthFilter.child(), FieldExtractExec.class);
    var source = as(extraction.child(), EsQueryExec.class);

    // The query on the source is compared through its string form.
    QueryBuilder matchOnLastName = new MatchQueryBuilder("last_name", "Smith").lenient(true);
    String actualQuery = source.query().toString();
    assertThat(actualQuery, equalTo(matchOnLastName.toString()));
}

/**
 * Plans two {@code count(*)} aggregations, each filtered by a different {@code match} condition. Both aggregates
 * of the top-level {@code AggregateExec} must be an {@code Alias} over a {@code Count} whose filter is a
 * {@code Match}, and the {@code EsQueryExec} at the bottom of the plan must carry no query.
 */
public void testMatchFunctionStatisWithNonPushableCondition() {
    var physicalPlan = plannerOptimizer.plan("""
        from test
        | stats c = count(*) where match(last_name, "Smith"), d = count(*) where match(first_name, "Anna")
        """);

    var topLimit = as(physicalPlan, LimitExec.class);
    var finalAgg = as(topLimit.child(), AggregateExec.class);

    var aggExpressions = finalAgg.aggregates();
    assertThat(aggExpressions.size(), is(2));
    for (NamedExpression aggExpression : aggExpressions) {
        // Each aggregate must unwrap as Alias -> Count -> Match filter; as() asserts the type at every step.
        var aliased = as(aggExpression, Alias.class);
        var counted = as(aliased.child(), Count.class);
        as(counted.filter(), Match.class);
    }

    // Below the final aggregation: Exchange -> Aggregate -> FieldExtract -> EsQuery
    var exchangeNode = as(finalAgg.child(), ExchangeExec.class);
    var partialAgg = as(exchangeNode.child(), AggregateExec.class);
    var extraction = as(partialAgg.child(), FieldExtractExec.class);
    var source = as(extraction.child(), EsQueryExec.class);
    assertNull(source.query());
}

/**
 * Convenience shortcut that forwards all arguments unchanged to {@code FilterTests.singleValueQuery}.
 *
 * @param query     passed through as the first argument
 * @param inner     the query builder passed through as the second argument
 * @param fieldName passed through as the third argument
 * @param source    passed through as the fourth argument
 * @return whatever {@code FilterTests.singleValueQuery} returns for these arguments
 */
private QueryBuilder wrapWithSingleQuery(String query, QueryBuilder inner, String fieldName, Source source) {
    QueryBuilder wrapped = FilterTests.singleValueQuery(query, inner, fieldName, source);
    return wrapped;
}
Expand Down
Loading