Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V4.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V5.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that it's probably a good idea to add tests for INLINESTATS ... BY x = bucket(...) and INLINESTATS ... BY x = CATEGORIZE(...). I think the latter cannot work because the join key in BY x = CATEGORIZE(...) is computed during the aggregation, whereas INLINESTATS requires the join key to be present before that.

Cc @jan-elastic , I think we'll have to start out with a limitation where INLINESTATS can't use CATEGORIZE, at least at first; to enable this, I think we'd somehow have to grab the categorizer from the first phase of the query (which computes the STATS) and make it available to the second phase of the query (which performs the joining with every row we see).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I can live with the fact that the first version of INLINESTATS doesn't work with CATEGORIZE.

Just open a GitHub issue for that and it can be resolved later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #124717

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//

maxOfInt
required_capability: inlinestats_v4
required_capability: inlinestats_v5
// tag::max-languages[]
FROM employees
| KEEP emp_no, languages
Expand All @@ -25,7 +25,7 @@ emp_no:integer | languages:integer | max_lang:integer
;

maxOfIntByKeyword
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, languages, gender
Expand All @@ -43,7 +43,7 @@ emp_no:integer | languages:integer | max_lang:integer | gender:keyword
;

maxOfLongByKeyword
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, avg_worked_seconds, gender
Expand All @@ -58,7 +58,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | gender:
;

maxOfLong
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, avg_worked_seconds, gender
Expand All @@ -71,7 +71,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon
;

maxOfLongByCalculatedKeyword
required_capability: inlinestats_v4
required_capability: inlinestats_v5

// tag::longest-tenured-by-first[]
FROM employees
Expand All @@ -94,7 +94,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
;

maxOfLongByCalculatedNamedKeyword
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, avg_worked_seconds, last_name
Expand All @@ -113,7 +113,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
;

maxOfLongByCalculatedDroppedKeyword
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1)
Expand All @@ -132,7 +132,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
;

maxOfLongByEvaledKeyword
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| EVAL l = SUBSTRING(last_name, 0, 1)
Expand All @@ -152,7 +152,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | l:keywo
;

maxOfLongByInt
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, avg_worked_seconds, languages
Expand All @@ -170,7 +170,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag
;

maxOfLongByIntDouble
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, avg_worked_seconds, languages, height
Expand Down Expand Up @@ -244,7 +244,7 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
;

byMvExpand
required_capability: inlinestats_v4
required_capability: inlinestats_v5

// tag::extreme-airports[]
FROM airports
Expand Down Expand Up @@ -308,7 +308,7 @@ count:long | country:keyword | avg:double
;

afterWhere
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM airports
| WHERE country != "United States"
Expand Down Expand Up @@ -367,7 +367,7 @@ abbrev:keyword | city:keyword | region:text | "COUNT(*)":long
;

beforeStats
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM airports
| EVAL lat = ST_Y(location)
Expand All @@ -380,7 +380,7 @@ northern:long | southern:long
;

beforeKeepSort
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| INLINESTATS max_salary = MAX(salary) by languages
Expand Down Expand Up @@ -501,7 +501,7 @@ Zürich | Zürich
;

byConstant
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, languages
Expand All @@ -520,7 +520,7 @@ emp_no:integer | languages:integer | max_lang:integer | y:integer
;

aggConstant
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no
Expand All @@ -538,7 +538,7 @@ one:integer | emp_no:integer
;

percentile
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, salary
Expand All @@ -557,7 +557,7 @@ emp_no:integer | salary:integer | ninety_fifth_salary:double
;

byTwoCalculated
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM airports
| WHERE abbrev IS NOT NULL
Expand Down Expand Up @@ -642,7 +642,7 @@ abbrev:keyword | scalerank:integer | location:geo_point
;

groupShadowsField
required_capability: inlinestats_v4
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, salary, hire_date
Expand All @@ -661,7 +661,7 @@ emp_no:integer | salary:integer | avg_salary:double | hire_date:datetime
;

groupByExpression_And_ExistentField
required_capability: inlinestats_v4
required_capability: inlinestats_v5
FROM employees
| KEEP emp_no, languages, gender
| EVAL x = "ABC"
Expand All @@ -678,8 +678,8 @@ emp_no:integer | languages:integer | x:keyword | max_lang:integer | y:keyword |
10005 |1 |ABC |5 |abc |M
;

groupByRenamedColumn-Ignore
required_capability: inlinestats_v4
groupByRenamedColumn
required_capability: inlinestats_v5
FROM employees
| KEEP emp_no, languages, gender
| INLINESTATS max_lang = MAX(languages) BY y = gender
Expand All @@ -695,3 +695,115 @@ emp_no:integer | languages:integer | gender:keyword | max_lang:integer | y:keywo
10012 | 5 | null | 5 | null
10014 | 5 | null | 5 | null
;

Copy link
Contributor Author

@astefan astefan Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the following tests fail with the same error. This is likely due to something that @alex-spies caught in a previous review, meaning too much is sent to the data nodes for processing, instead of being done on the coordinator. This is also tied to scenarios where multiple inlinestats commands are used in a query.

// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
groupByMultipleRenamedColumns_AndOneExpression_Last-Ignore

FROM employees
| KEEP emp_no, languages, gender, first_name
| INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = left(first_name,1)
| LIMIT 10
;

emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:integer| y:keyword | l:integer |f:keyword
10001 |2 |M |Georgi |2 |M |2 |G
10002 |5 |F |Bezalel |5 |F |5 |B
10003 |4 |M |Parto |4 |M |4 |P
10004 |5 |M |Chirstian |5 |M |5 |C
10005 |1 |M |Kyoichi |1 |M |1 |K
10006 |3 |F |Anneke |3 |F |3 |A
10007 |4 |F |Tzvetan |4 |F |4 |T
10008 |2 |M |Saniya |2 |M |2 |S
10009 |1 |F |Sumant |1 |F |1 |S
10010 |4 |null |Duangkaew |4 |null |4 |D
;

// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
groupByMultipleRenamedColumns_AndTwoExpressions-Ignore

FROM employees
| KEEP emp_no, languages, gender, first_name
| INLINESTATS max_lang = MAX(languages) BY f1 = left(first_name, 1), y = gender, f2 = left(first_name, 1), l = languages
| LIMIT 10
;

emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:integer| f1:keyword | y:keyword | f2:keyword |l:integer
10001 |2 |M |Georgi |2 |G |M |G |2
10002 |5 |F |Bezalel |5 |B |F |B |5
10003 |4 |M |Parto |4 |P |M |P |4
10004 |5 |M |Chirstian |5 |C |M |C |5
10005 |1 |M |Kyoichi |1 |K |M |K |1
10006 |3 |F |Anneke |3 |A |F |A |3
10007 |4 |F |Tzvetan |4 |T |F |T |4
10008 |2 |M |Saniya |2 |S |M |S |2
10009 |1 |F |Sumant |1 |S |F |S |1
10010 |4 |null |Duangkaew |4 |D |null |D |4
;

// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
groupByMultipleRenamedColumns_AndMultipleRenames-Ignore

FROM employees
| KEEP emp_no, languages, gender, first_name
| RENAME first_name as f
| INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, first_name = left(f, 1)
| LIMIT 10
;

emp_no:integer | languages:integer | gender:keyword| f:keyword |max_lang:integer| y:keyword | l:integer |first_name:keyword
10001 |2 |M |Georgi |2 |M |2 |G
10002 |5 |F |Bezalel |5 |F |5 |B
10003 |4 |M |Parto |4 |M |4 |P
10004 |5 |M |Chirstian |5 |M |5 |C
10005 |1 |M |Kyoichi |1 |M |1 |K
10006 |3 |F |Anneke |3 |F |3 |A
10007 |4 |F |Tzvetan |4 |F |4 |T
10008 |2 |M |Saniya |2 |M |2 |S
10009 |1 |F |Sumant |1 |F |1 |S
10010 |4 |null |Duangkaew |4 |null |4 |D
;

// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
groupByMultipleRenamedColumns_AndSameNameExpressionGroupingOverride-Ignore

FROM employees
| KEEP emp_no, languages, gender, first_name
| RENAME first_name as f
| INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = left(f, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subtle semantic question:

The implementation assumes that expressions in the BY clause take effect in the main branch of the join as well. I.e. INLINESTATS ... BY ..., f = left(f, 1) means that the attribute f in the left hand side gets overwritten before being aggregated by/joined on.

But there's a different way to understand this, and it's actually useful: INLINESTATS ... BY ..., f = left(f, 1) could mean that f is overwritten only on the right hand side where the aggregation happens, and then is matched up with the left hand side.

I don't think we want the latter to be the semantics here. But that could be useful in case that e.g. the left hand side has a counter field that indicates the current row (or similar) - then INLINESTATS ... BY counter = counter + 1 would actually shift the stats by 1 row.

| LIMIT 10
;

emp_no:integer | languages:integer | gender:keyword|max_lang:integer| y:keyword | l:integer |f:keyword
10001 |2 |M |2 |M |2 |G
10002 |5 |F |5 |F |5 |B
10003 |4 |M |4 |M |4 |P
10004 |5 |M |5 |M |5 |C
10005 |1 |M |1 |M |1 |K
10006 |3 |F |3 |F |3 |A
10007 |4 |F |4 |F |4 |T
10008 |2 |M |2 |M |2 |S
10009 |1 |F |1 |F |1 |S
10010 |4 |null |4 |null |4 |D
;

twoAggregatesGroupedBy_AField_And_AnExpression
required_capability: inlinestats_v5

FROM employees
| KEEP emp_no, languages, gender, last_name
| WHERE gender IS NOT NULL
| INLINESTATS max_lang = MAX(languages), min_lang = MIN(languages) BY f = left(last_name, 1), gender
| SORT last_name DESC
| LIMIT 8
;

emp_no:integer |languages:integer|last_name:keyword|max_lang:integer|min_lang:integer| f:keyword | gender:keyword
10053 |3 |Zschoche |4 |3 |Z |F
10083 |1 |Zockler |1 |1 |Z |M
10007 |4 |Zielinski |4 |3 |Z |F
10097 |3 |Waschkowski |3 |3 |W |M
10020 |null |Warwick |3 |3 |W |M
10043 |1 |Tzvieli |1 |1 |T |M
10049 |5 |Tramer |5 |5 |T |F
10028 |null |Tempesti |1 |1 |T |M
;
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ public enum Cap {
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
* were refactored.
*/
INLINESTATS_V4(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
INLINESTATS_V5(EsqlPlugin.INLINESTATS_FEATURE_FLAG),

/**
* Support partial_results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ protected static Batch<LogicalPlan> substitutions() {
new ReplaceAggregateNestedExpressionWithEval(),
new ReplaceRegexMatch(),
new ReplaceTrivialTypeConversions(),
new ReplaceOrderByExpressionWithEval(),
new PropagateInlineEvals(),
new ReplaceAliasingEvalWithProject(),
new SkipQueryOnEmptyMappings(),
new SubstituteSpatialSurrogates(),
new ReplaceOrderByExpressionWithEval(),
new PropagateInlineEvals()
new SubstituteSpatialSurrogates()
// new NormalizeAggregate(), - waits on https://github.com/elastic/elasticsearch/issues/100634
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ protected LogicalPlan rule(InlineJoin plan) {
// check if there's any grouping that uses a reference on the right side
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a small cleanup here.

// if so, look for the source until finding a StubReference
// then copy those on the left side as well

LogicalPlan left = plan.left();
LogicalPlan right = plan.right();

Expand All @@ -46,7 +45,6 @@ protected LogicalPlan rule(InlineJoin plan) {
// first checks any aggregate that declares expressions inside the grouping
// second that checks any found references to collect their declaration
right = right.transformDown(p -> {

if (p instanceof Aggregate aggregate) {
// collect references
for (Expression g : aggregate.groupings()) {
Expand All @@ -56,6 +54,10 @@ protected LogicalPlan rule(InlineJoin plan) {
}
}

if (groupingRefs.isEmpty()) {
return p;
}

// find their declaration and remove it
// TODO: this doesn't take into account aliasing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment may be stale now.

if (p instanceof Eval eval) {
Expand Down