Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ca954c4
Push down MvExpand past Project
kanoshiou Oct 10, 2025
55c37c2
Update docs/changelog/136398.yaml
kanoshiou Oct 10, 2025
91f86d6
Merge remote-tracking branch 'origin/main' into push-down-mv_expand-p…
kanoshiou Oct 24, 2025
a8b3897
Insert an eval if target in mv_expand is aliased in child project
kanoshiou Oct 26, 2025
83aa387
Add tests for #136596
kanoshiou Oct 26, 2025
2095c44
Update docs/changelog/136398.yaml
kanoshiou Oct 26, 2025
8d64398
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Oct 26, 2025
a46cc7d
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Nov 5, 2025
471124c
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Nov 9, 2025
d08a6ea
refactor(esql): Improve MvExpand push-down logic in logical optimizer
kanoshiou Nov 9, 2025
075e826
refactor(esql): Improve MvExpand push-down logic in logical optimizer
kanoshiou Nov 9, 2025
6018ce0
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Nov 9, 2025
3afecaa
refactor(esql): Improve MvExpand push-down logic in logical optimizer
kanoshiou Nov 10, 2025
2b3d55a
refactor(esql): Improve MvExpand push-down logic in logical optimizer
kanoshiou Nov 10, 2025
8e69eb0
Merge branch 'main' into push-down-mv_expand-past-project
kanoshiou Nov 10, 2025
faea78b
refactor(esql): Improve MvExpand push-down logic in PushDownMvExpandP…
kanoshiou Nov 10, 2025
7f8faf6
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Nov 25, 2025
92dacb5
fix: refine `PushDownMvExpandPastProject` logic by correcting the ali…
kanoshiou Nov 28, 2025
eb3f373
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Nov 28, 2025
adab9f8
fix(esql): simplify MvExpand pushdown logic by extracting to utility …
kanoshiou Nov 29, 2025
dd86f32
Update test
kanoshiou Nov 29, 2025
b042096
fix(esql): refactor MvExpand pushdown logic to directly manipulate pr…
kanoshiou Nov 29, 2025
d8c221a
Avoid inconsistent plans
kanoshiou Nov 29, 2025
b5ede88
Create a temporary attribute for the UnionAll case
kanoshiou Nov 30, 2025
cd844d0
Add more tests for UnionAll cases
kanoshiou Nov 30, 2025
2c45495
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Nov 30, 2025
9410846
Merge branch 'refs/heads/main' into push-down-mv_expand-past-project
kanoshiou Dec 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/changelog/136398.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pr: 136398
summary: "ESQL: Push down `MvExpand` past `Project`"
area: ES|QL
type: enhancement
issues:
- 136292
- 136596
- 119074
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public abstract class GenerativeRestTest extends ESRestTestCase implements Query
"Data too large", // Circuit breaker exceptions eg. https://github.com/elastic/elasticsearch/issues/130072
"long overflow", // https://github.com/elastic/elasticsearch/issues/135759
"cannot be cast to class", // https://github.com/elastic/elasticsearch/issues/133992
"can't find input for", // https://github.com/elastic/elasticsearch/issues/136596
"unexpected byte", // https://github.com/elastic/elasticsearch/issues/136598
"out of bounds for length", // https://github.com/elastic/elasticsearch/issues/136851
"optimized incorrectly due to missing references", // https://github.com/elastic/elasticsearch/issues/138231
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,157 @@ Production | false | true | -1 | 1
Success | false | true | -1 | 1 | null | 1
;


testPushDownMvExpandPastProject
// https://github.com/elastic/elasticsearch/issues/136596
// mv_expand targets an eval'd column (spZehshmg) after a rename/sort/limit, and the final keep
// retains only the renamed column goqRirHJaE, so the expanded column itself is not in the output.
required_capability: push_down_mv_expand_past_project
from partial_mapping_no_source_sample_data,no_mapping_sample_data
| eval spZehshmg = false, `@timestamp` = null, xrRYYdVC = -1193517579093331092
| rename xrRYYdVC AS `goqRirHJaE`, `@timestamp` AS gilMWmQyQ
| sort goqRirHJaE NULLS LAST
| where true
| limit 14644
| mv_expand `spZehshmg`
| keep goqRirHJaE
;

goqRirHJaE:long
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
-1193517579093331092
;


testPushDownMvExpandPastProject2
// mv_expand targets a multivalued column that was renamed (a -> b) and used as the sort key
// before the expand; every input row yields one output row per value (1, 2).
required_capability: push_down_mv_expand_past_project
from languages
| eval a = [1,2]
| rename a AS b
| sort b
| mv_expand b
| keep b
;

b:integer
1
2
1
2
1
2
1
2
;


testPushDownMvExpandPastProject3
// mv_expand targets a renamed (a -> b) single-valued column; the value 2 is emitted once per input row.
required_capability: push_down_mv_expand_past_project
from languages
| eval a = 2
| rename a AS b
| mv_expand b
| keep b
;

b:integer
2
2
2
2
;


testPushDownMvExpandPastProject4
// The expanded column b is a rename of c, which is a copy of the original multivalued a;
// a itself is overwritten with 12 before the expand and must keep that value in every output row.
required_capability: push_down_mv_expand_past_project
row a = [1,2,3]
| eval c = a
| eval a = 12
| rename c as b
| keep *
| mv_expand b
;

b:integer | a:integer
1 | 12
2 | 12
3 | 12
;


testPushDownMvExpandPastProject5
// FROM combines an index with a subquery; salary is redefined as a keyword cast of itself
// (same name as the input column) and is then the mv_expand target.
required_capability: fork_v9
required_capability: subquery_in_from_command
required_capability: push_down_mv_expand_past_project
from employees, (from employees | keep salary)
| eval salary = salary::keyword
| keep salary
| mv_expand salary
| sort salary
| limit 6
;

salary:keyword
25324
25324
25945
25945
25976
25976
;


testPushDownMvExpandPastProject6
// FROM combines an index with a subquery; salary is redefined as a keyword cast of itself and
// also copied into tmp. mv_expand targets salary while tmp still references the same cast value.
required_capability: fork_v9
required_capability: subquery_in_from_command
required_capability: push_down_mv_expand_past_project
from employees, (from employees | keep salary)
| eval salary = salary::keyword
| eval tmp = salary
| keep salary, tmp
| mv_expand salary
| sort salary
| limit 6
;

salary:keyword | tmp:keyword
25324 | 25324
25324 | 25324
25945 | 25945
25945 | 25945
25976 | 25976
25976 | 25976
;


testPushDownMvExpandPastProject7
// FROM combines an index with a subquery; salary is redefined as a keyword cast of itself and
// also copied into tmp. Here mv_expand targets the copy (tmp) rather than salary.
required_capability: fork_v9
required_capability: subquery_in_from_command
required_capability: push_down_mv_expand_past_project
from employees, (from employees | keep salary)
| eval salary = salary::keyword
| eval tmp = salary
| keep salary, tmp
| mv_expand tmp
| sort salary
| limit 6
;

salary:keyword | tmp:keyword
25324 | 25324
25324 | 25324
25945 | 25945
25945 | 25945
25976 | 25976
25976 | 25976
;
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,11 @@ public enum Cap {
*/
KNN_FUNCTION_OPTIONS_K_VISIT_PERCENTAGE,

/**
* Support for pushing down MV_EXPAND past PROJECT with complex queries.
*/
PUSH_DOWN_MV_EXPAND_PAST_PROJECT,

/**
* Enables automatically grouping by all dimension fields in TS mode queries and outputs the _timeseries column
* with all the dimensions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownFilterAndLimitIntoUnionAll;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownInferencePlan;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownJoinPastProject;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownMvExpandPastProject;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownRegexExtract;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushLimitToKnn;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.RemoveStatsOverride;
Expand Down Expand Up @@ -219,6 +220,7 @@ protected static Batch<LogicalPlan> operators() {
new PushDownRegexExtract(),
new PushDownEnrich(),
new PushDownJoinPastProject(),
new PushDownMvExpandPastProject(),
new PushDownAndCombineOrderBy(),
new PushDownFilterAndLimitIntoUnionAll(),
new PruneRedundantOrderBy(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.logical;

import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.Project;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class PushDownMvExpandPastProject extends OptimizerRules.OptimizerRule<MvExpand> {
@Override
protected LogicalPlan rule(MvExpand mvExpand) {
if (mvExpand.child() instanceof Project pj) {
LogicalPlan finalChild = pj.child();
NamedExpression finalTarget = mvExpand.target();
Attribute finalExpanded = mvExpand.expanded();

String expandedFieldName = finalExpanded.name();
List<NamedExpression> projections = new ArrayList<>(pj.projections());
Set<String> inputNames = pj.inputSet().stream().map(NamedExpression::name).collect(Collectors.toSet());

// Find if the target is aliased in the project and create an alias with temporary names for it.
for (int i = 0; i < projections.size(); i++) {
if (projections.get(i) instanceof Alias alias) {
boolean replaced = false;
/*
* If the expanded field has the same name as a field in the projection's input set,
* and the projection shadows that specific field from the projection input set.
* Pushing down the MvExpand in such cases would cause duplicate output attributes.
* To avoid this case, we create a temporary attribute for the expanded field and
* update the projection to alias this temporary attribute back to the original name.
* This can happen with aliases generated by ResolveUnionTypesInUnionAll.
*
* Example query:
* from employees, (from employees | keep salary)
* | eval salary = salary::keyword
* | keep salary
* | mv_expand salary
*
* From plan:
* MvExpand[language_code{r}#4,language_code{r}#17]
* \_Project[[$$language_code$converted_to$keyword{r$}#20 AS language_code#4]]
* \_UnionAll[[language_code{r}#15, $$language_code$converted_to$keyword{r$}#20, language_name{r}#16]]
*
* To plan:
* Project[[$$language_code$temp_name$21{r$}#22 AS language_code#17]]
* \_MvExpand[$$language_code$converted_to$keyword{r$}#20,$$language_code$temp_name$21{r$}#22]
* \_UnionAll[[language_code{r}#15, $$language_code$converted_to$keyword{r$}#20, language_name{r}#16]]
*
*
* If the original mv_expand target field is referenced elsewhere in the projections,
* a defensive eval will also be injected.
*
* Example query:
* from languages, (from languages | keep language_code)
* | eval language_code = language_code::keyword
* | eval tmp = language_code
* | keep language_code, tmp
* | mv_expand language_code
*
* From plan:
* MvExpand[language_code{r}#4,language_code{r}#22]
* \_Project[[$$language_code$converted_to$keyword{r$}#25 AS language_code#4,$$language_code$converted_to$keyword{r$}#25
* AS tmp#7]]
* \_UnionAll[[language_code{r}#20, $$language_code$converted_to$keyword{r$}#25, language_name{r}#21]]
*
* To plan:
* Project[[$$language_code$temp_name$26{r$}#27 AS language_code#22, $$language_code$converted_to$keyword{r$}#25
* AS tmp#7]]
* \_MvExpand[$$language_code$converted_to$keyword$language_code$0{r}#28,$$language_code$temp_name$26{r$}#27]
* \_Eval[[$$language_code$converted_to$keyword{r$}#25 AS $$language_code$converted_to$keyword$language_code$0#28]]
* \_UnionAll[[language_code{r}#20, $$language_code$converted_to$keyword{r$}#25, language_name{r}#21]]
*/
if (alias.toAttribute().semanticEquals(finalTarget.toAttribute())) {
if (inputNames.contains(expandedFieldName) && inputNames.contains(alias.toAttribute().name())) {
ReferenceAttribute tempAttribute = new ReferenceAttribute(
alias.source(),
null,
TemporaryNameUtils.locallyUniqueTemporaryName(alias.name()),
alias.dataType(),
Nullability.FALSE,
null,
true
);
projections.set(i, new Alias(alias.source(), expandedFieldName, tempAttribute, finalExpanded.id()));
finalExpanded = tempAttribute;
replaced = true;
}

// Check if the alias's original field (child) is referenced elsewhere in the projections.
// If the original field is not referenced by any other projection or alias,
// we don't need to inject an Eval to preserve it, and can safely resolve renames and push down.
Comment on lines +105 to +107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very helpful comments, btw!

if (projections.stream()
.anyMatch(
ne -> ne.semanticEquals(alias.child())
|| ne instanceof Alias as && as.child().semanticEquals(alias.child()) && as != alias
) == false) {
// The alias's original field is not referenced elsewhere, no need to preserve it,
finalTarget = (NamedExpression) alias.child();
break;
}

// for query like: row a = 2 | eval b = a | keep * | mv_expand b
Alias aliasAlias = new Alias(
alias.source(),
TemporaryNameUtils.temporaryName(alias.child(), alias.toAttribute(), 0),
alias.child()
);
if (replaced == false) {
projections.set(i, finalExpanded);
}
finalChild = new Eval(aliasAlias.source(), finalChild, List.of(aliasAlias));
finalTarget = aliasAlias.toAttribute();
break;
} else if (alias.child().semanticEquals(finalTarget.toAttribute())) {
// for query like: row a = 2 | eval b = a | keep * | mv_expand a
Alias aliasAlias = new Alias(
alias.source(),
TemporaryNameUtils.temporaryName(alias.child(), alias.toAttribute(), 0),
alias.child()
);
projections.set(i, alias.replaceChild(aliasAlias.toAttribute()));
finalChild = new Eval(aliasAlias.source(), finalChild, List.of(aliasAlias));
break;
}
}
}

// Push down the MvExpand past the Project
MvExpand pushedDownMvExpand = new MvExpand(mvExpand.source(), finalChild, finalTarget, finalExpanded);

// Update projections to point to the expanded attribute
Attribute target = finalTarget.toAttribute();
for (int i = 0; i < projections.size(); i++) {
NamedExpression ne = projections.get(i);
if (ne instanceof Alias alias && alias.child().semanticEquals(target)) {
projections.set(i, alias.replaceChild(finalExpanded));
} else if (ne.semanticEquals(target)) {
projections.set(i, finalExpanded);
}
}

return new Project(pj.source(), pushedDownMvExpand, projections);
}
return mvExpand;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public static Project pushDownPastProject(UnaryPlan parent) {
}
}

private static <P extends LogicalPlan> P resolveRenamesFromProject(P plan, Project project) {
public static <P extends LogicalPlan> P resolveRenamesFromProject(P plan, Project project) {
AttributeMap.Builder<Expression> aliasBuilder = AttributeMap.builder();
project.forEachExpression(Alias.class, a -> aliasBuilder.put(a.toAttribute(), a.child()));
var aliases = aliasBuilder.build();
Expand Down
Loading