Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/124335.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 124335
summary: Change the order of the optimization rules
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V4;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V5;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
Expand Down Expand Up @@ -126,7 +126,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V4.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V5.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that it's probably a good idea to add tests for INLINESTATS ... BY x = bucket(...) and INLINESTATS ... BY x = CATEGORIZE(...). I think the latter cannot work because the join key in BY x = CATEGORIZE(...) is computed during the aggregation, whereas INLINESTATS requires the join key to be present before that.

Cc @jan-elastic , I think we'll have to start out with a limitation where INLINESTATS can't use CATEGORIZE, at least at first; to enable this, I think we'd somehow have to grab the categorizer from the first phase of the query (which computes the STATS) and make it available to the second phase of the query (which performs the joining with every row we see).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I can live with the fact that the first version of INLINESTATS doesn't work with CATEGORIZE.

Just open a GitHub issue for that and it can be resolved later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #124717

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ public enum Cap {
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
* were refactored.
*/
INLINESTATS_V4(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
INLINESTATS_V5(EsqlPlugin.INLINESTATS_FEATURE_FLAG),

/**
* Support partial_results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,15 @@ protected static Batch<LogicalPlan> substitutions() {
// translate metric aggregates after surrogate substitution and replace nested expressions with eval (again)
new TranslateMetricsAggregate(),
new ReplaceAggregateNestedExpressionWithEval(),
// this one needs to be placed before ReplaceAliasingEvalWithProject, so that any potential aliasing eval (eval x = y)
// is not replaced with a Project before the eval to be copied on the left hand side of an InlineJoin
new PropagateInlineEvals(),
new ReplaceRegexMatch(),
new ReplaceTrivialTypeConversions(),
new ReplaceAliasingEvalWithProject(),
new SkipQueryOnEmptyMappings(),
new SubstituteSpatialSurrogates(),
new ReplaceOrderByExpressionWithEval(),
new PropagateInlineEvals()
new ReplaceOrderByExpressionWithEval()
// new NormalizeAggregate(), - waits on https://github.com/elastic/elasticsearch/issues/100634
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,18 @@ protected LogicalPlan rule(InlineJoin plan) {
// check if there's any grouping that uses a reference on the right side
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a small cleanup here.

// if so, look for the source until finding a StubReference
// then copy those on the left side as well

LogicalPlan left = plan.left();
LogicalPlan right = plan.right();

// grouping references
List<Alias> groupingAlias = new ArrayList<>();
// TODO: replace this with AttributeSet
Map<String, ReferenceAttribute> groupingRefs = new LinkedHashMap<>();

// perform only one iteration that does two things
// first checks any aggregate that declares expressions inside the grouping
// second that checks any found references to collect their declaration
right = right.transformDown(p -> {

if (p instanceof Aggregate aggregate) {
// collect references
for (Expression g : aggregate.groupings()) {
Expand All @@ -56,24 +55,26 @@ protected LogicalPlan rule(InlineJoin plan) {
}
}

if (groupingRefs.isEmpty()) {
return p;
}

// find their declaration and remove it
// TODO: this doesn't take into account aliasing
if (p instanceof Eval eval) {
if (groupingRefs.size() > 0) {
List<Alias> fields = eval.fields();
List<Alias> remainingEvals = new ArrayList<>(fields.size());
for (Alias f : fields) {
if (groupingRefs.remove(f.name()) != null) {
groupingAlias.add(f);
} else {
remainingEvals.add(f);
}
}
if (remainingEvals.size() != fields.size()) {
// if all fields are moved, replace the eval
p = remainingEvals.size() == 0 ? eval.child() : new Eval(eval.source(), eval.child(), remainingEvals);
List<Alias> fields = eval.fields();
List<Alias> remainingEvals = new ArrayList<>(fields.size());
for (Alias f : fields) {
// TODO: look into identifying refs by their NameIds instead
if (groupingRefs.remove(f.name()) != null) {
groupingAlias.add(f);
} else {
remainingEvals.add(f);
}
}
if (remainingEvals.size() != fields.size()) {
// if all fields are moved, replace the eval
p = remainingEvals.size() == 0 ? eval.child() : new Eval(eval.source(), eval.child(), remainingEvals);
}
}
return p;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
private static EnrichResolution enrichResolution;
private static final LiteralsOnTheRight LITERALS_ON_THE_RIGHT = new LiteralsOnTheRight();

private static class SubstitutionOnlyOptimizer extends LogicalPlanOptimizer {
static SubstitutionOnlyOptimizer INSTANCE = new SubstitutionOnlyOptimizer(unboundLogicalOptimizerContext());
public static class SubstitutionOnlyOptimizer extends LogicalPlanOptimizer {
public static SubstitutionOnlyOptimizer INSTANCE = new SubstitutionOnlyOptimizer(unboundLogicalOptimizerContext());

SubstitutionOnlyOptimizer(LogicalOptimizerContext optimizerContext) {
super(optimizerContext);
Expand Down Expand Up @@ -6078,10 +6078,6 @@ public void testSimplifyComparisonArithmeticWithFloatsAndDirectionChange() {
doTestSimplifyComparisonArithmetics("float * -2 < 4", "float", GT, -2d);
}

private void assertNullLiteral(Expression expression) {
assertNull(as(expression, Literal.class).value());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/108519")
public void testSimplifyComparisonArithmeticSkippedOnIntegerArithmeticalOverflow() {
assertNotSimplified("integer - 1 " + randomBinaryComparison() + " " + Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.logical;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizerTests;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.junit.BeforeClass;

import java.util.List;
import java.util.Map;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class PropagateInlineEvalsTests extends ESTestCase {

private static EsqlParser parser;
private static Map<String, EsField> mapping;
private static Analyzer analyzer;

@BeforeClass
public static void init() {
parser = new EsqlParser();
mapping = loadMapping("mapping-basic.json");
EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD));
IndexResolution getIndexResult = IndexResolution.valid(test);
analyzer = new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
getIndexResult,
defaultLookupResolution(),
new EnrichResolution()
),
TEST_VERIFIER
);
}

/**
* Expects after running the {@link LogicalPlanOptimizer#substitutions()}:
*
* Limit[1000[INTEGER],false]
* \_InlineJoin[LEFT,[y{r}#10],[y{r}#10],[y{r}#10]]
* |_Eval[[gender{f}#13 AS y]]
* | \_EsqlProject[[emp_no{f}#11, languages{f}#14, gender{f}#13]]
* | \_EsRelation[test][_meta_field{f}#17, emp_no{f}#11, first_name{f}#12, ..]
* \_Aggregate[STANDARD,[y{r}#10],[MAX(languages{f}#14,true[BOOLEAN]) AS max_lang, y{r}#10]]
* \_StubRelation[[emp_no{f}#11, languages{f}#14, gender{f}#13, y{r}#10]]
*/
public void testGroupingAliasingMoved_To_LeftSideOfJoin() {
var plan = plan("""
from test
| keep emp_no, languages, gender
| inlinestats max_lang = MAX(languages) BY y = gender
""", LogicalPlanOptimizerTests.SubstitutionOnlyOptimizer.INSTANCE);

var limit = as(plan, Limit.class);
var inline = as(limit.child(), InlineJoin.class);
var leftEval = as(inline.left(), Eval.class);
var project = as(leftEval.child(), EsqlProject.class);

assertThat(Expressions.names(project.projections()), contains("emp_no", "languages", "gender"));

as(project.child(), EsRelation.class);
var rightAgg = as(inline.right(), Aggregate.class);
var stubRelation = as(rightAgg.child(), StubRelation.class);
assertThat(Expressions.names(stubRelation.expressions()), contains("emp_no", "languages", "gender", "y"));

var groupings = rightAgg.groupings();
var aggs = rightAgg.aggregates();
var ref = as(groupings.get(0), ReferenceAttribute.class);
assertThat(aggs.get(1), is(ref));
assertThat(leftEval.fields(), hasSize(1));
assertThat(leftEval.fields().get(0).toAttribute(), is(ref)); // the only grouping is passed as eval on the join's left hand side
assertThat(leftEval.fields().get(0).name(), is("y"));
}

/**
* Expects after running the {@link LogicalPlanOptimizer#substitutions()}:
* Limit[1000[INTEGER],false]
* \_InlineJoin[LEFT,[f{r}#18, g{r}#21, first_name_l{r}#9],[f{r}#18, g{r}#21, first_name_l{r}#9],[f{r}#18, g{r}#21, first_name_l{
* r}#9]]
* |_Eval[[LEFT(last_name{f}#27,1[INTEGER]) AS f, gender{f}#25 AS g]]
* | \_Eval[[LEFT(first_name{f}#24,1[INTEGER]) AS first_name_l]]
* | \_EsqlProject[[emp_no{f}#23, languages{f}#26, gender{f}#25, last_name{f}#27, first_name{f}#24]]
* | \_EsRelation[test][_meta_field{f}#29, emp_no{f}#23, first_name{f}#24, ..]
* \_Aggregate[STANDARD,[f{r}#18, g{r}#21, first_name_l{r}#9],[MAX(languages{f}#26,true[BOOLEAN]) AS max_lang, MIN(languages{f}
* #26,true[BOOLEAN]) AS min_lang, f{r}#18, g{r}#21, first_name_l{r}#9]]
* \_StubRelation[[emp_no{f}#23, languages{f}#26, gender{f}#25, last_name{f}#27, first_name{f}#24, first_name_l{r}#9, f{r}#18, g
* {r}#21]]
*/
public void testGroupingAliasingMoved_To_LeftSideOfJoin_WithExpression() {
var plan = plan("""
from test
| keep emp_no, languages, gender, last_name, first_name
| eval first_name_l = left(first_name, 1)
| inlinestats max_lang = MAX(languages), min_lang = MIN(languages) BY f = left(last_name, 1), g = gender, first_name_l
""", LogicalPlanOptimizerTests.SubstitutionOnlyOptimizer.INSTANCE);

var limit = as(plan, Limit.class);
var inline = as(limit.child(), InlineJoin.class);
var leftEval1 = as(inline.left(), Eval.class);
var leftEval2 = as(leftEval1.child(), Eval.class);
var project = as(leftEval2.child(), EsqlProject.class);

assertThat(Expressions.names(project.projections()), contains("emp_no", "languages", "gender", "last_name", "first_name"));

as(project.child(), EsRelation.class);
var rightAgg = as(inline.right(), Aggregate.class);
var stubRelation = as(rightAgg.child(), StubRelation.class);
assertThat(
Expressions.names(stubRelation.expressions()),
contains("emp_no", "languages", "gender", "last_name", "first_name", "first_name_l", "f", "g")
);

var groupings = rightAgg.groupings();
assertThat(groupings, hasSize(3));
var aggs = rightAgg.aggregates();
var ref1 = as(groupings.get(0), ReferenceAttribute.class); // f = left(last_name, 1)
var ref2 = as(groupings.get(1), ReferenceAttribute.class); // g = gender
var ref3 = as(groupings.get(2), ReferenceAttribute.class); // first_name_l
assertThat(aggs.get(2), is(ref1));
assertThat(aggs.get(3), is(ref2));
assertThat(leftEval1.fields(), hasSize(2));
assertThat(leftEval1.fields().get(0).toAttribute(), is(ref1)); // f = left(last_name, 1)
assertThat(leftEval1.fields().get(0).name(), is("f"));
assertThat(leftEval1.fields().get(1).toAttribute(), is(ref2)); // g = gender
assertThat(leftEval1.fields().get(1).name(), is("g"));
assertThat(leftEval2.fields(), hasSize(1));
assertThat(leftEval2.fields().get(0).toAttribute(), is(ref3)); // first_name_l is in the second eval (the one the user added)
assertThat(leftEval2.fields().get(0).name(), is("first_name_l"));
}

private LogicalPlan plan(String query, LogicalPlanOptimizer optimizer) {
return optimizer.optimize(analyzer.analyze(parser.createStatement(query)));
}

@Override
protected List<String> filteredWarnings() {
return withDefaultLimitWarning(super.filteredWarnings());
}
}