Skip to content

Commit 25dc590

Browse files
add tests for MvExpand and Aggregate with multiple batches
1 parent 3723510 commit 25dc590

File tree

2 files changed

+107
-8
lines changed

2 files changed

+107
-8
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,7 +1453,7 @@ emp_no:integer | language_code:integer | language_name:keyword
14531453
10093 | 3 | Spanish
14541454
;
14551455

1456-
multipleBatches
1456+
multipleBatchesWithSort
14571457
required_capability: join_lookup_v12
14581458
required_capability: remove_redundant_sort
14591459
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches
@@ -1462,10 +1462,80 @@ from *
14621462
| rename city.country.continent.planet.name as message
14631463
| lookup join message_types_lookup on message
14641464
| sort language_code, birth_date
1465-
| keep birth_date
1465+
| keep language_code
14661466
| limit 1
14671467
;
14681468

1469-
birth_date:date
1470-
null
1469+
language_code:integer
1470+
1
1471+
;
1472+
1473+
multipleBatchesWithMvExpand
1474+
required_capability: join_lookup_v12
1475+
required_capability: remove_redundant_sort
1476+
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches
1477+
1478+
from *
1479+
| rename city.country.continent.planet.name as message
1480+
| lookup join message_types_lookup on message
1481+
| keep birth_date, language_code
1482+
| mv_expand birth_date
1483+
| sort birth_date, language_code
1484+
| limit 1
1485+
;
1486+
1487+
birth_date:datetime |language_code:integer
1488+
1952-02-27T00:00:00.000Z |null
1489+
;
1490+
1491+
multipleBatchesWithAggregate1
1492+
required_capability: join_lookup_v12
1493+
required_capability: remove_redundant_sort
1494+
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches
1495+
1496+
from *
1497+
| rename city.country.continent.planet.name as message
1498+
| lookup join message_types_lookup on message
1499+
| keep birth_date, language_code
1500+
| stats x=max(birth_date), y=min(language_code)
1501+
;
1502+
1503+
x:datetime |y:integer
1504+
1965-01-03T00:00:00.000Z |1
1505+
;
1506+
1507+
multipleBatchesWithAggregate2
1508+
required_capability: join_lookup_v12
1509+
required_capability: remove_redundant_sort
1510+
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches
1511+
1512+
from *
1513+
| rename city.country.continent.planet.name as message
1514+
| lookup join message_types_lookup on message
1515+
| keep birth_date, language_code
1516+
| stats m=min(birth_date) by language_code
1517+
| sort language_code
1518+
| limit 1
1519+
;
1520+
1521+
m:datetime |language_code:integer
1522+
null |1
1523+
;
1524+
1525+
multipleBatchesWithAggregate3
1526+
required_capability: join_lookup_v12
1527+
required_capability: remove_redundant_sort
1528+
required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches
1529+
1530+
from *
1531+
| rename city.country.continent.planet.name as message
1532+
| lookup join message_types_lookup on message
1533+
| keep birth_date, language_code
1534+
| stats m=min(language_code) by birth_date
1535+
| sort birth_date
1536+
| limit 1
1537+
;
1538+
1539+
m:integer |birth_date:datetime
1540+
null |1952-02-27T00:00:00.000Z
14711541
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
import org.elasticsearch.common.util.Maps;
1111
import org.elasticsearch.index.IndexMode;
1212
import org.elasticsearch.xpack.esql.core.expression.Alias;
13+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1314
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1415
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1516
import org.elasticsearch.xpack.esql.core.expression.Literal;
1617
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
18+
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1719
import org.elasticsearch.xpack.esql.core.type.DataType;
1820
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
1921
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
@@ -22,6 +24,7 @@
2224
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2325
import org.elasticsearch.xpack.esql.plan.logical.Filter;
2426
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
27+
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
2528
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
2629
import org.elasticsearch.xpack.esql.plan.logical.Project;
2730
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
@@ -81,7 +84,7 @@ else if (plan instanceof Project project) {
8184
Alias nullAlias = nullLiteral.get(f.dataType());
8285
// save the first field as null (per datatype)
8386
if (nullAlias == null) {
84-
// In case of batch executions on data nodes and join exists, SearchStats may not always be available for all,
87+
// In case of batch executions on data nodes and join exists, SearchStats may not always be available for all
8588
// fields. Creating a new alias for null with the same id as the field id can potentially cause planEval to add a
8689
// duplicated ChannelSet to a layout, and Layout.builder().build() could throw a NullPointerException.
8790
// As a workaround, assign a new alias id to the null alias when join exists and SearchStats is not available.
@@ -117,14 +120,40 @@ else if (plan instanceof Project project) {
117120
? f
118121
: Literal.of(f, null)
119122
);
123+
} else if (plan instanceof MvExpand m) {
124+
NamedExpression target = m.target();
125+
AttributeSet joinAttributes = joinAttributes(m);
126+
if (joinAttributes.isEmpty() == false // rewrite only when there is join, TODO do we want to rewrite when there is no join?
127+
&& target instanceof FieldAttribute f
128+
&& stats.exists(f.fieldName()) == false
129+
&& joinAttributes.contains(f) == false
130+
&& f.field() instanceof PotentiallyUnmappedKeywordEsField == false) {
131+
// Replace missing target field with null.
132+
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null));
133+
NamedExpression nullTarget = alias.toAttribute();
134+
plan = new Eval(m.source(), m.child(), List.of(alias));
135+
// The expanded reference is built on top of the target field with the same name, and the parent plans all refer to the
136+
// expanded reference rather than the target field. Keep expanded's id unchanged, otherwise the parent plans cannot find
137+
// it.
138+
Attribute nullExpanded = new ReferenceAttribute(
139+
nullTarget.source(),
140+
nullTarget.name(),
141+
nullTarget.dataType(),
142+
nullTarget.nullable(),
143+
m.expanded().id(),
144+
false
145+
);
146+
plan = new MvExpand(m.source(), plan, nullTarget, nullExpanded);
147+
}
120148
}
121-
122149
return plan;
123150
}
124151

125-
private AttributeSet joinAttributes(Project project) {
152+
private AttributeSet joinAttributes(LogicalPlan plan) {
126153
var attributes = new AttributeSet();
127-
project.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributes.addAll(p.output())));
154+
if (plan instanceof Project || plan instanceof MvExpand) {
155+
plan.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributes.addAll(p.output())));
156+
}
128157
return attributes;
129158
}
130159
}

0 commit comments

Comments
 (0)