Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/129745.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 129745
summary: "ESQL: Fix `mv_expand` inconsistent column order"
area: ES|QL
type: bug
issues:
- 129000
Original file line number Diff line number Diff line change
Expand Up @@ -261,5 +261,9 @@ public boolean isEmpty() {
/**
 * Builds the contents accumulated in {@code mapBuilder} and wraps the result in a new {@link AttributeSet}.
 *
 * @return a new {@code AttributeSet} constructed from {@code mapBuilder.build()}
 */
public AttributeSet build() {
    var builtMap = mapBuilder.build();
    return new AttributeSet(builtMap);
}

/**
 * Clears the key set returned by {@code mapBuilder.keySet()}.
 */
public void clear() {
    // NOTE(review): this only empties the builder if keySet() is a live view of its entries - confirm
    var keys = mapBuilder.keySet();
    keys.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,56 @@ emp_no:integer | job_positions:keyword
10001 | Accountant
10001 | Senior Python Developer
;

testMvExpandInconsistentColumnOrder1
required_capability: fix_mv_expand_inconsistent_column_order
from languages
| eval foo_1 = 1, foo_2 = 2
| sort language_code
| mv_expand foo_1
;

language_code:integer | language_name:keyword | foo_1:integer | foo_2:integer
1 | English | 1 | 2
2 | French | 1 | 2
3 | Spanish | 1 | 2
4 | German | 1 | 2
;

testMvExpandInconsistentColumnOrder2
required_capability: fix_mv_expand_inconsistent_column_order
from languages
| eval foo_1 = [1, 3], foo_2 = 2
| sort language_code
| mv_expand foo_1
;

language_code:integer | language_name:keyword | foo_1:integer | foo_2:integer
1 | English | 1 | 2
1 | English | 3 | 2
2 | French | 1 | 2
2 | French | 3 | 2
3 | Spanish | 1 | 2
3 | Spanish | 3 | 2
4 | German | 1 | 2
4 | German | 3 | 2
;

testMvExpandInconsistentColumnOrder3
required_capability: fix_mv_expand_inconsistent_column_order
from message_types,languages_lookup_non_unique_key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multi cluster tests don't like the languages_lookup_non_unique_key index because it's a lookup index, and it's treated differently; the languages index is used in an enrich policy, and the multicluster tests also treat such indices differently. Finally, the CSV tests don't like having more than 1 index.

I'm going to push an update that replaces these tests with ones that use indices that our CSV tests and multi-cluster tests don't treat specially :)

| sort type
| eval language_code = 1, `language_name` = false, message = true, foo_3 = 1, foo_2 = null
| eval foo_3 = "1", `foo_3` = -1, foo_1 = 1, `language_code` = null, `foo_2` = "1"
| mv_expand foo_1
| limit 5
;

country:text | country.keyword:keyword | type:keyword | language_name:boolean | message:boolean | foo_3:integer | foo_1:integer | language_code:null | foo_2:keyword
null | null | Development | false | true | -1 | 1 | null | 1
null | null | Disconnected | false | true | -1 | 1 | null | 1
null | null | Error | false | true | -1 | 1 | null | 1
null | null | Production | false | true | -1 | 1 | null | 1
null | null | Success | false | true | -1 | 1 | null | 1
;

Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,12 @@ public enum Cap {
*/
NO_PLAIN_STRINGS_IN_LITERALS,

/**
* The mv_expand target attribute is retained in its original position in the output.
* see <a href="https://github.com/elastic/elasticsearch/issues/129000"> ES|QL: inconsistent column order #129000 </a>
*/
FIX_MV_EXPAND_INCONSISTENT_COLUMN_ORDER,

/**
* (Re)Added EXPLAIN command
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ public PhysicalPlan apply(PhysicalPlan plan) {

// no need for projection when dealing with aggs
if (logicalFragment instanceof Aggregate == false) {
List<Attribute> output = new ArrayList<>(requiredAttrBuilder.build());
// we should respect the order of the attributes
List<Attribute> output = new ArrayList<>();
for (Attribute attribute : logicalFragment.outputSet()) {
if (requiredAttrBuilder.contains(attribute)) {
output.add(attribute);
requiredAttrBuilder.remove(attribute);
}
}
output.addAll(requiredAttrBuilder.build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, adding the remaining required attributes is not wrong, but the requiredAttrBuilder should really be empty after removing all the attributes that we added to output, because all the required attributes have to be in the fragment's outputSet - otherwise, we're constructing an invalid plan.

Rather than removing attributes from requiredAttrBuilder and performing this addAll here, I think we can just assert that the size of output and the size of requiredAttrBuilder are the same. (Although the PlanConsistencyChecker will likely also catch this.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the required attributes have to be in the fragment's outputSet

I gave it some thought, and it does make sense.

assert that the size of output and the size of requiredAttrBuilder are the same

I have a question: my understanding is that requiredAttrBuilder should be a subset of output, meaning its size should be less than or equal to that of output. Why, then, are they the same size here? If their sizes are the same, then why bother maintaining requiredAttrBuilder? Wouldn't it be ok to just use output for execution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I confused outputSet with the new array list output.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I was wrong. There's a bug in the planning of remote enrich that can make it so that the output of the fragment doesn't contain an attribute, but it's actually physically there and the projection needs to include it, or a downstream TopN won't know what attribute to sort on. This is what happened in #118531.

@kanoshiou, I'll re-instate your initial approach (but still using output rather than outputSet) and I'll add a comment about the problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alex-spies!

// if all the fields are filtered out, it's only the count that matters
// however until a proper fix (see https://github.com/elastic/elasticsearch/issues/98703)
// add a synthetic field (so it doesn't clash with the user defined one) to return a constant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
Expand Down Expand Up @@ -193,6 +194,7 @@
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -658,7 +660,7 @@ public void testExtractorForField() {
var exchange = asRemoteExchange(topN.child());
var project = as(exchange.child(), ProjectExec.class);
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("salary", "emp_no", "last_name"));
assertThat(names(extract.attributesToExtract()), contains("emp_no", "last_name", "salary"));
var source = source(extract.child());
assertThat(source.limit(), is(topN.limit()));
assertThat(source.sorts(), is(fieldSorts(topN.order())));
Expand Down Expand Up @@ -2494,7 +2496,7 @@ public void testPushDownEvalSwapFilter() {
var extract = as(project.child(), FieldExtractExec.class);
assertThat(
extract.attributesToExtract().stream().map(Attribute::name).collect(Collectors.toList()),
contains("last_name", "first_name")
contains("first_name", "last_name")
);

// Now verify the correct Lucene push-down of both the filter and the sort
Expand Down Expand Up @@ -3176,6 +3178,55 @@ public void testProjectAwayAllColumnsWhenOnlyTheCountMattersInStats() {
assertThat(Expressions.names(esQuery.attrs()), contains("_doc"));
}

/**
* LimitExec[1000[INTEGER],336]
* \_MvExpandExec[foo_1{r}#3,foo_1{r}#20]
* \_TopNExec[[Order[emp_no{f}#9,ASC,LAST]],1000[INTEGER],336]
* \_ExchangeExec[[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, gender{f}#11, hire_date{f}#16, job{f}#17, job.raw{f}#18, la
* nguages{f}#12, last_name{f}#13, long_noidx{f}#19, salary{f}#14, foo_1{r}#3, foo_2{r}#5],false]
* \_ProjectExec[[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, gender{f}#11, hire_date{f}#16, job{f}#17, job.raw{f}#18, la
* nguages{f}#12, last_name{f}#13, long_noidx{f}#19, salary{f}#14, foo_1{r}#3, foo_2{r}#5]]
* \_FieldExtractExec[_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]&gt;[],[]&lt;
* \_EvalExec[[1[INTEGER] AS foo_1#3, 1[INTEGER] AS foo_2#5]]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#35], limit[1000], sort[[FieldSort[field=emp_no{f}#9,
* direction=ASC, nulls=LAST]]] estimatedRowSize[352]
*/
public void testProjectAwayMvExpandColumnOrder() {
    // Query under test: mv_expand applied after an eval that introduces two columns and a sort.
    String query = """
        from test
        | eval foo_1 = 1, foo_2 = 1
        | sort emp_no
        | mv_expand foo_1
        """;
    var optimized = optimizedPlan(physicalPlan(query));

    // Walk down the plan, checking the node type at each level.
    var limitExec = as(optimized, LimitExec.class);
    var mvExpandExec = as(limitExec.child(), MvExpandExec.class);
    var topNExec = as(mvExpandExec.child(), TopNExec.class);
    var exchangeExec = as(topNExec.child(), ExchangeExec.class);
    var projectExec = as(exchangeExec.child(), ProjectExec.class);

    // The projection must list these names in this relative order, with foo_1 ahead of foo_2.
    assertThat(
        Expressions.names(projectExec.projections()),
        containsInRelativeOrder(
            "_meta_field",
            "emp_no",
            "first_name",
            "gender",
            "hire_date",
            "job",
            "job.raw",
            "languages",
            "last_name",
            "long_noidx",
            "salary",
            "foo_1",
            "foo_2"
        )
    );

    // Check the node types beneath the projection; as(...) is called only for its type check.
    var fieldExtractExec = as(projectExec.child(), FieldExtractExec.class);
    var evalExec = as(fieldExtractExec.child(), EvalExec.class);
    as(evalExec.child(), EsQueryExec.class);
}

/**
* ProjectExec[[a{r}#5]]
* \_EvalExec[[__a_SUM@81823521{r}#15 / __a_COUNT@31645621{r}#16 AS a]]
Expand Down Expand Up @@ -5698,9 +5749,9 @@ public void testPushTopNKeywordToSource() {
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. The only reason these queries produced correct output was because there was a Project at the top that made the attribute order here irrelevant; but actually, we had changed it locally, and this shows that ProjectAwayColumns was never fully correct to begin with.

var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "location", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "location", "name"));
var source = source(extract.child());
assertThat(source.limit(), is(topN.limit()));
assertThat(source.sorts(), is(fieldSorts(topN.order())));
Expand Down Expand Up @@ -5740,9 +5791,9 @@ public void testPushTopNAliasedKeywordToSource() {
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "location", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "location", "name"));
var source = source(extract.child());
assertThat(source.limit(), is(topN.limit()));
assertThat(source.sorts(), is(fieldSorts(topN.order())));
Expand Down Expand Up @@ -5785,9 +5836,9 @@ public void testPushTopNDistanceToSource() {
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "distance"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var evalExec = as(extract.child(), EvalExec.class);
var alias = as(evalExec.fields().get(0), Alias.class);
assertThat(alias.name(), is("distance"));
Expand Down Expand Up @@ -5847,15 +5898,15 @@ public void testPushTopNInlineDistanceToSource() {
names(project.projections()),
contains(
equalTo("abbrev"),
equalTo("name"),
equalTo("location"),
equalTo("country"),
equalTo("city"),
equalTo("country"),
equalTo("location"),
equalTo("name"),
startsWith("$$order_by$0$")
)
);
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var evalExec = as(extract.child(), EvalExec.class);
var alias = as(evalExec.fields().get(0), Alias.class);
assertThat(alias.name(), startsWith("$$order_by$0$"));
Expand Down Expand Up @@ -5922,9 +5973,9 @@ public void testPushTopNDistanceWithFilterToSource() {
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "distance"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var evalExec = as(extract.child(), EvalExec.class);
var alias = as(evalExec.fields().get(0), Alias.class);
assertThat(alias.name(), is("distance"));
Expand Down Expand Up @@ -6019,9 +6070,9 @@ public void testPushTopNDistanceWithCompoundFilterToSource() {
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "distance"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var evalExec = as(extract.child(), EvalExec.class);
var alias = as(evalExec.fields().get(0), Alias.class);
assertThat(alias.name(), is("distance"));
Expand Down Expand Up @@ -6108,10 +6159,10 @@ public void testPushTopNDistanceAndPushableFieldWithCompoundFilterToSource() {
project = as(exchange.child(), ProjectExec.class);
assertThat(
names(project.projections()),
contains("abbrev", "name", "location", "country", "city", "scalerank", "scale", "distance", "loc")
contains("abbrev", "city", "country", "location", "name", "scalerank", "distance", "scale", "loc")
);
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var evalExec = as(extract.child(), EvalExec.class);
var alias = as(evalExec.fields().get(0), Alias.class);
assertThat(alias.name(), is("distance"));
Expand Down Expand Up @@ -6191,9 +6242,9 @@ public void testPushTopNDistanceAndNonPushableEvalWithCompoundFilterToSource() {
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "scalerank", "distance"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var topNChild = as(extract.child(), TopNExec.class);
extract = as(topNChild.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("scalerank"));
Expand Down Expand Up @@ -6267,9 +6318,9 @@ public void testPushTopNDistanceAndNonPushableEvalsWithCompoundFilterToSource()
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "distance", "scalerank"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var topNChild = as(extract.child(), TopNExec.class);
var filter = as(topNChild.child(), FilterExec.class);
assertThat(filter.condition(), isA(And.class));
Expand Down Expand Up @@ -6344,9 +6395,9 @@ public void testPushTopNDistanceWithCompoundFilterToSourceAndDisjunctiveNonPusha
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "scalerank", "distance"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name"));
var topNChild = as(extract.child(), TopNExec.class);
var filter = as(topNChild.child(), FilterExec.class);
assertThat(filter.condition(), isA(Or.class));
Expand Down Expand Up @@ -6413,9 +6464,9 @@ public void testPushCompoundTopNDistanceWithCompoundFilterToSource() {
var exchange = asRemoteExchange(topN.child());

project = as(exchange.child(), ProjectExec.class);
assertThat(names(project.projections()), contains("abbrev", "name", "location", "country", "city", "scalerank", "distance"));
assertThat(names(project.projections()), contains("abbrev", "city", "country", "location", "name", "scalerank", "distance"));
var extract = as(project.child(), FieldExtractExec.class);
assertThat(names(extract.attributesToExtract()), contains("abbrev", "name", "country", "city", "scalerank"));
assertThat(names(extract.attributesToExtract()), contains("abbrev", "city", "country", "name", "scalerank"));
var evalExec = as(extract.child(), EvalExec.class);
var alias = as(evalExec.fields().get(0), Alias.class);
assertThat(alias.name(), is("distance"));
Expand Down
Loading