From 0741bbb3c4ba83a33aa5c85f6318716656201019 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Mon, 26 May 2025 18:31:28 +0300
Subject: [PATCH 01/20] Adopt a "LogicalPlan" approach to running multiple
sub-queries, coordinated from the EsqlSession.
---
.../xpack/esql/ccq/MultiClusterSpecIT.java | 4 +-
.../src/main/resources/inlinestats.csv-spec | 189 ++++++++++++------
.../xpack/esql/action/EsqlCapabilities.java | 2 +-
.../esql/plan/logical/join/InlineJoin.java | 20 +-
.../esql/plan/physical/HashJoinExec.java | 13 +-
.../xpack/esql/planner/mapper/Mapper.java | 8 +-
.../xpack/esql/session/EsqlSession.java | 85 ++++----
.../logical/PropagateInlineEvalsTests.java | 4 +-
8 files changed, 186 insertions(+), 139 deletions(-)
diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
index 9c842c4cf1781..9671d158912bc 100644
--- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
+++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
@@ -49,7 +49,7 @@
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V5;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
-import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7;
+import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V8;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
@@ -128,7 +128,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
- assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V7.capabilityName()));
+ assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V8.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 30ed34d9ec611..576125cb96419 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -3,7 +3,7 @@
//
allFieldsReturned
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM hosts METADATA _index
| INLINESTATS c = COUNT(*) BY host_group
@@ -16,7 +16,7 @@ eth0 |epsilon gw instance|epsilon |[fe80::cae2:65ff:fece:feb9,
;
maxOfInt
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
// tag::max-languages[]
FROM employees
| KEEP emp_no, languages
@@ -38,7 +38,7 @@ emp_no:integer | languages:integer | max_lang:integer
;
maxOfIntByKeyword
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender
@@ -56,7 +56,7 @@ emp_no:integer | languages:integer | max_lang:integer | gender:keyword
;
maxOfLongByKeyword
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, avg_worked_seconds, gender
@@ -71,7 +71,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | gender:
;
maxOfLong
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, avg_worked_seconds, gender
@@ -84,7 +84,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon
;
maxOfLongByCalculatedKeyword
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
// tag::longest-tenured-by-first[]
FROM employees
@@ -107,7 +107,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
;
maxOfLongByCalculatedNamedKeyword
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, avg_worked_seconds, last_name
@@ -126,7 +126,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
;
maxOfLongByCalculatedDroppedKeyword
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| INLINESTATS max_avg_worked_seconds = MAX(avg_worked_seconds) BY l = SUBSTRING(last_name, 0, 1)
@@ -145,7 +145,7 @@ emp_no:integer | avg_worked_seconds:long | last_name:keyword | max_avg_worked_se
;
maxOfLongByEvaledKeyword
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| EVAL l = SUBSTRING(last_name, 0, 1)
@@ -165,7 +165,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | l:keywo
;
maxOfLongByInt
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, avg_worked_seconds, languages
@@ -183,7 +183,7 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag
;
maxOfLongByIntDouble
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, avg_worked_seconds, languages, height
@@ -200,9 +200,9 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag
10086 | 328580163 | 328580163 | 1 | 1.7
;
-
-two-Ignore
+two
required_capability: join_planning_v1
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, avg_worked_seconds, gender
@@ -212,14 +212,41 @@ FROM employees
| SORT emp_no ASC
| LIMIT 3;
-emp_no:integer | languages:integer | avg_worked_seconds:long | gender:keyword | avg_avg_worked_seconds:double | max_languages:integer
- 10002 | 5 | 328922887 | F | 3.133013149047619E8 | 5
- 10006 | 3 | 372957040 | F | 2.978159518235294E8 | 5
- 10007 | 4 | 393084805 | F | 2.863684210555556E8 | 5
+emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:integer|max_languages:integer|gender:keyword
+10002 |328922887 |3.133013149047619E8 |5 |5 |F
+10006 |372957040 |2.978159518235294E8 |3 |5 |F
+10007 |393084805 |2.863684210555556E8 |4 |5 |F
+;
+
+three-Ignore
+required_capability: inlinestats_v8
+// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
+
+FROM employees
+| KEEP emp_no, languages, avg_worked_seconds, gender
+| INLINESTATS avg_avg_worked_seconds = AVG(avg_worked_seconds) BY languages
+| WHERE avg_worked_seconds > avg_avg_worked_seconds
+| INLINESTATS max_languages = MAX(languages) BY gender
+| SORT emp_no ASC
+| LIMIT 10
+| INLINESTATS min(languages), max(languages) by gender
+;
+
+emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:integer|max_languages:integer|min(languages):integer|max(languages):integer|gender:keyword
+10002 |328922887 |3.133013149047619E8 |5 |5 |3 |5 |F
+10006 |372957040 |2.978159518235294E8 |3 |5 |3 |5 |F
+10007 |393084805 |2.863684210555556E8 |4 |5 |3 |5 |F
+10010 |315236372 |2.863684210555556E8 |4 |5 |1 |5 |null
+10012 |365510850 |3.133013149047619E8 |5 |5 |1 |5 |null
+10015 |390266432 |3.133013149047619E8 |5 |5 |1 |5 |null
+10018 |309604079 |3.0318626831578946E8 |2 |5 |1 |5 |null
+10019 |342855721 |2.94833632E8 |1 |5 |1 |5 |null
+10020 |373309605 |3.181719481E8 |null |5 |null |null |M
+10023 |330870342 |3.181719481E8 |null |5 |3 |5 |F
;
byMultivaluedSimple
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
// tag::mv-group[]
FROM airports
@@ -237,7 +264,7 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
;
byMultivaluedMvExpand
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
// tag::mv-expand[]
FROM airports
@@ -257,7 +284,7 @@ GWL |9 |4 |military
;
byMvExpand
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
// tag::extreme-airports[]
FROM airports
@@ -299,6 +326,10 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
afterStats-Ignore
required_capability: join_planning_v1
+// fails with
+// line 1:104: Plan [ProjectExec[[count{r}#174, country{f}#182, avg{r}#177]]] optimized incorrectly due to missing references [count{r}#174]
+// line 1:104: Plan [TopNExec[[Order[count{r}#174,DESC,FIRST], Order[country{f}#182,ASC,LAST]],1000[INTEGER],null]] optimized incorrectly due to missing references [count{r}#174]
+// line 1:80: Plan [FilterExec[count{r}#174 > 13.365[DOUBLE]]] optimized incorrectly due to missing references [count{r}#174]
FROM airports
| STATS count=COUNT(*) BY country
@@ -321,7 +352,7 @@ count:long | country:keyword | avg:double
;
afterWhere
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM airports
| WHERE country != "United States"
@@ -338,30 +369,34 @@ abbrev:keyword | country:keyword | count:long
BDQ | India | 50
;
-afterLookup-Ignore
+afterLookup
required_capability: join_planning_v1
+required_capability: inlinestats_v8
+required_capability: join_lookup_v12
FROM airports
-| RENAME scalerank AS int
-| LOOKUP int_number_names ON int
-| RENAME name as scalerank
-| DROP int
+| EVAL backup_scalerank = scalerank
+| RENAME scalerank AS language_code
+| LOOKUP JOIN languages_lookup ON language_code
+| RENAME language_name as scalerank
+| DROP language_code
| INLINESTATS count=COUNT(*) BY scalerank
-| SORT abbrev ASC
-| KEEP abbrev, scalerank
-| LIMIT 4
+| SORT abbrev DESC
+| KEEP abbrev, *scalerank
+| LIMIT 5
;
-abbrev:keyword | scalerank:keyword
- ABJ | four
- ABQ | six
- ABV | five
- ACA | four
+abbrev:keyword |backup_scalerank:integer| scalerank:keyword
+null |8 |null
+null |8 |null
+null |8 |null
+ZRH |3 |Spanish
+ZNZ |4 |German
;
afterEnrich
required_capability: join_planning_v1
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
required_capability: enrich_load
FROM airports
@@ -382,7 +417,7 @@ abbrev:keyword | city:keyword | "COUNT(*)":long | region:text
;
beforeStats
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM airports
| EVAL lat = ST_Y(location)
@@ -395,7 +430,7 @@ northern:long | southern:long
;
beforeKeepSort
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| INLINESTATS max_salary = MAX(salary) by languages
@@ -410,7 +445,7 @@ emp_no:integer | languages:integer | max_salary:integer
;
beforeKeepWhere
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| INLINESTATS max_salary = MAX(salary) by languages
@@ -424,7 +459,7 @@ emp_no:integer | languages:integer | max_salary:integer
beforeEnrich
required_capability: join_planning_v1
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
required_capability: enrich_load
FROM airports
@@ -442,9 +477,10 @@ ABV |Abuja |385 |major |Municipal Are
ACA |Acapulco de Juárez|385 |major |Acapulco de Juárez
;
-beforeAndAfterEnrich-Ignore
+beforeAndAfterEnrich
required_capability: join_planning_v1
-// does not fail, but it returns null for all values of count_region
+required_capability: inlinestats_v8
+required_capability: enrich_load
FROM airports
| KEEP abbrev, type, city
@@ -453,13 +489,16 @@ FROM airports
| WHERE MV_COUNT(region) == 1
| INLINESTATS count_region=COUNT(*) BY region
| SORT abbrev ASC
-| LIMIT 3
+| WHERE STARTS_WITH(abbrev, "AL")
+| LIMIT 5
;
-abbrev:keyword | city:keyword |"COUNT(*)":long | type:keyword | count_region:long | region:text
-ABJ |Abidjan |499 |mid | 1 |Abidjan
-ABV |Abuja |385 |major | 1 |Municipal Area Council
-ACA |Acapulco de Juárez|385 |major | 1 |Acapulco de Juárez
+abbrev:keyword | city:keyword |"COUNT(*)":long| type:keyword | count_region:long | region:text
+ALA |Almaty |385 |major |2 |Жетісу ауданы
+ALB |Colonie |499 |mid |1 |Town of Colonie
+ALC |Alicante |385 |major |1 |Alacant / Alicante
+ALG |Algiers |385 |major |1 |Alger
+ALL |Albenga |499 |mid |1 |Albenga
;
shadowing-Ignore
@@ -518,7 +557,7 @@ Zürich | Zürich
;
byConstant
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages
@@ -537,7 +576,7 @@ emp_no:integer | languages:integer | max_lang:integer | y:integer
;
aggConstant
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no
@@ -555,7 +594,7 @@ one:integer | emp_no:integer
;
percentile
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, salary
@@ -574,7 +613,7 @@ emp_no:integer | salary:integer | ninety_fifth_salary:double
;
byTwoCalculated
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM airports
| WHERE abbrev IS NOT NULL
@@ -595,6 +634,8 @@ abbrev:keyword | scalerank:integer | location:geo_point
byTwoCalculatedSecondOverwrites-Ignore
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
+// fails with
+// line 1:78: Plan [InlineJoin[LEFT,[x{r}#20, x{r}#23],[x{r}#20, x{r}#23],[x{r}#23, x{r}#23]]] optimized incorrectly due to missing references from left hand side [x{r}#20]
FROM airports
| WHERE abbrev IS NOT NULL
@@ -616,6 +657,9 @@ abbrev:keyword | scalerank:integer | location:geo_point
byTwoCalculatedSecondOverwritesReferencingFirst-Ignore
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
+// fails with
+// line 1:105: Plan [Aggregate[[x{r}#45],[MIN(scalerank{f}#50,true[BOOLEAN]) AS min_sl, x{r}#45]]] optimized incorrectly due to missing references [x{r}#45]
+// line 1:105: Plan [InlineJoin[LEFT,[x{r}#42, x{r}#45],[x{r}#42, x{r}#45],[x{r}#45, x{r}#45]]] optimized incorrectly due to missing references from left hand side [x{r}#42]
FROM airports
| WHERE abbrev IS NOT NULL
@@ -639,6 +683,8 @@ abbrev:keyword | scalerank:integer | location:geo_point
groupShadowsAgg-Ignore
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
+// fails with
+// line 1:134: column [location] cannot be used as an aggregate once declared in the STATS BY grouping key [lat_10 = ROUND(ST_Y(location), -1)]
FROM airports
| WHERE abbrev IS NOT NULL
@@ -659,7 +705,7 @@ abbrev:keyword | scalerank:integer | location:geo_point
;
groupShadowsField
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, salary, hire_date
@@ -678,7 +724,7 @@ emp_no:integer | salary:integer | avg_salary:double | hire_date:datetime
;
groupByExpression_And_ExistentField
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender
| EVAL x = "ABC"
@@ -696,7 +742,7 @@ emp_no:integer | languages:integer | x:keyword | max_lang:integer | y:keyword |
;
groupByRenamedColumn
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender
| INLINESTATS max_lang = MAX(languages) BY y = gender
@@ -714,11 +760,13 @@ emp_no:integer | languages:integer | gender:keyword | max_lang:integer | y:keywo
;
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
-groupByMultipleRenamedColumns_AndOneExpression_Last-Ignore
+groupByMultipleRenamedColumns_AndOneExpression_Last
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender, first_name
| INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = left(first_name,1)
+| SORT emp_no
| LIMIT 10
;
@@ -736,11 +784,13 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:
;
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
-groupByMultipleRenamedColumns_AndTwoExpressions-Ignore
+groupByMultipleRenamedColumns_AndTwoExpressions
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender, first_name
| INLINESTATS max_lang = MAX(languages) BY f1 = left(first_name, 1), y = gender, f2 = left(first_name, 1), l = languages
+| SORT emp_no
| LIMIT 10
;
@@ -758,12 +808,14 @@ emp_no:integer | languages:integer | gender:keyword|first_name:keyword|max_lang:
;
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
-groupByMultipleRenamedColumns_AndMultipleRenames-Ignore
+groupByMultipleRenamedColumns_AndMultipleRenames
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender, first_name
| RENAME first_name as f
| INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, first_name = left(f, 1)
+| SORT emp_no
| LIMIT 10
;
@@ -781,12 +833,14 @@ emp_no:integer | languages:integer | gender:keyword| f:keyword |max_lang:
;
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
-groupByMultipleRenamedColumns_AndSameNameExpressionGroupingOverride-Ignore
+groupByMultipleRenamedColumns_AndSameNameExpressionGroupingOverride
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender, first_name
| RENAME first_name as f
| INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = left(f, 1)
+| SORT emp_no
| LIMIT 10
;
@@ -804,7 +858,7 @@ emp_no:integer | languages:integer | gender:keyword|max_lang:integer| y:keyword
;
twoAggregatesGroupedBy_AField_And_AnExpression
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender, last_name
@@ -826,7 +880,7 @@ emp_no:integer |languages:integer|last_name:keyword|max_lang:integer|min_lang:in
;
groupByMultipleRenamedColumns_InversedOrder
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, still_hired, gender
@@ -844,7 +898,7 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|max_lang:i
;
groupByMultipleRenamedColumns_InversedOrder_ComplexEval
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, still_hired, gender
@@ -863,7 +917,7 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|multilingu
;
groupByMultipleRenamedColumns_AndComplexEval
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, still_hired, gender
@@ -882,12 +936,14 @@ emp_no:integer |languages:integer|still_hired:boolean| gender:keyword|multilingu
;
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
-groupByMultipleRenamedColumns_AndConstantValue-Ignore
+groupByMultipleRenamedColumns_AndConstantValue
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender, first_name
| EVAL x = "ABC"
| INLINESTATS max_lang = MAX(languages) BY y = gender, l = languages, f = to_lower(x)
+| SORT emp_no
| LIMIT 10
;
@@ -905,7 +961,7 @@ emp_no:integer |languages:integer|gender:keyword |first_name:keyword | x:keyw
;
groupByRenamedExpression
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| KEEP emp_no, languages, gender, last_name
@@ -927,7 +983,7 @@ emp_no:integer |languages:integer|last_name:keyword|max_lang:integer|min_lang:in
;
doubleFilterOnLeftAndRight_InlineStats_Sides
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages
@@ -948,7 +1004,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary:
;
filterOnInlineStatsAggs
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages
@@ -967,7 +1023,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary:
;
filterOnInlineStatsAggsValues_And_Groupings
-required_capability: inlinestats_v7
+required_capability: inlinestats_v8
FROM employees
| INLINESTATS max_salary = MAX(salary), min_salary = MIN(salary) by languages
@@ -989,6 +1045,7 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary:
// line 1:101: Plan [FieldExtractExec[description{f}#274, host{f}#272, ip0{f}#273, _index..]<[],[]>] optimized incorrectly due to duplicate output attribute ip1{r}#267
// line 1:101: Plan [TopNExec[[Order[ip1{r}#267,ASC,LAST]],1[INTEGER],null]] optimized incorrectly due to duplicate output attribute ip1{r}#267
// line 1:44: Plan [HashJoinExec[[host_group{f}#271, card{f}#276],[host_group{f}#271, card{f}#276],[host_group{r}#271, card{r}#276],[ip1{r}#267]]] optimized incorrectly due to duplicate output attribute ip1{r}#267
+// https://github.com/elastic/elasticsearch/issues/128311
inlineStatsOverrideEVALed_FieldWithSameName-Ignore
FROM hosts METADATA _index
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 1687189d92c01..243093e535eac 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -889,7 +889,7 @@ public enum Cap {
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
* were refactored.
*/
- INLINESTATS_V7(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
+ INLINESTATS_V8(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
/**
* Support partial_results
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
index 8ce476120abcf..1ea733bd3d1b4 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
@@ -139,20 +139,12 @@ public List computeOutput(List left, List right
JoinType joinType = config().type();
List output;
if (LEFT.equals(joinType)) {
- AttributeSet rightFields = AttributeSet.of(config().rightFields());
- List leftOutputWithoutMatchFields = new ArrayList<>();
- // at this point "left" part of the join contains all the attributes that represent the input of the join
- // including any aliasing (evals) of expressions used as grouping attributes (or join "match fields") in the join itself
- for (Attribute attr : left().output()) {
- if (rightFields.contains(attr) == false) {
- // the aforementioned groupings expressions or aliasing are removed from the left set of attributes
- leftOutputWithoutMatchFields.add(attr);
- }
- }
- // the actual output of the join will place the left hand side attributes (excluding any aliasing of the groupings)
- // as first columns in the output followed by whatever the right hand side of join adds in this order: aggregates first,
- // followed by groupings (this order should be preserved inside the rightFields() output)
- output = mergeOutputAttributes(right, leftOutputWithoutMatchFields);
+ List leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList();
+ List rightWithAppendedKeys = new ArrayList<>(right);
+ rightWithAppendedKeys.removeAll(config().rightFields());
+ rightWithAppendedKeys.addAll(config().leftFields());
+
+ output = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys);
} else {
throw new IllegalArgumentException(joinType.joinName() + " unsupported");
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java
index 3818b4e5a4c32..55a34af436f8a 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/HashJoinExec.java
@@ -23,6 +23,8 @@
import java.util.Objects;
import java.util.Set;
+import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
+
public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
PhysicalPlan.class,
@@ -107,11 +109,12 @@ public PhysicalPlan estimateRowSize(State state) {
@Override
public List output() {
if (lazyOutput == null) {
- lazyOutput = new ArrayList<>(left().output());
- var rightFieldNames = rightFields.stream().map(Attribute::name).toList();
- lazyOutput.removeIf(a -> rightFieldNames.contains(a.name()));
- lazyOutput.addAll(addedFields);
- lazyOutput.addAll(rightFields);
+ List leftOutputWithoutKeys = left().output().stream().filter(attr -> leftFields.contains(attr) == false).toList();
+ List rightWithAppendedKeys = new ArrayList<>(right().output());
+ rightWithAppendedKeys.removeAll(rightFields);
+ rightWithAppendedKeys.addAll(leftFields);
+
+ lazyOutput = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys);
}
return lazyOutput;
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
index 5586140d809ea..81994ea1f5c7a 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
@@ -207,14 +207,10 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]");
}
- if (join instanceof InlineJoin) {
- return new FragmentExec(bp);
- }
-
PhysicalPlan left = map(bp.left());
// only broadcast joins supported for now - hence push down as a streaming operator
- if (left instanceof FragmentExec fragment) {
+ if (left instanceof FragmentExec) {
return new FragmentExec(bp);
}
@@ -228,7 +224,7 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
config.matchFields(),
config.leftFields(),
config.rightFields(),
- join.output()
+ join.rightOutputFields()
);
}
if (right instanceof FragmentExec fragment
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index ef9d556cc27e0..248635fdab852 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -84,6 +84,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
+import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -94,10 +95,8 @@
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -206,85 +205,85 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener listener
) {
- PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// TODO: this could be snuck into the underlying listener
EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
// execute any potential subplans
- executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
+ executeSubPlans(optimizedPlan, planRunner, executionInfo, request, listener);
}
- private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {}
+ private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {}
private void executeSubPlans(
- PhysicalPlan physicalPlan,
+ LogicalPlan optimizedPlan,
PlanRunner runner,
EsqlExecutionInfo executionInfo,
EsqlQueryRequest request,
ActionListener listener
) {
- List subplans = new ArrayList<>();
-
- // Currently the inlinestats are limited and supported as streaming operators, thus present inside the fragment as logical plans
- // Below they get collected, translated into a separate, coordinator based plan and the results 'broadcasted' as a local relation
- physicalPlan.forEachUp(FragmentExec.class, f -> {
- f.fragment().forEachUp(InlineJoin.class, ij -> {
- // extract the right side of the plan and replace its source
- LogicalPlan subplan = InlineJoin.replaceStub(ij.left(), ij.right());
- // mark the new root node as optimized
- subplan.setOptimized();
- PhysicalPlan subqueryPlan = logicalPlanToPhysicalPlan(subplan, request);
- subplans.add(new PlanTuple(subqueryPlan, ij.right()));
- });
- });
-
- Iterator iterator = subplans.iterator();
+ var subPlan = firstSubPlan(optimizedPlan);
// TODO: merge into one method
- if (subplans.size() > 0) {
+ if (subPlan != null) {
// code-path to execute subplans
- executeSubPlan(new DriverCompletionInfo.Accumulator(), physicalPlan, iterator, executionInfo, runner, listener);
+ executeSubPlan(new DriverCompletionInfo.Accumulator(), optimizedPlan, subPlan, executionInfo, runner, request, listener);
} else {
+ PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// execute main plan
runner.run(physicalPlan, listener);
}
}
+ private LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
+ Holder subPlan = new Holder<>();
+ // Collect the first inlinejoin (bottom up in the tree or, viewing from the user-friendly query pov, the closest to ES source
+ // inlinestats command)
+ optimizedPlan.forEachUp(InlineJoin.class, ij -> {
+ // extract the right side of the plan and replace its source
+ if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {
+ var p = InlineJoin.replaceStub(ij.left(), ij.right());
+ p.setOptimized();
+ subPlan.set(new LogicalPlanTuple(p, ij.right()));
+ }
+ });
+ return subPlan.get();
+ }
+
private void executeSubPlan(
DriverCompletionInfo.Accumulator completionInfoAccumulator,
- PhysicalPlan plan,
- Iterator subPlanIterator,
+ LogicalPlan optimizedPlan,
+ LogicalPlanTuple subPlans,
EsqlExecutionInfo executionInfo,
PlanRunner runner,
+ EsqlQueryRequest request,
ActionListener listener
) {
- PlanTuple tuple = subPlanIterator.next();
+ // Create a physical plan out of the logical sub-plan
+ var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.nonStubbedSubPlan, request);
- runner.run(tuple.physical, listener.delegateFailureAndWrap((next, result) -> {
+ runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> {
try {
+ // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
completionInfoAccumulator.accumulate(result.completionInfo());
- LocalRelation resultWrapper = resultToPlan(tuple.logical, result);
+ LocalRelation resultWrapper = resultToPlan(subPlans.nonStubbedSubPlan, result);
// replace the original logical plan with the backing result
- PhysicalPlan newPlan = plan.transformUp(FragmentExec.class, f -> {
- LogicalPlan frag = f.fragment();
- return f.withFragment(
- frag.transformUp(
- InlineJoin.class,
- ij -> ij.right() == tuple.logical ? InlineJoin.inlineData(ij, resultWrapper) : ij
- )
- );
- });
+ LogicalPlan newLogicalPlan = optimizedPlan.transformUp(InlineJoin.class, ij ->
+ ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
+ );
+ newLogicalPlan.setOptimized();
+ // look for the next inlinejoin plan
+ var newSubPlan = firstSubPlan(newLogicalPlan);
- if (subPlanIterator.hasNext() == false) {
- runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
+ if (newSubPlan == null) {// run the final "main" plan
+ var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
+ runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
completionInfoAccumulator.accumulate(finalResult.completionInfo());
finalListener.onResponse(
new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo)
);
}));
- } else {
- // continue executing the subplans
- executeSubPlan(completionInfoAccumulator, newPlan, subPlanIterator, executionInfo, runner, next);
+ } else {// continue executing the subplans
+ executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener);
}
} finally {
Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java
index 088b5c1c9205e..148cfd7ca4fa6 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvalsTests.java
@@ -83,7 +83,7 @@ public static void init() {
* \_StubRelation[[emp_no{f}#11, languages{f}#14, gender{f}#13, y{r}#10]]
*/
public void testGroupingAliasingMoved_To_LeftSideOfJoin() {
- assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
var plan = plan("""
from test
| keep emp_no, languages, gender
@@ -126,7 +126,7 @@ public void testGroupingAliasingMoved_To_LeftSideOfJoin() {
* {r}#21]]
*/
public void testGroupingAliasingMoved_To_LeftSideOfJoin_WithExpression() {
- assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
var plan = plan("""
from test
| keep emp_no, languages, gender, last_name, first_name
From cd31f725df51d5c44dc32ef1de5da96707cb0621 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Wed, 4 Jun 2025 18:55:10 +0300
Subject: [PATCH 02/20] Update docs/changelog/128917.yaml
---
docs/changelog/128917.yaml | 6 ++++++
1 file changed, 6 insertions(+)
create mode 100644 docs/changelog/128917.yaml
diff --git a/docs/changelog/128917.yaml b/docs/changelog/128917.yaml
new file mode 100644
index 0000000000000..2f2c379f8daad
--- /dev/null
+++ b/docs/changelog/128917.yaml
@@ -0,0 +1,6 @@
+pr: 128917
+summary: Adopt a "LogicalPlan" approach to running multiple sub-queries (with INLINESTATS
+ so far)
+area: ES|QL
+type: bug
+issues: []
From 811f5be138d3abd29dd4e37b68e9b2d98c59d950 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Wed, 4 Jun 2025 18:56:50 +0300
Subject: [PATCH 03/20] checkstyle
---
.../xpack/esql/plan/logical/join/InlineJoin.java | 1 -
.../elasticsearch/xpack/esql/planner/mapper/Mapper.java | 1 -
.../org/elasticsearch/xpack/esql/session/EsqlSession.java | 7 ++++---
3 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
index 1ea733bd3d1b4..852d1d7f2d9b3 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
@@ -13,7 +13,6 @@
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
-import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
index 81994ea1f5c7a..8e61bda60c1f7 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java
@@ -25,7 +25,6 @@
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
-import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index 248635fdab852..09f39e6c7e891 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -237,7 +237,7 @@ private LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
Holder subPlan = new Holder<>();
// Collect the first inlinejoin (bottom up in the tree or, viewing from the user-friendly query pov, the closest to ES source
// inlinestats command)
- optimizedPlan.forEachUp(InlineJoin.class, ij -> {
+ optimizedPlan.forEachUp(InlineJoin.class, ij -> {
// extract the right side of the plan and replace its source
if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {
var p = InlineJoin.replaceStub(ij.left(), ij.right());
@@ -267,8 +267,9 @@ private void executeSubPlan(
LocalRelation resultWrapper = resultToPlan(subPlans.nonStubbedSubPlan, result);
// replace the original logical plan with the backing result
- LogicalPlan newLogicalPlan = optimizedPlan.transformUp(InlineJoin.class, ij ->
- ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
+ LogicalPlan newLogicalPlan = optimizedPlan.transformUp(
+ InlineJoin.class,
+ ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
);
newLogicalPlan.setOptimized();
// look for the next inlinejoin plan
From 567d600a3ca009825aae9ea19aeae4e0e9c97974 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Wed, 4 Jun 2025 19:21:48 +0300
Subject: [PATCH 04/20] More tests un-Ignored and updates after update from
main
---
.../src/main/resources/inlinestats.csv-spec | 16 +++-------
.../session/IndexResolverFieldNamesTests.java | 30 +++++++++----------
2 files changed, 19 insertions(+), 27 deletions(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 576125cb96419..73c896e78c5e0 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -324,12 +324,8 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
GWL | [mid, military] | 9 | [2, 4]
;
-afterStats-Ignore
-required_capability: join_planning_v1
-// fails with
-// line 1:104: Plan [ProjectExec[[count{r}#174, country{f}#182, avg{r}#177]]] optimized incorrectly due to missing references [count{r}#174]
-// line 1:104: Plan [TopNExec[[Order[count{r}#174,DESC,FIRST], Order[country{f}#182,ASC,LAST]],1000[INTEGER],null]] optimized incorrectly due to missing references [count{r}#174]
-// line 1:80: Plan [FilterExec[count{r}#174 > 13.365[DOUBLE]]] optimized incorrectly due to missing references [count{r}#174]
+afterStats
+required_capability: join_planning_v8
FROM airports
| STATS count=COUNT(*) BY country
@@ -1041,12 +1037,8 @@ emp_no:integer |languages:integer|salary:integer |max_salary:integer|min_salary:
10066 |5 |31897 |66817 |25324
;
-// Fails with (in json format only; for format=txt it returns empty result)
-// line 1:101: Plan [FieldExtractExec[description{f}#274, host{f}#272, ip0{f}#273, _index..]<[],[]>] optimized incorrectly due to duplicate output attribute ip1{r}#267
-// line 1:101: Plan [TopNExec[[Order[ip1{r}#267,ASC,LAST]],1[INTEGER],null]] optimized incorrectly due to duplicate output attribute ip1{r}#267
-// line 1:44: Plan [HashJoinExec[[host_group{f}#271, card{f}#276],[host_group{f}#271, card{f}#276],[host_group{r}#271, card{r}#276],[ip1{r}#267]]] optimized incorrectly due to duplicate output attribute ip1{r}#267
-// https://github.com/elastic/elasticsearch/issues/128311
-inlineStatsOverrideEVALed_FieldWithSameName-Ignore
+inlineStatsOverrideEVALed_FieldWithSameName
+required_capability: inlinestats_v8
FROM hosts METADATA _index
| EVAL x = ip1
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java
index 2a111e6bb606b..644c7ec60411c 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java
@@ -30,7 +30,7 @@ public void testBasicFromCommand() {
}
public void testBasicFromCommandWithInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("from test | inlinestats max(salary) by gender", ALL_FIELDS);
}
@@ -39,7 +39,7 @@ public void testBasicFromCommandWithMetadata() {
}
public void testBasicFromCommandWithMetadata_AndInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("from test metadata _index, _id, _version | inlinestats max(salary)", ALL_FIELDS);
}
@@ -305,7 +305,7 @@ public void testLimitZero() {
}
public void testLimitZero_WithInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
FROM employees
| INLINESTATS COUNT(*), MAX(salary) BY gender
@@ -320,7 +320,7 @@ public void testDocsDropHeight() {
}
public void testDocsDropHeight_WithInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
FROM employees
| DROP height
@@ -336,7 +336,7 @@ public void testDocsDropHeightWithWildcard() {
}
public void testDocsDropHeightWithWildcard_AndInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
FROM employees
| INLINESTATS MAX(salary) BY gender
@@ -503,7 +503,7 @@ public void testSortWithLimitOne_DropHeight() {
}
public void testSortWithLimitOne_DropHeight_WithInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("from employees | inlinestats avg(salary) by languages | sort languages | limit 1 | drop height*", ALL_FIELDS);
}
@@ -803,7 +803,7 @@ public void testFilterById() {
}
public void testFilterById_WithInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("FROM apps metadata _id | INLINESTATS max(rate) | WHERE _id == \"4\"", ALL_FIELDS);
}
@@ -1274,7 +1274,7 @@ public void testProjectDropPattern() {
}
public void testProjectDropPattern_WithInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| inlinestats max(foo) by bar
@@ -1357,7 +1357,7 @@ public void testCountAllAndOtherStatGrouped() {
}
public void testCountAllAndOtherStatGrouped_WithInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| inlinestats c = count(*), min = min(emp_no) by languages
@@ -1396,7 +1396,7 @@ public void testCountAllWithEval() {
}
public void testCountAllWithEval_AndInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| rename languages as l
@@ -1409,7 +1409,7 @@ public void testCountAllWithEval_AndInlinestats() {
}
public void testKeepAfterEval_AndInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| rename languages as l
@@ -1422,7 +1422,7 @@ public void testKeepAfterEval_AndInlinestats() {
}
public void testKeepBeforeEval_AndInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| rename languages as l
@@ -1435,7 +1435,7 @@ public void testKeepBeforeEval_AndInlinestats() {
}
public void testStatsBeforeEval_AndInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| rename languages as l
@@ -1447,7 +1447,7 @@ public void testStatsBeforeEval_AndInlinestats() {
}
public void testStatsBeforeInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| stats min = min(salary) by languages
@@ -1456,7 +1456,7 @@ public void testStatsBeforeInlinestats() {
}
public void testKeepBeforeInlinestats() {
- assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
assertFieldNames("""
from test
| keep languages, salary
From f834ce70dfad0e02418260ce1b488854644aae51 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Wed, 4 Jun 2025 20:08:59 +0300
Subject: [PATCH 05/20] Fix capability
---
.../qa/testFixtures/src/main/resources/inlinestats.csv-spec | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 73c896e78c5e0..5088e587a10b2 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -325,7 +325,7 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
;
afterStats
-required_capability: join_planning_v8
+required_capability: inlinestats_v8
FROM airports
| STATS count=COUNT(*) BY country
From d3f2838c5ac77073cada865aa6603cf4c4fc56bd Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Fri, 6 Jun 2025 16:14:05 +0300
Subject: [PATCH 06/20] TEMP
---
.../src/main/resources/inlinestats.csv-spec | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 5088e587a10b2..d9457401665cf 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -1050,3 +1050,13 @@ FROM hosts METADATA _index
description:text | host:keyword | ip0:ip | _index:keyword | x:ip | ip1:long | host_group:text | card:keyword
alpha db server |alpha |127.0.0.1 |hosts |127.0.0.1|1 |DB servers |eth0
;
+
+doubleShadowing
+
+FROM employees
+| INLINESTATS salary = min(salary) BY gender
+| INLINESTATS salary = max(salary) BY gender
+| KEEP salary, gender
+| SORT gender DESC
+| LIMIT 5
+;
From 1e9c355b1eadd1a7d47d1b38dabdc18a5f9218ce Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Wed, 11 Jun 2025 14:34:01 +0300
Subject: [PATCH 07/20] Adapt PruneColumns to have "visibility" into the
right-hand side of the inlinestats JOIN
---
.../src/main/resources/inlinestats.csv-spec | 48 ++++++++++++++++++-
.../optimizer/rules/logical/PruneColumns.java | 29 +++++++----
2 files changed, 66 insertions(+), 11 deletions(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index d9457401665cf..d6a708e3e6abe 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -1052,11 +1052,57 @@ alpha db server |alpha |127.0.0.1 |hosts |127.0.0.1|1
;
doubleShadowing
+required_capability: inlinestats_v8
FROM employees
| INLINESTATS salary = min(salary) BY gender
| INLINESTATS salary = max(salary) BY gender
| KEEP salary, gender
-| SORT gender DESC
+| SORT salary DESC, gender
| LIMIT 5
;
+
+salary:integer |gender:keyword
+25976 |F
+25976 |F
+25976 |F
+25976 |F
+25976 |F
+;
+
+doubleShadowingWithEval
+required_capability: inlinestats_v8
+
+from employees
+| eval salary = salary/100
+| inlinestats salary=min(salary) by gender
+| inlinestats salary=max(salary) by gender
+| keep salary, gender
+| sort salary desc, gender
+| limit 5
+;
+
+salary:integer|gender:keyword
+259 |F
+259 |F
+259 |F
+259 |F
+259 |F
+;
+
+doubleShadowingWithDoubleStats
+required_capability: inlinestats_v8
+
+from employees
+| stats salary=min(salary) by gender
+| stats salary=max(salary) by gender
+| inlinestats salary=min(salary)
+| inlinestats salary=max(salary)
+;
+ignoreOrder:true
+
+gender:keyword |salary:integer
+null |25324
+F |25324
+M |25324
+;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
index 58e20030adb40..80368dba90459 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
@@ -22,6 +22,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -39,15 +40,18 @@ public final class PruneColumns extends Rule {
public LogicalPlan apply(LogicalPlan plan) {
// track used references
var used = plan.outputSet().asBuilder();
+ // track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the
+ // inline join won't have it's columns pruned due to the lack of "visibility" into the right hand side output/Attributes
+ var inlineJoinRightOutput = new ArrayList();
Holder forkPresent = new Holder<>(false);
// while going top-to-bottom (upstream)
var pl = plan.transformDown(p -> {
- // Note: It is NOT required to do anything special for binary plans like JOINs. It is perfectly fine that transformDown descends
- // first into the left side, adding all kinds of attributes to the `used` set, and then descends into the right side - even
- // though the `used` set will contain stuff only used in the left hand side. That's because any attribute that is used in the
- // left hand side must have been created in the left side as well. Even field attributes belonging to the same index fields will
- // have different name ids in the left and right hand sides - as in the extreme example
+ // Note: It is NOT required to do anything special for binary plans like JOINs, except INLINESTATS. It is perfectly fine that
+ // transformDown descends first into the left side, adding all kinds of attributes to the `used` set, and then descends into
+ // the right side - even though the `used` set will contain stuff only used in the left hand side. That's because any attribute
+ // that is used in the left hand side must have been created in the left side as well. Even field attributes belonging to the
+ // same index fields will have different name ids in the left and right hand sides - as in the extreme example
// `FROM lookup_idx | LOOKUP JOIN lookup_idx ON key_field`.
// skip nodes that simply pass the input through
@@ -63,6 +67,10 @@ public LogicalPlan apply(LogicalPlan plan) {
return p;
}
+ if (p instanceof InlineJoin ij) {
+ inlineJoinRightOutput.addAll(ij.right().outputSet());
+ }
+
// remember used
boolean recheck;
// analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate
@@ -70,7 +78,7 @@ public LogicalPlan apply(LogicalPlan plan) {
do {
recheck = false;
if (p instanceof Aggregate aggregate) {
- var remaining = removeUnused(aggregate.aggregates(), used);
+ var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput);
if (remaining != null) {
if (remaining.isEmpty()) {
@@ -97,7 +105,7 @@ public LogicalPlan apply(LogicalPlan plan) {
}
}
} else if (p instanceof Eval eval) {
- var remaining = removeUnused(eval.fields(), used);
+ var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput);
// no fields, no eval
if (remaining != null) {
if (remaining.isEmpty()) {
@@ -111,7 +119,7 @@ public LogicalPlan apply(LogicalPlan plan) {
// Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway.
// However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index,
// it works differently as we extract all fields (other than the join key) that the EsRelation has.
- var remaining = removeUnused(esr.output(), used);
+ var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput);
if (remaining != null) {
p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining);
}
@@ -131,14 +139,15 @@ public LogicalPlan apply(LogicalPlan plan) {
* Prunes attributes from the list not found in the given set.
* Returns null if no changed occurred.
*/
- private static List removeUnused(List named, AttributeSet.Builder used) {
+ private static List removeUnused(List named, AttributeSet.Builder used, List exceptions) {
var clone = new ArrayList<>(named);
var it = clone.listIterator(clone.size());
// due to Eval, go in reverse
while (it.hasPrevious()) {
N prev = it.previous();
- if (used.contains(prev.toAttribute()) == false) {
+ var attr = prev.toAttribute();
+ if (used.contains(attr) == false && exceptions.contains(attr) == false) {
it.remove();
} else {
used.addAll(prev.references());
From 377328c7e8a990d83a9b3babded5b97ed61eadc1 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Wed, 11 Jun 2025 21:13:38 +0300
Subject: [PATCH 08/20] More fixes and tests
---
.../src/main/resources/inlinestats.csv-spec | 145 ++++++++++++++----
.../rules/logical/PropagateInlineEvals.java | 2 +-
.../xpack/esql/plan/logical/InlineStats.java | 4 +
.../esql/plan/logical/join/InlineJoin.java | 11 +-
.../xpack/esql/analysis/AnalyzerTests.java | 15 ++
5 files changed, 145 insertions(+), 32 deletions(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index d6a708e3e6abe..1ddb230ada83a 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -218,7 +218,7 @@ emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:
10007 |393084805 |2.863684210555556E8 |4 |5 |F
;
-three-Ignore
+three
required_capability: inlinestats_v8
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
@@ -312,16 +312,28 @@ FROM airports
// end::extreme-airports-result[]
;
-brokenwhy-Ignore
+mvMinMvExpand
required_capability: join_planning_v1
+required_capability: inlinestats_v8
FROM airports
+| EVAL original_type = type
| INLINESTATS min_scalerank=MIN(scalerank) BY type
| MV_EXPAND type
-| WHERE scalerank == MV_MIN(scalerank);
+| EVAL mvMin_scalerank = MV_MIN(scalerank)
+| WHERE scalerank == MV_MIN(scalerank)
+| SORT abbrev DESC NULLS LAST
+| LIMIT 7
+;
-abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
- GWL | [mid, military] | 9 | [2, 4]
+abbrev:keyword |city:keyword |city_location:geo_point |country:keyword| location:geo_point | name:text |scalerank:integer|original_type:keyword |min_scalerank:integer|type:keyword|mvMin_scalerank:integer
+ZRH |Zürich |POINT (8.5411 47.3744) |Switzerland |POINT (8.56221279534765 47.4523895064915) |Zurich Int'l |3 |major |2 |major |3
+ZNZ |Zanzibar |POINT (39.199 -6.165) |Tanzania |POINT (39.2223319841558 -6.21857034620282)|Zanzibar |4 |mid |2 |mid |4
+ZLO |Cihuatlán |POINT (-104.5667 19.25) |Mexico |POINT (-104.560095200097 19.1480860285854)|Playa de Oro Int'l |7 |mid |2 |mid |7
+ZHHH |Wuhan |POINT (114.2881 30.5872)|China |POINT (114.24694737615 30.6017141196702) |Wang-Chia Tun Airbase|6 |[mid, military] |[2, 4] |mid |6
+ZHHH |Wuhan |POINT (114.2881 30.5872)|China |POINT (114.24694737615 30.6017141196702) |Wang-Chia Tun Airbase|6 |[mid, military] |[2, 4] |military |6
+ZGC |Lanzhou |POINT (103.8318 36.0617)|China |POINT (103.615415363043 36.5078842461237) |Lanzhou Zhongchuan |6 |mid |2 |mid |6
+ZAR |Zaria |POINT (7.7 11.0667) |Nigeria |POINT (7.68726764310577 11.1352958601071) |Zaria |8 |mid |2 |mid |8
;
afterStats
@@ -627,11 +639,10 @@ abbrev:keyword | scalerank:integer | location:geo_point
ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 2 | 20 | -100
;
-byTwoCalculatedSecondOverwrites-Ignore
+byTwoCalculatedSecondOverwrites
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
-// fails with
-// line 1:78: Plan [InlineJoin[LEFT,[x{r}#20, x{r}#23],[x{r}#20, x{r}#23],[x{r}#23, x{r}#23]]] optimized incorrectly due to missing references from left hand side [x{r}#20]
+required_capability: inlinestats_v8
FROM airports
| WHERE abbrev IS NOT NULL
@@ -644,18 +655,16 @@ FROM airports
;
warning:Line 5:4: Field 'x' shadowed by field at line 6:3
-abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer
- ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2
- ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2
- ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2
+abbrev:keyword | scalerank:integer | location:geo_point | min_sl:integer| x:double
+ ZRH | 3 | POINT (8.56221279534765 47.4523895064915) | 2 | 10
+ ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 2 | 40
+ ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 2 | -100
;
-byTwoCalculatedSecondOverwritesReferencingFirst-Ignore
+byTwoCalculatedSecondOverwritesReferencingFirst
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
-// fails with
-// line 1:105: Plan [Aggregate[[x{r}#45],[MIN(scalerank{f}#50,true[BOOLEAN]) AS min_sl, x{r}#45]]] optimized incorrectly due to missing references [x{r}#45]
-// line 1:105: Plan [InlineJoin[LEFT,[x{r}#42, x{r}#45],[x{r}#42, x{r}#45],[x{r}#45, x{r}#45]]] optimized incorrectly due to missing references from left hand side [x{r}#42]
+required_capability: inlinestats_v8
FROM airports
| WHERE abbrev IS NOT NULL
@@ -669,35 +678,33 @@ FROM airports
;
warning:Line 6:4: Field 'x' shadowed by field at line 7:3
-abbrev:keyword | scalerank:integer | location:geo_point | x:double | min_sl:integer
- ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 10 | 2
- ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 40 | 2
- ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2
+abbrev:keyword | scalerank:integer | location:geo_point | min_sl:integer| x:double
+ ZRH | 3 | POINT (8.56221279534765 47.4523895064915) | 2 | 10
+ ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | 2 | 40
+ ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 2 | -100
;
-groupShadowsAgg-Ignore
+groupShadowsAgg
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
-// fails with
-// line 1:134: column [location] cannot be used as an aggregate once declared in the STATS BY grouping key [lat_10 = ROUND(ST_Y(location), -1)]
+required_capability: inlinestats_v8
FROM airports
| WHERE abbrev IS NOT NULL
| KEEP abbrev, scalerank, location
| INLINESTATS min_sl=MIN(scalerank)
- , lat_10 = ROUND(ST_Y(location), -1)
- BY lat_10 = ROUND(ST_Y(location), -1)
+ BY min_sl = ROUND(ST_Y(location), -1)
, lon_10 = ROUND(ST_X(location), -1)
| SORT abbrev DESC
| LIMIT 3
;
-warning:Line 5:3: Field 'lat_10' shadowed by field at line 6:4
+warning:Line 4:15: Field 'min_sl' shadowed by field at line 5:4
-abbrev:keyword | scalerank:integer | location:geo_point | lat_10:double | lon_10:double | min_sl:integer
- ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10 | 2
- ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40 | 4
- ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 | 2
+abbrev:keyword | scalerank:integer | location:geo_point | min_sl:double | lon_10:double
+ ZRH | 3 | POINT(8.56221279534765 47.4523895064915) | 50 | 10
+ ZNZ | 4 | POINT (39.2223319841558 -6.21857034620282) | -10 | 40
+ ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100
;
groupShadowsField
@@ -1106,3 +1113,81 @@ null |25324
F |25324
M |25324
;
+
+renamingGroupingWithItself
+required_capability: inlinestats_v8
+
+FROM employees
+| EVAL x = gender
+| INLINESTATS min_sl = MIN(salary) BY x = x
+| SORT salary DESC
+| KEEP salary, x, gender, min_sl, emp_no
+| LIMIT 5
+;
+
+salary:integer |x:keyword|gender:keyword |min_sl:integer |emp_no:integer
+74999 |M |M |25945 |10029
+74970 |M |M |25945 |10045
+74572 |F |F |25976 |10007
+73851 |F |F |25976 |10027
+73717 |null |null |25324 |10019
+;
+
+overridingGroupings
+required_capability: inlinestats_v8
+
+FROM employees
+| INLINESTATS min_sl = MIN(salary) BY x = gender, x = languages
+| KEEP salary, x, gender, min_sl, emp_no
+| SORT salary
+| LIMIT 5
+;
+warning:Line 2:39: Field 'x' shadowed by field at line 2:51
+
+salary:integer |x:integer |gender:keyword |min_sl:integer |emp_no:integer
+25324 |5 |null |25324 |10015
+25945 |5 |M |25324 |10035
+25976 |1 |F |25976 |10092
+26436 |3 |M |26436 |10048
+27215 |4 |F |27215 |10057
+;
+
+overridingExpressionGroupings
+required_capability: inlinestats_v8
+
+FROM employees
+| INLINESTATS min_sl = MIN(salary) BY x = TO_LOWER(gender), x = CONCAT(gender, gender)
+| SORT salary DESC
+| KEEP salary, x, gender, min_sl, emp_no
+| LIMIT 5
+;
+warning:Line 2:39: Field 'x' shadowed by field at line 2:61
+
+salary:integer |x:keyword |gender:keyword |min_sl:integer |emp_no:integer
+74999 |MM |M |25945 |10029
+74970 |MM |M |25945 |10045
+74572 |FF |F |25976 |10007
+73851 |FF |F |25976 |10027
+73717 |null |null |25324 |10019
+;
+
+reusingEvalExpressions_UsedInGroupings
+required_capability: inlinestats_v8
+
+FROM employees
+| KEEP salary, gender, emp_no
+| EVAL x = TO_LOWER(gender), x = CONCAT(x, " ", x)
+| INLINESTATS min_sl = MIN(salary) BY x
+| SORT salary DESC
+| LIMIT 5
+;
+
+salary:integer |gender:keyword |emp_no:integer |min_sl:integer | x:keyword
+74999 |M |10029 |25945 |m m
+74970 |M |10045 |25945 |m m
+74572 |F |10007 |25976 |f f
+73851 |F |10027 |25976 |f f
+73717 |null |10019 |25324 |null
+;
+
+
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java
index 1d921a5037b6f..c83e4b319763a 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateInlineEvals.java
@@ -23,7 +23,7 @@
/**
* Replace any evaluation from the inlined aggregation side (right side) to the left side (source) to perform the matching.
- * In INLINE m = MIN(x) BY a + b the right side contains STATS m = MIN(X) BY a + b.
+ * In INLINESTATS m = MIN(x) BY a + b the right side contains STATS m = MIN(X) BY a + b.
* As the grouping key is used to perform the join, the evaluation required for creating it has to be copied to the left side
* as well.
*/
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java
index 724aa2da25983..be34631ec8149 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Objects;
+import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
/**
@@ -101,6 +102,9 @@ private JoinConfig joinConfig() {
for (Expression g : groupings) {
namedGroupings.add(Expressions.attribute(g));
}
+ // last named grouping wins, just like it happens for regular STATS
+ // ie BY x = field_1, x = field_2, the grouping is actually performed on second x (field_2)
+ namedGroupings = mergeOutputAttributes(namedGroupings, emptyList());
List leftFields = new ArrayList<>(groupings.size());
List rightFields = new ArrayList<>(groupings.size());
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
index 852d1d7f2d9b3..8eb7d9c252b64 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
@@ -59,7 +59,16 @@ public static LogicalPlan stubSource(UnaryPlan sourcePlan, LogicalPlan target) {
* Replaces the stubbed source with the actual source.
*/
public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) {
- return stubbed.transformUp(StubRelation.class, stubRelation -> source);
+ // here we could have used stubbed.transformUp(StubRelation.class, stubRelation -> source)
+ // but transformUp skips changing a node if its tranformed variant it's equal to its original variant.
+ // A StubRelation can contain in its output ReferenceAttributes which do not use NameIds for equality, but only names and
+ // two ReferenceAttributes with the same name are equal and the transformation will not be applied.
+ return stubbed.transformUp(UnaryPlan.class, up -> {
+ if (up.child() instanceof StubRelation) {
+ return up.replaceChild(source);
+ }
+ return up;
+ });
}
/**
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
index 402c09afc3c8a..91c7ce5764df6 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
@@ -4089,6 +4089,21 @@ public void testImplicitCastingForDateAndDateNanosFields() {
assertEquals("test*", esRelation.indexPattern());
}
+ public void testGroupingOverridesInStats() {
+ verifyUnsupported("""
+ from test
+ | stats MIN(salary) BY x = languages, x = x + 1
+ """, "Found 1 problem\n" + "line 2:43: Unknown column [x]", "mapping-default.json");
+ }
+
+ public void testGroupingOverridesInInlinestats() {
+ assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
+ verifyUnsupported("""
+ from test
+ | inlinestats MIN(salary) BY x = languages, x = x + 1
+ """, "Found 1 problem\n" + "line 2:49: Unknown column [x]", "mapping-default.json");
+ }
+
private void verifyNameAndType(String actualName, DataType actualType, String expectedName, DataType expectedType) {
assertEquals(expectedName, actualName);
assertEquals(expectedType, actualType);
From cffa1d7c363a637a0c592829ad0d993e5794a61d Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Thu, 12 Jun 2025 15:44:05 +0300
Subject: [PATCH 09/20] More deterministic test
---
.../qa/testFixtures/src/main/resources/inlinestats.csv-spec | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 1ddb230ada83a..3c96ccc1a9376 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -322,7 +322,7 @@ FROM airports
| MV_EXPAND type
| EVAL mvMin_scalerank = MV_MIN(scalerank)
| WHERE scalerank == MV_MIN(scalerank)
-| SORT abbrev DESC NULLS LAST
+| SORT abbrev DESC NULLS LAST, type
| LIMIT 7
;
From e3b5c928e26536784177a006bb8099e5417df814 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Thu, 12 Jun 2025 15:46:13 +0300
Subject: [PATCH 10/20] Update docs/changelog/128917.yaml
---
docs/changelog/128917.yaml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/changelog/128917.yaml b/docs/changelog/128917.yaml
index 2f2c379f8daad..2d80fb5b43354 100644
--- a/docs/changelog/128917.yaml
+++ b/docs/changelog/128917.yaml
@@ -2,5 +2,5 @@ pr: 128917
summary: Adopt a "LogicalPlan" approach to running multiple sub-queries (with INLINESTATS
so far)
area: ES|QL
-type: bug
+type: enhancement
issues: []
From bc6f8ccc3132f5426ef0c04f6d7169f43616f785 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Fri, 13 Jun 2025 13:19:53 +0300
Subject: [PATCH 11/20] Test inlinestats after stats with bucket and top
---
.../src/main/resources/inlinestats.csv-spec | 34 +++++++++++++++++++
1 file changed, 34 insertions(+)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 3c96ccc1a9376..c1671e3c8ed74 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -1190,4 +1190,38 @@ salary:integer |gender:keyword |emp_no:integer |min_sl:integer | x:keyword
73717 |null |10019 |25324 |null
;
+statsBeforeInlinestatsWithTopAndBucket1
+required_capability: inlinestats_v8
+
+FROM books
+| STATS avg_rating = AVG(ratings) BY decade = BUCKET(year, 10)
+| INLINESTATS decades = TOP(decade, 3, "DESC")
+| SORT avg_rating DESC
+| LIMIT 10
+;
+
+avg_rating:double | decade:double | decades:double
+4.954999923706055 |1960.0 |[2020.0, 2010.0, 2000.0]
+4.387647109873154 |1990.0 |[2020.0, 2010.0, 2000.0]
+4.339166651169459 |2000.0 |[2020.0, 2010.0, 2000.0]
+4.274615342800434 |2010.0 |[2020.0, 2010.0, 2000.0]
+4.063333352406819 |1970.0 |[2020.0, 2010.0, 2000.0]
+3.880000114440918 |2020.0 |[2020.0, 2010.0, 2000.0]
+3.6633334159851074|1980.0 |[2020.0, 2010.0, 2000.0]
+;
+
+statsBeforeInlinestatsWithTopAndBucket2
+required_capability: inlinestats_v8
+
+FROM sample_data
+| STATS total_duration = SUM(event_duration) BY day = BUCKET(@timestamp, 1 HOUR)
+| INLINESTATS days = TOP(day, 2, "ASC")
+| SORT total_duration ASC
+| LIMIT 5
+;
+
+total_duration:long | day:date | days:date
+6215122 |2023-10-23T12:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z]
+17016205 |2023-10-23T13:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z]
+;
From bb0450c323302e07c5d88ab7102705352b576238 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Tue, 17 Jun 2025 14:19:20 +0300
Subject: [PATCH 12/20] Have Row - LocalRelation transformation use a copying
blocks approach
---
.../src/main/resources/inlinestats.csv-spec | 85 +++++++++++++++----
.../logical/ReplaceRowAsLocalRelation.java | 4 +-
.../logical/local/CopyingLocalSupplier.java | 31 +++++++
.../logical/local/ImmediateLocalSupplier.java | 2 +-
.../xpack/esql/session/EsqlSession.java | 3 +-
.../xpack/esql/session/SessionUtils.java | 4 +-
6 files changed, 106 insertions(+), 23 deletions(-)
create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index c1671e3c8ed74..03c45b8f66b15 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -509,61 +509,112 @@ ALG |Algiers |385 |major |1
ALL |Albenga |499 |mid |1 |Albenga
;
-shadowing-Ignore
+shadowing
required_capability: join_planning_v1
+required_capability: inlinestats_v8
ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
-| INLINESTATS env=VALUES(right) BY client_ip
+| INLINESTATS env = VALUES(right) BY client_ip
;
-left:keyword | client_ip:keyword | right:keyword | env:keyword
-left | 172.21.0.5 | right | right
+left:keyword | right:keyword | env:keyword | client_ip:keyword
+left | right | right | 172.21.0.5
;
-shadowingMulti-Ignore
+shadowingMulti
required_capability: join_planning_v1
+required_capability: inlinestats_v8
ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right"
| INLINESTATS airport=VALUES(left), region=VALUES(left), city_boundary=VALUES(left) BY city
;
-left:keyword | city:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword
-left | Zürich | middle | right | left | left | left
+left:keyword | middle:keyword | right:keyword | airport:keyword | region:keyword | city_boundary:keyword | city:keyword
+left | middle | right | left | left | left | Zürich
;
-shadowingSelf-Ignore
+shadowingSelf
required_capability: join_planning_v1
+required_capability: inlinestats_v8
-ROW city="Raleigh"
-| INLINESTATS city=COUNT(city)
+ROW city = "Raleigh"
+| INLINESTATS city = COUNT(city)
;
city:long
1
;
-shadowingSelfBySelf-Ignore
+shadowingSelfBySelf
required_capability: join_planning_v1
+required_capability: inlinestats_v8
-ROW city="Raleigh"
-| INLINESTATS city=COUNT(city) BY city
+ROW city = "Raleigh"
+| INLINESTATS city = COUNT(city) BY city
;
+warning:Line 2:15: Field 'city' shadowed by field at line 2:37
-city:long
-1
+city:keyword
+Raleigh
;
-shadowingInternal-Ignore
+shadowingInternal
required_capability: join_planning_v1
+required_capability: inlinestats_v8
ROW city = "Zürich"
-| INLINESTATS x=VALUES(city), x=VALUES(city)
+| INLINESTATS x = VALUES(city), x = VALUES(city)
;
+warning:Line 2:15: Field 'x' shadowed by field at line 2:33
city:keyword | x:keyword
Zürich | Zürich
;
+multiInlinestatsWithRow
+required_capability: inlinestats_v8
+
+row x = 1
+| inlinestats x = max(x) + min(x)
+| eval y = x + 1
+| inlinestats sum(y)
+| inlinestats count(y), count(x)
+;
+
+ x:integer | y:integer | sum(y):long | count(y):long | count(x):long
+2 |3 |3 |1 |1
+;
+
+ignoreUnusedEvaledValue-Ignore
+required_capability: inlinestats_v8
+// fails with expected [keys] to be non-empty
+
+ROW x = 1
+| INLINESTATS max(x)
+| EVAL y = x + 1
+| KEEP x
+;
+
+x:integer
+1
+;
+
+ignoreUnusedEvaledValue2-Ignore
+required_capability: inlinestats_v8
+// fails with expected [keys] to be non-empty
+
+from employees
+| inlinestats max(salary)
+| eval y = salary + 1
+| keep salary
+| sort salary desc
+| limit 1
+;
+
+ salary:integer
+74999
+;
+
byConstant
required_capability: inlinestats_v8
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java
index 9e7b6ce80422d..a5ea01c53aa4e 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceRowAsLocalRelation.java
@@ -11,8 +11,8 @@
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Row;
+import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
-import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import java.util.ArrayList;
@@ -29,6 +29,6 @@ protected LogicalPlan rule(Row row, LogicalOptimizerContext context) {
List
*/
-public interface LocalSupplier extends Supplier, Writeable {
+public interface LocalSupplier extends Supplier, Writeable, NamedWriteable {
LocalSupplier EMPTY = new LocalSupplier() {
+ @Override
+ public String getWriteableName() {
+ return "EmptySupplier";
+ }
+
@Override
public Block[] get() {
return BlockUtils.NO_BLOCKS;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java
index 84e09bdd643a4..9040b2dc27d9d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LocalSourceExec.java
@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.plan.physical;
+import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -39,14 +40,22 @@ public LocalSourceExec(Source source, List output, LocalSupplier supp
public LocalSourceExec(StreamInput in) throws IOException {
super(Source.readFrom((PlanStreamInput) in));
this.output = in.readNamedWriteableCollectionAsList(Attribute.class);
- this.supplier = LocalSupplier.readFrom((PlanStreamInput) in);
+ if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
+ this.supplier = in.readNamedWriteable(LocalSupplier.class);
+ } else {
+ this.supplier = LocalSupplier.readFrom((PlanStreamInput) in);
+ }
}
@Override
public void writeTo(StreamOutput out) throws IOException {
source().writeTo(out);
out.writeNamedWriteableCollection(output);
- supplier.writeTo(out);
+ if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
+ out.writeNamedWriteable(supplier);
+ } else {
+ supplier.writeTo(out);
+ }
}
@Override
From 0500ab62815f3b9e2496235293921a282c21018b Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Wed, 25 Jun 2025 19:27:25 +0300
Subject: [PATCH 14/20] Adapt to EXPLAIN command
---
.../xpack/esql/session/EsqlSession.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index 53cadd6ad71df..5076555670815 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -102,6 +102,7 @@
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -235,6 +236,7 @@ public void executeOptimizedPlan(
ActionListener listener
) {
if (explainMode) {
+ PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
String physicalPlanString = physicalPlan.toString();
List fields = List.of(
new ReferenceAttribute(EMPTY, "role", DataType.KEYWORD),
@@ -247,11 +249,13 @@ public void executeOptimizedPlan(
values.add(List.of("coordinator", "optimizedPhysicalPlan", physicalPlanString));
var blocks = BlockUtils.fromList(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
physicalPlan = new LocalSourceExec(Source.EMPTY, fields, LocalSupplier.of(blocks));
+ planRunner.run(physicalPlan, listener);
+ } else {
+ // TODO: this could be snuck into the underlying listener
+ EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
+ // execute any potential subplans
+ executeSubPlans(optimizedPlan, planRunner, executionInfo, request, listener);
}
- // TODO: this could be snuck into the underlying listener
- EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
- // execute any potential subplans
- executeSubPlans(optimizedPlan, planRunner, executionInfo, request, listener);
}
private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {}
From 17d593349bd86deb159bfd26f5a3b842872dae3e Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Fri, 27 Jun 2025 19:19:55 +0300
Subject: [PATCH 15/20] Add more tests and fix incorrect pruning of columns of
the inlinestats' Aggregation. Move two methods from EsqlSession to
InlineJoin. Address reviews Add more comments
---
.../src/main/resources/inlinestats.csv-spec | 143 +++++++++++++++---
.../optimizer/rules/logical/PruneColumns.java | 13 +-
.../esql/plan/logical/join/InlineJoin.java | 77 ++++++++--
.../logical/local/CopyingLocalSupplier.java | 32 ++++
.../xpack/esql/session/EsqlSession.java | 36 ++---
.../xpack/esql/session/SessionUtils.java | 4 +-
6 files changed, 243 insertions(+), 62 deletions(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 03c45b8f66b15..6155eea6a8166 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -201,7 +201,6 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag
;
two
-required_capability: join_planning_v1
required_capability: inlinestats_v8
FROM employees
@@ -210,17 +209,24 @@ FROM employees
| WHERE avg_worked_seconds > avg_avg_worked_seconds
| INLINESTATS max_languages = MAX(languages) BY gender
| SORT emp_no ASC
-| LIMIT 3;
+| LIMIT 10;
emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:integer|max_languages:integer|gender:keyword
10002 |328922887 |3.133013149047619E8 |5 |5 |F
10006 |372957040 |2.978159518235294E8 |3 |5 |F
10007 |393084805 |2.863684210555556E8 |4 |5 |F
+10010 |315236372 |2.863684210555556E8 |4 |5 |null
+10012 |365510850 |3.133013149047619E8 |5 |5 |null
+10015 |390266432 |3.133013149047619E8 |5 |5 |null
+10018 |309604079 |3.0318626831578946E8 |2 |5 |null
+10019 |342855721 |2.94833632E8 |1 |5 |null
+10020 |373309605 |3.181719481E8 |null |5 |M
+10023 |330870342 |3.181719481E8 |null |5 |F
;
three
required_capability: inlinestats_v8
-// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
+// used to fail with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
FROM employees
| KEEP emp_no, languages, avg_worked_seconds, gender
@@ -245,6 +251,32 @@ emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:
10023 |330870342 |3.181719481E8 |null |5 |3 |5 |F
;
+// TODO: INLINESTATS unit test needed for this one
+pushDownSort_To_LeftSideOnly
+required_capability: inlinestats_v8
+
+from employees
+| sort emp_no desc
+| inlinestats avg = avg(salary) by languages
+| limit 5
+| keep emp_no, avg, languages, gender
+;
+
+ emp_no:integer| avg:double |languages:integer|gender:keyword
+10001 |57305.0 |2 |M
+10002 |46272.5 |5 |F
+10003 |61805.0 |4 |M
+10004 |46272.5 |5 |M
+10005 |63528.0 |1 |M
+;
+
+from employees
+| sort emp_no desc
+| inlinestats avg = avg(salary) by languages
+| limit 5
+| keep emp_no, avg, languages, gender
+;
+
byMultivaluedSimple
required_capability: inlinestats_v8
@@ -313,7 +345,6 @@ FROM airports
;
mvMinMvExpand
-required_capability: join_planning_v1
required_capability: inlinestats_v8
FROM airports
@@ -378,7 +409,6 @@ abbrev:keyword | country:keyword | count:long
;
afterLookup
-required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: join_lookup_v12
@@ -403,7 +433,6 @@ ZNZ |4 |German
;
afterEnrich
-required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: enrich_load
@@ -466,7 +495,6 @@ emp_no:integer | languages:integer | max_salary:integer
;
beforeEnrich
-required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: enrich_load
@@ -486,7 +514,6 @@ ACA |Acapulco de Juárez|385 |major |Acapulco de
;
beforeAndAfterEnrich
-required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: enrich_load
@@ -510,7 +537,6 @@ ALL |Albenga |499 |mid |1
;
shadowing
-required_capability: join_planning_v1
required_capability: inlinestats_v8
ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
@@ -522,7 +548,6 @@ left | right | right | 172.21.0.5
;
shadowingMulti
-required_capability: join_planning_v1
required_capability: inlinestats_v8
ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right"
@@ -534,7 +559,6 @@ left | middle | right | left | left
;
shadowingSelf
-required_capability: join_planning_v1
required_capability: inlinestats_v8
ROW city = "Raleigh"
@@ -546,7 +570,6 @@ city:long
;
shadowingSelfBySelf
-required_capability: join_planning_v1
required_capability: inlinestats_v8
ROW city = "Raleigh"
@@ -559,7 +582,6 @@ Raleigh
;
shadowingInternal
-required_capability: join_planning_v1
required_capability: inlinestats_v8
ROW city = "Zürich"
@@ -585,9 +607,8 @@ row x = 1
2 |3 |3 |1 |1
;
-ignoreUnusedEvaledValue-Ignore
+ignoreUnusedEvaledValue_AndInlineStats
required_capability: inlinestats_v8
-// fails with expected [keys] to be non-empty
ROW x = 1
| INLINESTATS max(x)
@@ -599,9 +620,21 @@ x:integer
1
;
-ignoreUnusedEvaledValue2-Ignore
+ignoreUnusedEvaledValue_AndInlineStats2
+required_capability: inlinestats_v8
+
+ROW x = 1, z = 2
+| INLINESTATS max(x)
+| EVAL a = x + 1, b = z + 2
+| KEEP x, z
+;
+
+x:integer | z:integer
+1 | 2
+;
+
+ignoreUnusedEvaledValue_AndInlineStats3
required_capability: inlinestats_v8
-// fails with expected [keys] to be non-empty
from employees
| inlinestats max(salary)
@@ -615,6 +648,36 @@ from employees
74999
;
+ignoreUnusedEvaledValue_AndInlineStats4
+required_capability: inlinestats_v8
+
+from employees
+| inlinestats max(salary), m = min(salary) by gender
+| eval y = concat(gender, "")
+| keep emp_no
+| sort emp_no desc
+| limit 1
+;
+
+emp_no:integer
+10100
+;
+
+ignoreUnusedEvaledValue_AndInlineStats5
+required_capability: inlinestats_v8
+
+from employees
+| inlinestats max(salary), m = min(salary) by gender
+| eval y = m / 2
+| keep emp_no
+| sort emp_no desc
+| limit 1
+;
+
+emp_no:integer
+10100
+;
+
byConstant
required_capability: inlinestats_v8
@@ -691,7 +754,6 @@ abbrev:keyword | scalerank:integer | location:geo_point
;
byTwoCalculatedSecondOverwrites
-required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
required_capability: inlinestats_v8
@@ -713,7 +775,6 @@ abbrev:keyword | scalerank:integer | location:geo_point
;
byTwoCalculatedSecondOverwritesReferencingFirst
-required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
required_capability: inlinestats_v8
@@ -737,7 +798,6 @@ abbrev:keyword | scalerank:integer | location:geo_point
groupShadowsAgg
-required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
required_capability: inlinestats_v8
@@ -1128,6 +1188,49 @@ salary:integer |gender:keyword
25976 |F
;
+doubleShadowing_WithIntertwinedFilters
+required_capability: inlinestats_v8
+
+FROM employees
+| WHERE salary > 30000
+| INLINESTATS salary = min(salary) BY gender
+| WHERE salary > 31000
+| INLINESTATS salary = max(salary) BY gender
+| WHERE salary > 31500
+| KEEP salary, gender
+| SORT salary DESC, gender
+;
+warning:No limit defined, adding default limit of [1000]
+
+salary:integer |gender:keyword
+31120 |null
+31120 |null
+31120 |null
+31120 |null
+31120 |null
+31120 |null
+31120 |null
+31120 |null
+31120 |null
+;
+
+shadowingAggregateByNextGrouping
+required_capability: inlinestats_v8
+
+FROM employees
+| KEEP gender, languages, emp_no, salary
+| INLINESTATS gender = count_distinct(gender) BY languages
+| INLINESTATS avg(salary) BY gender
+| SORT emp_no
+| LIMIT 3
+;
+
+emp_no:integer |salary:integer |languages:integer|avg(salary):double|gender:keyword
+10001 |57305 |2 |48248.55 |2
+10002 |56371 |5 |48248.55 |2
+10003 |61805 |4 |48248.55 |2
+;
+
doubleShadowingWithEval
required_capability: inlinestats_v8
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
index 80368dba90459..a2f027f72e23d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
@@ -29,6 +29,7 @@
import org.elasticsearch.xpack.esql.rule.Rule;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -41,7 +42,7 @@ public LogicalPlan apply(LogicalPlan plan) {
// track used references
var used = plan.outputSet().asBuilder();
// track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the
- // inline join won't have it's columns pruned due to the lack of "visibility" into the right hand side output/Attributes
+ // inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes
var inlineJoinRightOutput = new ArrayList();
Holder forkPresent = new Holder<>(false);
@@ -104,6 +105,16 @@ public LogicalPlan apply(LogicalPlan plan) {
p = aggregate.with(aggregate.groupings(), remaining);
}
}
+ } else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add tests for this IJ removal
+ var remaining = removeUnused(ij.right().output(), used, Collections.emptyList());
+ if (remaining != null) {
+ if (remaining.isEmpty()) {
+ // remove the InlineJoin altogether
+ p = ij.left();
+ recheck = true;
+ }
+ // TODO: InlineStats - prune only the unused columns from it?
+ }
} else if (p instanceof Eval eval) {
var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput);
// no fields, no eval
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
index 8eb7d9c252b64..57f0acda81a0f 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java
@@ -16,6 +16,7 @@
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -55,12 +56,31 @@ public static LogicalPlan stubSource(UnaryPlan sourcePlan, LogicalPlan target) {
return sourcePlan.replaceChild(new StubRelation(sourcePlan.source(), target.output()));
}
+ /**
+ * TODO: perform better planning
+ * Keep the join in place or replace it with an Eval in case no grouping is necessary.
+ */
+ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) {
+ if (target.config().matchFields().isEmpty()) {
+ List schema = data.output();
+ Block[] blocks = data.supplier().get();
+ List aliases = new ArrayList<>(schema.size());
+ for (int i = 0; i < schema.size(); i++) {
+ Attribute attr = schema.get(i);
+ aliases.add(new Alias(attr.source(), attr.name(), Literal.of(attr, BlockUtils.toJavaObject(blocks[i], 0)), attr.id()));
+ }
+ return new Eval(target.source(), target.left(), aliases);
+ } else {
+ return target.replaceRight(data);
+ }
+ }
+
/**
* Replaces the stubbed source with the actual source.
*/
public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) {
// here we could have used stubbed.transformUp(StubRelation.class, stubRelation -> source)
- // but transformUp skips changing a node if its tranformed variant it's equal to its original variant.
+ // but transformUp skips changing a node if its transformed variant is equal to its original variant.
// A StubRelation can contain in its output ReferenceAttributes which do not use NameIds for equality, but only names and
// two ReferenceAttributes with the same name are equal and the transformation will not be applied.
return stubbed.transformUp(UnaryPlan.class, up -> {
@@ -72,22 +92,49 @@ public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) {
}
/**
- * TODO: perform better planning
- * Keep the join in place or replace it with a projection in case no grouping is necessary.
+ * @param stubReplacedSubPlan - the completed / "destubbed" right-hand side of the bottommost InlineJoin in the plan. For example:
+ * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
+ * \_Limit[1000[INTEGER],false]
+ * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
+ * @param originalSubPlan - the original (unchanged) right-hand side of the bottommost InlineJoin in the plan. For example:
+ * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
+ * \_StubRelation[[x{r}#99]]]
*/
- public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) {
- if (target.config().matchFields().isEmpty()) {
- List schema = data.output();
- Block[] blocks = data.supplier().get();
- List aliases = new ArrayList<>(schema.size());
- for (int i = 0; i < schema.size(); i++) {
- Attribute attr = schema.get(i);
- aliases.add(new Alias(attr.source(), attr.name(), Literal.of(attr, BlockUtils.toJavaObject(blocks[i], 0)), attr.id()));
+ public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan originalSubPlan) {}
+
+ /**
+ * Finds the "first" (closest to the source command or bottom up in the tree) {@link InlineJoin}, replaces the {@link StubRelation}
+ * of the right-hand side with left-hand side's source and returns a tuple.
+ *
+ * Original optimized plan:
+ * Limit[1000[INTEGER],true]
+ * \_InlineJoin[LEFT,[],[],[]]
+ * |_Limit[1000[INTEGER],false]
+ * | \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
+ * \_Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
+ * \_StubRelation[[x{r}#99]]
+ *
+ * Takes the right hand side:
+ * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
+ * \_StubRelation[[x{r}#99]]]
+ *
+ * And uses the left-hand side's source as its source:
+ * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
+ * \_Limit[1000[INTEGER],false]
+ * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
+ */
+ public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
+ Holder subPlan = new Holder<>();
+ // Collect the first inlinejoin (bottom up in the tree)
+ optimizedPlan.forEachUp(InlineJoin.class, ij -> {
+ // extract the right side of the plan and replace its source
+ if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {
+ var p = InlineJoin.replaceStub(ij.left(), ij.right());
+ p.setOptimized();
+ subPlan.set(new LogicalPlanTuple(p, ij.right()));
}
- return new Eval(target.source(), target.left(), aliases);
- } else {
- return target.replaceRight(data);
- }
+ });
+ return subPlan.get();
}
public InlineJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java
index bb38b3ad813c5..efdf2a27d63a2 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java
@@ -11,12 +11,44 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockUtils;
+import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRowAsLocalRelation;
+import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
+import org.elasticsearch.xpack.esql.session.EsqlSession;
import java.io.IOException;
/**
* A {@link LocalSupplier} that allways creates a new copy of the {@link Block}s initially provided at creation time.
+ * This is created specifically for {@link InlineStats} usage in {@link EsqlSession} for queries that use ROW command.
+ *
+ * The ROW which gets replaced by {@link ReplaceRowAsLocalRelation} with a {@link LocalRelation} will have its blocks
+ * used (and released) at least twice:
+ * - the LocalRelation from the left-hand side is used as a source for the right-hand side
+ * - the same LocalRelation is then used to continue the execution of the query on the left-hand side
+ *
+ * To prevent the double release, this {@link ImmediateLocalSupplier} variant always creates a deep copy of the blocks
+ * received in the constructor initially.
+ *
+ * Example with the flow and the blocks reuse for a query like "row x = 1 | inlinestats y = max(x)"
+ * Step 1:
+ * Limit[1000[INTEGER],true]
+ * \_InlineJoin[LEFT,[],[],[]]
+ * |_Limit[1000[INTEGER],false]
+ * | \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
+ * \_Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
+ * \_StubRelation[[x{r}#99]]
+ *
+ * Step 2:
+ * Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
+ * \_Limit[1000[INTEGER],false]
+ * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
+ *
+ * Step 3:
+ * Limit[1000[INTEGER],true]
+ * \_Eval[[1[INTEGER] AS y#102]]
+ * \_Limit[1000[INTEGER],false]
+ * \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
*/
public class CopyingLocalSupplier extends ImmediateLocalSupplier {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index d8118b0e0f4e1..13e8d79bd994f 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -89,7 +89,6 @@
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
-import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
@@ -114,6 +113,7 @@
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
+import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan;
public class EsqlSession {
@@ -217,7 +217,7 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener listener
) {
- if (explainMode) {
+ if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
String physicalPlanString = physicalPlan.toString();
List fields = List.of(
@@ -240,8 +240,6 @@ public void executeOptimizedPlan(
}
}
- private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {}
-
private void executeSubPlans(
LogicalPlan optimizedPlan,
PlanRunner runner,
@@ -262,48 +260,40 @@ private void executeSubPlans(
}
}
- private LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
- Holder subPlan = new Holder<>();
- // Collect the first inlinejoin (bottom up in the tree)
- optimizedPlan.forEachUp(InlineJoin.class, ij -> {
- // extract the right side of the plan and replace its source
- if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {
- var p = InlineJoin.replaceStub(ij.left(), ij.right());
- p.setOptimized();
- subPlan.set(new LogicalPlanTuple(p, ij.right()));
- }
- });
- return subPlan.get();
- }
-
private void executeSubPlan(
DriverCompletionInfo.Accumulator completionInfoAccumulator,
LogicalPlan optimizedPlan,
- LogicalPlanTuple subPlans,
+ InlineJoin.LogicalPlanTuple subPlans,
EsqlExecutionInfo executionInfo,
PlanRunner runner,
EsqlQueryRequest request,
ActionListener listener
) {
+ LOGGER.debug("Executing subplan:\n{}", subPlans.stubReplacedSubPlan());
// Create a physical plan out of the logical sub-plan
- var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.nonStubbedSubPlan, request);
+ var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request);
runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> {
try {
// Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
completionInfoAccumulator.accumulate(result.completionInfo());
- LocalRelation resultWrapper = resultToPlan(subPlans.nonStubbedSubPlan, result);
+ LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan(), result);
// replace the original logical plan with the backing result
LogicalPlan newLogicalPlan = optimizedPlan.transformUp(
InlineJoin.class,
- ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij
+ // use object equality since the right-hand side shouldn't have changed in the optimizedPlan at this point
+ // and equals would have ignored name IDs anyway
+ ij -> ij.right() == subPlans.originalSubPlan() ? InlineJoin.inlineData(ij, resultWrapper) : ij
);
+ // TODO: INLINESTATS can we do better here and further optimize the plan AFTER one of the subplans executed?
newLogicalPlan.setOptimized();
+ LOGGER.debug("Plan after previous subplan execution:\n{}", newLogicalPlan);
// look for the next inlinejoin plan
var newSubPlan = firstSubPlan(newLogicalPlan);
if (newSubPlan == null) {// run the final "main" plan
+ LOGGER.debug("Executing final plan:\n{}", newLogicalPlan);
var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
completionInfoAccumulator.accumulate(finalResult.completionInfo());
@@ -320,7 +310,7 @@ private void executeSubPlan(
}));
}
- private LocalRelation resultToPlan(LogicalPlan plan, Result result) {
+ private static LocalRelation resultToPlan(LogicalPlan plan, Result result) {
List pages = result.pages();
List schema = result.schema();
// if (pages.size() > 1) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java
index 1154761d16ec2..294d8110fd7b2 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java
@@ -26,9 +26,7 @@ public static Block[] fromPages(List schema, List pages) {
// Limit ourselves to 1mb of results similar to LOOKUP for now.
long bytesUsed = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) {
- throw new IllegalArgumentException(
- "INLINESTATS sub-plan execution results too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb"
- );
+ throw new IllegalArgumentException("sub-plan execution results too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb");
}
int positionCount = pages.stream().mapToInt(Page::getPositionCount).sum();
Block.Builder[] builders = new Block.Builder[schema.size()];
From 41d21198889601d5d8b8917e1d8a9425ad3ffb90 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Mon, 30 Jun 2025 17:59:17 +0300
Subject: [PATCH 16/20] More stuff
---
.../src/main/resources/inlinestats.csv-spec | 14 +++-----------
.../esql/optimizer/rules/logical/PruneColumns.java | 8 ++++----
2 files changed, 7 insertions(+), 15 deletions(-)
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
index 6155eea6a8166..852e3dccf867f 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
@@ -256,7 +256,7 @@ pushDownSort_To_LeftSideOnly
required_capability: inlinestats_v8
from employees
-| sort emp_no desc
+| sort emp_no
| inlinestats avg = avg(salary) by languages
| limit 5
| keep emp_no, avg, languages, gender
@@ -270,13 +270,6 @@ from employees
10005 |63528.0 |1 |M
;
-from employees
-| sort emp_no desc
-| inlinestats avg = avg(salary) by languages
-| limit 5
-| keep emp_no, avg, languages, gender
-;
-
byMultivaluedSimple
required_capability: inlinestats_v8
@@ -1196,11 +1189,10 @@ FROM employees
| INLINESTATS salary = min(salary) BY gender
| WHERE salary > 31000
| INLINESTATS salary = max(salary) BY gender
-| WHERE salary > 31500
+| WHERE salary < 31500
| KEEP salary, gender
| SORT salary DESC, gender
;
-warning:No limit defined, adding default limit of [1000]
salary:integer |gender:keyword
31120 |null
@@ -1225,7 +1217,7 @@ FROM employees
| LIMIT 3
;
-emp_no:integer |salary:integer |languages:integer|avg(salary):double|gender:keyword
+emp_no:integer |salary:integer |languages:integer|avg(salary):double|gender:long
10001 |57305 |2 |48248.55 |2
10002 |56371 |5 |48248.55 |2
10003 |61805 |4 |48248.55 |2
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
index a2f027f72e23d..b0661b118e185 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java
@@ -29,7 +29,6 @@
import org.elasticsearch.xpack.esql.rule.Rule;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
/**
@@ -105,15 +104,16 @@ public LogicalPlan apply(LogicalPlan plan) {
p = aggregate.with(aggregate.groupings(), remaining);
}
}
- } else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add tests for this IJ removal
- var remaining = removeUnused(ij.right().output(), used, Collections.emptyList());
+ } else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal
+ var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput);
if (remaining != null) {
if (remaining.isEmpty()) {
// remove the InlineJoin altogether
p = ij.left();
recheck = true;
}
- // TODO: InlineStats - prune only the unused columns from it?
+ // TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs
+ // if they will not be used anyway
}
} else if (p instanceof Eval eval) {
var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput);
From bbe1a62081f348e740b537e4bf6a3bddd95e7f41 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Mon, 30 Jun 2025 19:49:39 +0300
Subject: [PATCH 17/20] Tests fixes
---
.../esql/plan/logical/AbstractLogicalPlanSerializationTests.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java
index 415edea4cd40c..4960bae3d62f4 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AbstractLogicalPlanSerializationTests.java
@@ -30,6 +30,7 @@ public static LogicalPlan randomChild(int depth) {
protected final NamedWriteableRegistry getNamedWriteableRegistry() {
List entries = new ArrayList<>();
entries.addAll(PlanWritables.logical());
+ entries.addAll(PlanWritables.others());
entries.addAll(ExpressionWritables.aggregates());
entries.addAll(ExpressionWritables.allExpressions());
return new NamedWriteableRegistry(entries);
From 0ba7a4abf618a3a689f83c6a8c561fe305348f73 Mon Sep 17 00:00:00 2001
From: Andrei Stefan
Date: Mon, 30 Jun 2025 20:30:50 +0300
Subject: [PATCH 18/20] Further fixes and addressing reviews
---
.../xpack/esql/EsqlTestUtils.java | 3 +-
.../rules/logical/PropagateEmptyRelation.java | 3 +-
.../optimizer/rules/logical/PruneColumns.java | 2 +
.../rules/logical/PruneEmptyPlans.java | 4 +-
.../logical/SkipQueryOnEmptyMappings.java | 4 +-
.../xpack/esql/plan/PlanWritables.java | 3 +-
.../esql/plan/logical/join/InlineJoin.java | 4 +-
.../logical/local/EmptyLocalSupplier.java | 64 +++++++++++++++++++
.../plan/logical/local/LocalSupplier.java | 38 +----------
.../LocalLogicalPlanOptimizerTests.java | 8 +--
.../optimizer/LogicalPlanOptimizerTests.java | 14 ++--
.../optimizer/PhysicalPlanOptimizerTests.java | 4 +-
.../logical/local/LocalSupplierTests.java | 31 +++++++--
.../LocalSourceExecSerializationTests.java | 4 +-
14 files changed, 122 insertions(+), 64 deletions(-)
create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/EmptyLocalSupplier.java
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
index 7ec9ee6344551..c304e9d9b742d 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
@@ -83,6 +83,7 @@
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
@@ -446,7 +447,7 @@ public static Literal L(Object value) {
}
public static LogicalPlan emptySource() {
- return new LocalRelation(Source.EMPTY, emptyList(), LocalSupplier.EMPTY);
+ return new LocalRelation(Source.EMPTY, emptyList(), EmptyLocalSupplier.EMPTY);
}
public static LogicalPlan localSource(BlockFactory blockFactory, List fields, List