5 changes: 5 additions & 0 deletions docs/changelog/127549.yaml
@@ -0,0 +1,5 @@
pr: 127549
summary: Add local optimizations for `constant_keyword`
area: ES|QL
type: enhancement
issues: []
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferNonNullAggConstraint;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.LocalPropagateEmptyRelation;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceMissingFieldWithNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceFieldWithConstantOrNull;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNWithLimitAndSort;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
Expand All @@ -39,7 +39,7 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<Logical
"Local rewrite",
Limiter.ONCE,
new ReplaceTopNWithLimitAndSort(),
new ReplaceMissingFieldWithNull(),
new ReplaceFieldWithConstantOrNull(),
new InferIsNotNull(),
new InferNonNullAggConstraint()
),
Expand Down
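The hunk above only swaps the renamed rule into the one-shot "Local rewrite" batch; Limiter.ONCE means every rule in the batch runs a single time, in registration order. The following standalone Java sketch illustrates that pattern in isolation. OneShotRuleBatch and the string-based "plan" are purely illustrative and are not the actual ParameterizedRuleExecutor API.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Toy stand-in: the "plan" is a string here; the real optimizer rewrites LogicalPlan trees.
final class OneShotRuleBatch {
    private final List<UnaryOperator<String>> rules;

    OneShotRuleBatch(List<UnaryOperator<String>> rules) {
        this.rules = rules;
    }

    // Apply every rule exactly once, in registration order (the Limiter.ONCE behaviour).
    String execute(String plan) {
        for (UnaryOperator<String> rule : rules) {
            plan = rule.apply(plan);
        }
        return plan;
    }

    public static void main(String[] args) {
        var batch = new OneShotRuleBatch(List.of(
            p -> p.replace("TOPN", "LIMIT+SORT"),    // stands in for ReplaceTopNWithLimitAndSort
            p -> p.replace("missing_field", "NULL")  // stands in for ReplaceFieldWithConstantOrNull
        ));
        System.out.println(batch.execute("TOPN(missing_field)")); // prints LIMIT+SORT(NULL)
    }
}
```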
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
Expand All @@ -29,21 +30,23 @@
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
* Look for any fields used in the plan that are missing locally and replace them with null.
* Look for any fields used in the plan that are missing locally and replace them with null, or that are constant locally and replace them with their constant value.
* This should minimize the plan execution, in the best scenario skipping its execution altogether.
*/
public class ReplaceMissingFieldWithNull extends ParameterizedRule<LogicalPlan, LogicalPlan, LocalLogicalOptimizerContext> {
public class ReplaceFieldWithConstantOrNull extends ParameterizedRule<LogicalPlan, LogicalPlan, LocalLogicalOptimizerContext> {

@Override
public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLogicalOptimizerContext) {
var lookupFieldsBuilder = AttributeSet.builder();
Map<Attribute, Expression> attrToConstant = new HashMap<>();
plan.forEachUp(EsRelation.class, esRelation -> {
// Looking only for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
// Looking for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
// is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD.
// It seems like we could instead just look for JOINs and walk down their right hand side to find lookup fields - but this does
// not work as this rule also gets called just on the right hand side of a JOIN, which means that we don't always know that
Expand All @@ -52,6 +55,18 @@ public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLog
if (esRelation.indexMode() == IndexMode.LOOKUP) {
lookupFieldsBuilder.addAll(esRelation.output());
}
// find constant values only in the main indices
else if (esRelation.indexMode() == IndexMode.STANDARD) {
for (Attribute attribute : esRelation.output()) {
if (attribute instanceof FieldAttribute fa) {
// Do not use the attribute name; it can deviate from the field name for union types. Use fieldName() instead.
var val = localLogicalOptimizerContext.searchStats().constantValue(fa.fieldName());
if (val != null) {
attrToConstant.put(attribute, Literal.of(attribute, val));
}
}
}
}
});
AttributeSet lookupFields = lookupFieldsBuilder.build();

Expand All @@ -61,10 +76,14 @@ public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLog
|| localLogicalOptimizerContext.searchStats().exists(f.fieldName())
|| lookupFields.contains(f);

return plan.transformUp(p -> missingToNull(p, shouldBeRetained));
return plan.transformUp(p -> replaceWithNullOrConstant(p, shouldBeRetained, attrToConstant));
}

private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> shouldBeRetained) {
private LogicalPlan replaceWithNullOrConstant(
LogicalPlan plan,
Predicate<FieldAttribute> shouldBeRetained,
Map<Attribute, Expression> attrToConstant
) {
if (plan instanceof EsRelation relation) {
// For any missing field, place an Eval right after the EsRelation to assign null values to that attribute (using the same name
// id!), thus preventing InsertFieldExtraction from inserting a field extraction later.
Expand Down Expand Up @@ -118,7 +137,13 @@ private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> sh
|| plan instanceof OrderBy
|| plan instanceof RegexExtract
|| plan instanceof TopN) {
return plan.transformExpressionsOnlyUp(FieldAttribute.class, f -> shouldBeRetained.test(f) ? f : Literal.of(f, null));
return plan.transformExpressionsOnlyUp(FieldAttribute.class, f -> {
if (attrToConstant.containsKey(f)) { // constant field: replace the attribute with its constant value
return attrToConstant.get(f);
} else { // not a constant: keep fields that exist locally and replace missing ones with null
return shouldBeRetained.test(f) ? f : Literal.of(f, null);
}
});
}

return plan;
Expand Down
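The heart of the rule is the bottom-up expression rewrite shown above: each FieldAttribute is either swapped for the literal recorded in attrToConstant, kept if it exists locally, or replaced with a null literal. Below is a minimal, self-contained sketch of that substitution over a toy expression tree; Expr, Field, Const, Eq and ConstantOrNullRewriter are illustrative stand-ins, not the real ES|QL classes, and the real rule additionally special-cases Project, Aggregate and lookup-join fields.

```java
import java.util.Map;
import java.util.Set;

// Illustrative stand-ins for Expression / FieldAttribute / Literal.
sealed interface Expr permits Field, Const, Eq {}
record Field(String name) implements Expr {}
record Const(Object value) implements Expr {}
record Eq(Expr left, Expr right) implements Expr {}

final class ConstantOrNullRewriter {
    private final Map<String, Object> constants;   // fields with a single constant value locally
    private final Set<String> existingFields;      // fields that exist in the local index at all

    ConstantOrNullRewriter(Map<String, Object> constants, Set<String> existingFields) {
        this.constants = constants;
        this.existingFields = existingFields;
    }

    // Bottom-up rewrite: constant fields become literals, missing fields become null literals,
    // everything else is left untouched.
    Expr rewrite(Expr e) {
        return switch (e) {
            case Field f -> constants.containsKey(f.name()) ? new Const(constants.get(f.name()))
                : existingFields.contains(f.name()) ? f
                : new Const(null);
            case Eq eq -> new Eq(rewrite(eq.left()), rewrite(eq.right()));
            case Const c -> c;
        };
    }

    public static void main(String[] args) {
        var rewriter = new ConstantOrNullRewriter(Map.of("env", "prod"), Set.of("env", "host"));
        // WHERE env == "prod"  ->  "prod" == "prod", ready for constant folding.
        System.out.println(rewriter.rewrite(new Eq(new Field("env"), new Const("prod"))));
        // A field absent from the local index is replaced with a null literal.
        System.out.println(rewriter.rewrite(new Field("gone")));
    }
}
```

Once the constant is substituted, a comparison such as `constant_keyword-foo == "foo"` reduces to a comparison of two identical literals, which later constant folding can turn into TRUE and drop; a non-matching value folds to FALSE and the whole local plan collapses to an empty source, as the tests further down demonstrate.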
Expand Up @@ -310,6 +310,41 @@ public boolean canUseEqualityOnSyntheticSourceDelegate(String name, String value
return true;
}

public String constantValue(String name) {
String val = null;
for (SearchExecutionContext ctx : contexts) {
MappedFieldType f = ctx.getFieldType(name);
if (f == null) {
return null;
}
if (f instanceof ConstantFieldType cf) {
var fetcher = cf.valueFetcher(ctx, null);
String thisVal = null;
try {
// since the value is a constant, the doc _should_ be irrelevant
List<Object> vals = fetcher.fetchValues(null, -1, null);
Object objVal = vals.size() == 1 ? vals.get(0) : null;
// we are considering only string values for now, since this can return "strange" things,
// see IndexModeFieldType
thisVal = objVal instanceof String ? (String) objVal : null;
Review comment (Member): I believe we tend to use BytesRefs encoded as utf-8.
} catch (IOException iox) {} // ignore: thisVal stays null and the field is treated as non-constant

if (thisVal == null) {
// value not set yet in this index (or not a string value)
return null;
}
if (val == null) {
val = thisVal;
} else if (thisVal.equals(val) == false) {
return null;
}
} else {
return null;
}
}
return val;
}

private interface DocCountTester {
Boolean test(LeafReader leafReader) throws IOException;
}
Expand Down
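constantValue only reports a constant when every search context agrees: a missing value, a non-string value, or a disagreement between indices makes it return null. Here is a small self-contained sketch of that agreement check with the per-index values already extracted; sharedConstant is a hypothetical helper written for illustration, not part of SearchStats.

```java
import java.util.Arrays;
import java.util.List;

final class ConstantAgreement {
    // Returns the shared constant if every per-index value is the same non-null string, otherwise null.
    static String sharedConstant(List<String> perIndexValues) {
        String val = null;
        for (String thisVal : perIndexValues) {
            if (thisVal == null) {
                return null;                        // no value available in one of the indices
            }
            if (val == null) {
                val = thisVal;                      // first index seen
            } else if (thisVal.equals(val) == false) {
                return null;                        // two indices disagree: no single constant
            }
        }
        return val;
    }

    public static void main(String[] args) {
        System.out.println(sharedConstant(Arrays.asList("foo", "foo"))); // foo
        System.out.println(sharedConstant(Arrays.asList("foo", "bar"))); // null
        System.out.println(sharedConstant(Arrays.asList("foo", null)));  // null
    }
}
```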
Expand Up @@ -39,6 +39,14 @@ public interface SearchStats {

boolean canUseEqualityOnSyntheticSourceDelegate(String name, String value);

/**
* Returns the value for a field if it is a constant (e.g. a constant_keyword with only one value across the involved indices).
* Returns null if the field is not a constant.
*/
default String constantValue(String name) {
return null;
}

/**
* When there are no search stats available, for example when there are no search contexts, we have static results.
*/
Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.Stat;
Expand All @@ -71,6 +72,7 @@
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
Expand Down Expand Up @@ -152,6 +154,18 @@ public boolean isSingleValue(String field) {
}
};

private final SearchStats CONSTANT_K_STATS = new TestSearchStats() {
@Override
public boolean isSingleValue(String field) {
return true;
}

@Override
public String constantValue(String name) {
return name.startsWith("constant_keyword") ? "foo" : null;
}
};

@ParametersFactory(argumentFormatting = PARAM_FORMATTING)
public static List<Object[]> readScriptSpec() {
return settings().stream().map(t -> {
Expand Down Expand Up @@ -1858,6 +1872,113 @@ public void testPushDownFieldExtractToTimeSeriesSource() {
assertTrue(timeSeriesSource.attrs().stream().noneMatch(EsQueryExec::isSourceAttribute));
}

/**
* LimitExec[1000[INTEGER]]
* \_ExchangeExec[[!alias_integer, boolean{f}#415, byte{f}#416, constant_keyword-foo{f}#417, date{f}#418, date_nanos{f}#419,
* double{f}#420, float{f}#421, half_float{f}#422, integer{f}#424, ip{f}#425, keyword{f}#426, long{f}#427, scaled_float{f}#423,
* !semantic_text, short{f}#429, text{f}#430, unsigned_long{f}#428, version{f}#431, wildcard{f}#432], false]
* \_ProjectExec[[!alias_integer, boolean{f}#415, byte{f}#416, constant_keyword-foo{f}#417, date{f}#418, date_nanos{f}#419,
* double{f}#420, float{f}#421, half_float{f}#422, integer{f}#424, ip{f}#425, keyword{f}#426, long{f}#427, scaled_float{f}#423,
* !semantic_text, short{f}#429, text{f}#430, unsigned_long{f}#428, version{f}#431, wildcard{f}#432]]
* \_FieldExtractExec[!alias_integer, boolean{f}#415, byte{f}#416, consta..]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#434], limit[1000], sort[] estimatedRowSize[412]
*/
public void testConstantKeywordWithMatchingFilter() {
String queryText = """
from test
| where `constant_keyword-foo` == "foo"
""";
var analyzer = makeAnalyzer("mapping-all-types.json");
var plan = plannerOptimizer.plan(queryText, CONSTANT_K_STATS, analyzer);

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
var project = as(exchange.child(), ProjectExec.class);
var field = as(project.child(), FieldExtractExec.class);
var query = as(field.child(), EsQueryExec.class);
assertThat(as(query.limit(), Literal.class).value(), is(1000));
assertNull(query.query());
}

/**
* LimitExec[1000[INTEGER]]
* \_ExchangeExec[[!alias_integer, boolean{f}#4, byte{f}#5, constant_keyword-foo{f}#6, date{f}#7, date_nanos{f}#8, double{f}#9,
* float{f}#10, half_float{f}#11, integer{f}#13, ip{f}#14, keyword{f}#15, long{f}#16, scaled_float{f}#12, !semantic_text,
* short{f}#18, text{f}#19, unsigned_long{f}#17, version{f}#20, wildcard{f}#21], false]
* \_LocalSourceExec[[!alias_integer, boolean{f}#4, byte{f}#5, constant_keyword-foo{f}#6, date{f}#7, date_nanos{f}#8, double{f}#9,
* float{f}#10, half_float{f}#11, integer{f}#13, ip{f}#14, keyword{f}#15, long{f}#16, scaled_float{f}#12, !semantic_text,
* short{f}#18, text{f}#19, unsigned_long{f}#17, version{f}#20, wildcard{f}#21], EMPTY]
*/
public void testConstantKeywordWithNonMatchingFilter() {
String queryText = """
from test
| where `constant_keyword-foo` == "non-matching"
""";
var analyzer = makeAnalyzer("mapping-all-types.json");
var plan = plannerOptimizer.plan(queryText, CONSTANT_K_STATS, analyzer);

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
var source = as(exchange.child(), LocalSourceExec.class);
}

/**
* LimitExec[1000[INTEGER]]
* \_ExchangeExec[[!alias_integer, boolean{f}#6, byte{f}#7, constant_keyword-foo{r}#25, date{f}#9, date_nanos{f}#10, double{f}#1...
* \_ProjectExec[[!alias_integer, boolean{f}#6, byte{f}#7, constant_keyword-foo{r}#25, date{f}#9, date_nanos{f}#10, double{f}#1...
* \_FieldExtractExec[!alias_integer, boolean{f}#6, byte{f}#7, date{f}#9,
* \_LimitExec[1000[INTEGER]]
* \_FilterExec[constant_keyword-foo{r}#25 == [66 6f 6f][KEYWORD]]
* \_MvExpandExec[constant_keyword-foo{f}#8,constant_keyword-foo{r}#25]
* \_FieldExtractExec[constant_keyword-foo{f}#8]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#26], limit[], sort[] estimatedRowSize[412]
*/
public void testConstantKeywordExpandFilter() {
String queryText = """
from test
| mv_expand `constant_keyword-foo`
| where `constant_keyword-foo` == "foo"
""";
var analyzer = makeAnalyzer("mapping-all-types.json");
var plan = plannerOptimizer.plan(queryText, CONSTANT_K_STATS, analyzer);

var limit = as(plan, LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
var project = as(exchange.child(), ProjectExec.class);
var fieldExtract = as(project.child(), FieldExtractExec.class);
var limit2 = as(fieldExtract.child(), LimitExec.class);
var filter = as(limit2.child(), FilterExec.class);
var expand = as(filter.child(), MvExpandExec.class);
var field = as(expand.child(), FieldExtractExec.class); // MV_EXPAND is not optimized yet (it doesn't accept literals)
as(field.child(), EsQueryExec.class);
}

/**
* DissectExec[constant_keyword-foo{f}#8,Parser[pattern=%{bar}, appendSeparator=, ...
* \_LimitExec[1000[INTEGER]]
* \_ExchangeExec[[!alias_integer, boolean{f}#6, byte{f}#7, constant_keyword-foo{f}#8, date{f}#9, date_nanos{f}#10, double{f}#11...
* \_ProjectExec[[!alias_integer, boolean{f}#6, byte{f}#7, constant_keyword-foo{f}#8, date{f}#9, date_nanos{f}#10, double{f}#11...
* \_FieldExtractExec[!alias_integer, boolean{f}#6, byte{f}#7, constant_k..]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#25], limit[1000], sort[] estimatedRowSize[462]
*/
public void testConstantKeywordDissectFilter() {
String queryText = """
from test
| dissect `constant_keyword-foo` "%{bar}"
| where `constant_keyword-foo` == "foo"
""";
var analyzer = makeAnalyzer("mapping-all-types.json");
var plan = plannerOptimizer.plan(queryText, CONSTANT_K_STATS, analyzer);

var dissect = as(plan, DissectExec.class);
var limit = as(dissect.child(), LimitExec.class);
var exchange = as(limit.child(), ExchangeExec.class);
var project = as(exchange.child(), ProjectExec.class);
var field = as(project.child(), FieldExtractExec.class);
var query = as(field.child(), EsQueryExec.class);
assertNull(query.query());
}

public void testMatchFunctionWithStatsWherePushable() {
String query = """
from test
Expand Down