Merged
35 commits
0741bbb
Adopt a "LogicalPlan" approach to running multiple sub-queries,
astefan May 26, 2025
cd31f72
Update docs/changelog/128917.yaml
astefan Jun 4, 2025
811f5be
checkstyle
astefan Jun 4, 2025
567d600
More tests un-Ignored and updates after update from main
astefan Jun 4, 2025
f834ce7
Fix capability
astefan Jun 4, 2025
d3f2838
TEMP
astefan Jun 6, 2025
1e9c355
Adapt PruneColumns to have "visibility" into the right-hand side of
astefan Jun 11, 2025
fa22575
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 11, 2025
377328c
More fixes and tests
astefan Jun 11, 2025
1c745c7
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 12, 2025
cffa1d7
More deterministic test
astefan Jun 12, 2025
0882cbb
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 12, 2025
e3b5c92
Update docs/changelog/128917.yaml
astefan Jun 12, 2025
bc6f8cc
Test inlinestats after stats with bucket and top
astefan Jun 13, 2025
910c888
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 13, 2025
2d1a1ec
Merge branch 'support_multi_inlinestats_logicalPlan_approach' of http…
astefan Jun 13, 2025
bb0450c
Have Row - LocalRelation transformation use a copying blocks approach
astefan Jun 17, 2025
2f2cc30
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 20, 2025
a9e2d99
Attempt a serialization change to have to Suppliers for LocalRelation
astefan Jun 23, 2025
4661330
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 24, 2025
0500ab6
Adapt to EXPLAIN command
astefan Jun 25, 2025
4ed8c9b
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 25, 2025
17d5933
Add more tests and fix incorrect pruning of columns of the inlinestats'
astefan Jun 27, 2025
a88b8cf
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 27, 2025
41d2119
More stuff
astefan Jun 30, 2025
0cbfef9
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 30, 2025
bbe1a62
Tests fixes
astefan Jun 30, 2025
f136b79
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jun 30, 2025
0ba7a4a
Further fixes and addressing reviews
astefan Jun 30, 2025
a2527ea
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jul 2, 2025
c04806b
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jul 3, 2025
5bb4e4f
Rework the LocalSupplier serialization
astefan Jul 3, 2025
8d05f3f
Address reviews
astefan Jul 7, 2025
f7268ad
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jul 7, 2025
7ffaf8c
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Jul 8, 2025
@@ -326,6 +326,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_113_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Contributor

Interesting case, not sure if we already have this in tests:

FROM ...
| SORT ...
| INLINESTATS ...
| LIMIT 5

Important about this:

  1. this should work - the sort is irrelevant for the stats and should be optimized away; and the limit should be able to be pushed down past the inline join in the second phase of the query.
    Actually, properly pushed down - not duplicated like it is currently! Because INLINESTATS importantly keeps the row count invariant, we can do things that we can't for the more general left join or lookup join. We should follow up on this to avoid inheriting the same limitations that we have for LOOKUP JOIN.
  2. the limit must not be pushed down in the first phase, lest it mess up the stats
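
To make the two points above concrete, here is a minimal Java sketch. It is a toy model, not ES|QL code: `Row`, `Joined`, and every method name are made up for illustration. It assumes the stats are computed once over the full input in phase one. Under that assumption, limiting before or after the phase-two join gives the same rows, because the inline join emits exactly one output row per input row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of "INLINESTATS avg = avg(salary) BY languages | LIMIT n".
final class InlineStatsLimitSketch {
    record Row(int empNo, int languages, double salary) {}

    record Joined(int empNo, int languages, double avg) {}

    // Phase one: aggregate over ALL rows. No LIMIT may be applied before this step.
    static Map<Integer, Double> avgSalaryByLanguages(List<Row> rows) {
        Map<Integer, double[]> sumAndCount = new HashMap<>();
        for (Row r : rows) {
            double[] acc = sumAndCount.computeIfAbsent(r.languages(), k -> new double[2]);
            acc[0] += r.salary();
            acc[1] += 1;
        }
        Map<Integer, Double> avg = new HashMap<>();
        sumAndCount.forEach((k, v) -> avg.put(k, v[0] / v[1]));
        return avg;
    }

    // Phase two: the inline join keeps the row count invariant (one output row per input row).
    static List<Joined> inlineJoin(List<Row> rows, Map<Integer, Double> stats) {
        List<Joined> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(new Joined(r.empNo(), r.languages(), stats.get(r.languages())));
        }
        return out;
    }

    static <T> List<T> limit(List<T> rows, int n) {
        return new ArrayList<>(rows.subList(0, Math.min(n, rows.size())));
    }

    // Made-up sample data, not taken from the employees test index.
    static List<Row> sample() {
        return List.of(new Row(1, 2, 100.0), new Row(2, 2, 300.0), new Row(3, 5, 50.0));
    }

    public static void main(String[] args) {
        List<Row> rows = sample();
        Map<Integer, Double> stats = avgSalaryByLanguages(rows);
        List<Joined> limitAfterJoin = limit(inlineJoin(rows, stats), 2);
        List<Joined> limitBeforeJoin = inlineJoin(limit(rows, 2), stats);
        System.out.println("same result: " + limitAfterJoin.equals(limitBeforeJoin));
    }
}
```

The sketch only covers the second phase. Applying `limit` before `avgSalaryByLanguages` would change the averages, which is the situation point 2 warns about.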

Contributor Author

The optimized logical plan looks like this for the query you suggested (from employees | sort emp_no desc | inlinestats avg = avg(salary) by languages | limit 5 | keep emp_no, avg, languages, gender):

EsqlProject[[emp_no{f}#411, avg{r}#400, languages{f}#407, gender{f}#406]]
 \_Limit[5[INTEGER],true]
   \_InlineJoin[LEFT,[languages{f}#407],[languages{f}#407],[languages{r}#407]]
     |_TopN[[Order[emp_no{f}#411,DESC,FIRST]],5[INTEGER]]
     | \_EsRelation[employees][emp_no{f}#411, gender{f}#406, languages{f}#407, lan..]
     \_Project[[avg{r}#400, languages{f}#407]]
       \_Eval[[$$SUM$avg$0{r$}#413 / $$COUNT$avg$1{r$}#414 AS avg#400]]
         \_Aggregate[[languages{f}#407],[SUM(salary{f}#412,true[BOOLEAN]) AS $$SUM$avg$0#413, COUNT(salary{f}#412,true[BOOLEAN]) AS $$COUNT$avg$1#414, languages{f}#407]]
           \_StubRelation[[emp_no{f}#411, gender{f}#406, languages{f}#407, languages.byte{f}#408, languages.long{f}#410, languages.short{f}#409, salary{f}#412]]

which I think looks right:

  • the right hand side is untouched
  • the limit is pushed down on the left hand side, where it is combined with the sort into a TopN
  • the final limit stays on top of the inlinestats

I will add a TODO in the csv-spec for this exact query so that we also have a unit test for this.

Contributor

@alex-spies alex-spies Jun 30, 2025

Oh no, this doesn't look right.

The output of INLINESTATS must not depend on subsequent commands. The limit being pushed down and combined into a topn is wrong.

I just checked out your code and ran your example query:

    emp_no     |      avg      |   languages   |    gender     
---------------+---------------+---------------+---------------
10100          |52379.0        |4              |F              
10099          |73578.0        |2              |F              
10098          |52379.0        |4              |F              
10097          |71165.0        |3              |M              
10096          |52379.0        |4              |M  

Now, if I change this to LIMIT 1 instead of LIMIT 5, I get

    emp_no     |      avg      |   languages   |    gender     
---------------+---------------+---------------+---------------
10100          |68431.0        |4              |F    

The average salary of female employees with 4 languages must not change just because I only look at 1 such employee in subsequent commands!

@astefan, I believe the limit duplication from LOOKUP JOINs is messing things up here, or rather, the way stub relations work. The thing is, the stub relation reaaaally stands for a specific plan node upstream from the join. Optimizations that push stuff into the left hand side of an inline join are generally correct, but they accidentally also alter the right hand side due to the stub mechanism. My gut feeling is that the correct modelling would have the stub relation explicitly point at a specific subplan upstream, rather than implicitly standing for the left hand side of the join.
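
A rough Java sketch of the aliasing problem described above. It uses a made-up plan model: `Plan`, `Source`, `Limit`, `InlineJoin`, `stubTarget`, and `pushLimitIntoLeft` are hypothetical names, not the real ES|QL classes or rules. A `null` `stubTarget` models the current implicit stub, which resolves to whatever the left hand side is at execution time. A non-null one models a stub that explicitly points at a fixed upstream subplan.

```java
import java.util.List;

// Hypothetical plan model; rows are reduced to a single salary column.
final class StubRelationSketch {
    interface Plan {
        List<Double> rows();
    }

    record Source(List<Double> salaries) implements Plan {
        public List<Double> rows() {
            return salaries;
        }
    }

    record Limit(Plan child, int n) implements Plan {
        public List<Double> rows() {
            List<Double> in = child.rows();
            return in.subList(0, Math.min(n, in.size()));
        }
    }

    // The right-hand side computes AVG(salary) over the stub's input.
    record InlineJoin(Plan left, Plan stubTarget) {
        double avg() {
            // Implicit stub (null): read from the join's current left side.
            // Explicit stub: read from the subplan captured when the join was built.
            Plan input = stubTarget != null ? stubTarget : left;
            return input.rows().stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
        }

        // Optimizer-rule sketch: push a LIMIT into the left side only.
        InlineJoin pushLimitIntoLeft(int n) {
            return new InlineJoin(new Limit(left, n), stubTarget);
        }
    }

    public static void main(String[] args) {
        Plan source = new Source(List.of(10.0, 20.0, 60.0)); // made-up data
        InlineJoin implicit = new InlineJoin(source, null);
        InlineJoin explicit = new InlineJoin(source, source);
        System.out.println("implicit stub: " + implicit.avg() + " -> " + implicit.pushLimitIntoLeft(1).avg());
        System.out.println("explicit stub: " + explicit.avg() + " -> " + explicit.pushLimitIntoLeft(1).avg());
    }
}
```

With the implicit stub, the rule never touches the right hand side, yet the average still changes after the rewrite. With the explicit reference it stays put.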

However, if we want to keep the current modeling of joins/inlinestats, I think we need to

  • Go over all the optimizer rules that apply to Join and
  • check if they need to be amended for InlineJoin, which unfortunately is a subclass that has super duper different semantics as long as we use stub relations the way we do.

Contributor

Btw: We also push down filters into the left hand side of joins but specifically exclude inline joins because that would change the row set over which we compute the stats.

@@ -201,7 +201,6 @@ emp_no:integer | avg_worked_seconds:long | max_avg_worked_seconds:long | languag
;

two
required_capability: join_planning_v1
required_capability: inlinestats_v8

FROM employees
@@ -210,17 +209,24 @@ FROM employees
| WHERE avg_worked_seconds > avg_avg_worked_seconds
| INLINESTATS max_languages = MAX(languages) BY gender
| SORT emp_no ASC
| LIMIT 3;
| LIMIT 10;

emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:integer|max_languages:integer|gender:keyword
10002 |328922887 |3.133013149047619E8 |5 |5 |F
10006 |372957040 |2.978159518235294E8 |3 |5 |F
10007 |393084805 |2.863684210555556E8 |4 |5 |F
10010 |315236372 |2.863684210555556E8 |4 |5 |null
10012 |365510850 |3.133013149047619E8 |5 |5 |null
10015 |390266432 |3.133013149047619E8 |5 |5 |null
10018 |309604079 |3.0318626831578946E8 |2 |5 |null
10019 |342855721 |2.94833632E8 |1 |5 |null
10020 |373309605 |3.181719481E8 |null |5 |M
10023 |330870342 |3.181719481E8 |null |5 |F
;

three
required_capability: inlinestats_v8
// fails with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)
// used to fail with AssertionError at org.elasticsearch.xpack.esql.plan.logical.Limit.writeTo(Limit.java:70)

FROM employees
| KEEP emp_no, languages, avg_worked_seconds, gender
@@ -245,6 +251,25 @@ emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:
10023 |330870342 |3.181719481E8 |null |5 |3 |5 |F
;

// TODO: INLINESTATS unit test needed for this one
pushDownSort_To_LeftSideOnly
required_capability: inlinestats_v8

from employees
| sort emp_no
| inlinestats avg = avg(salary) by languages
| limit 5
| keep emp_no, avg, languages, gender
;

emp_no:integer| avg:double |languages:integer|gender:keyword
10001 |57305.0 |2 |M
10002 |46272.5 |5 |F
10003 |61805.0 |4 |M
10004 |46272.5 |5 |M
10005 |63528.0 |1 |M
;

byMultivaluedSimple
required_capability: inlinestats_v8

@@ -313,7 +338,6 @@ FROM airports
;

mvMinMvExpand
required_capability: join_planning_v1
required_capability: inlinestats_v8

FROM airports
@@ -378,7 +402,6 @@ abbrev:keyword | country:keyword | count:long
;

afterLookup
required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: join_lookup_v12

@@ -403,7 +426,6 @@ ZNZ |4 |German
;

afterEnrich
required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: enrich_load

@@ -466,7 +488,6 @@ emp_no:integer | languages:integer | max_salary:integer
;

beforeEnrich
required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: enrich_load

@@ -486,7 +507,6 @@ ACA |Acapulco de Juárez|385 |major |Acapulco de
;

beforeAndAfterEnrich
required_capability: join_planning_v1
required_capability: inlinestats_v8
required_capability: enrich_load

@@ -510,7 +530,6 @@ ALL |Albenga |499 |mid |1
;

shadowing
required_capability: join_planning_v1
required_capability: inlinestats_v8

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
@@ -522,7 +541,6 @@ left | right | right | 172.21.0.5
;

shadowingMulti
required_capability: join_planning_v1
required_capability: inlinestats_v8

ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right"
@@ -534,7 +552,6 @@ left | middle | right | left | left
;

shadowingSelf
required_capability: join_planning_v1
required_capability: inlinestats_v8

ROW city = "Raleigh"
@@ -546,7 +563,6 @@ city:long
;

shadowingSelfBySelf
required_capability: join_planning_v1
required_capability: inlinestats_v8

ROW city = "Raleigh"
@@ -559,7 +575,6 @@ Raleigh
;

shadowingInternal
required_capability: join_planning_v1
required_capability: inlinestats_v8

ROW city = "Zürich"
@@ -585,9 +600,8 @@ row x = 1
2 |3 |3 |1 |1
;

ignoreUnusedEvaledValue-Ignore
ignoreUnusedEvaledValue_AndInlineStats
required_capability: inlinestats_v8
// fails with expected [keys] to be non-empty

ROW x = 1
| INLINESTATS max(x)
@@ -599,9 +613,21 @@ x:integer
1
;

ignoreUnusedEvaledValue2-Ignore
ignoreUnusedEvaledValue_AndInlineStats2
required_capability: inlinestats_v8

ROW x = 1, z = 2
| INLINESTATS max(x)
| EVAL a = x + 1, b = z + 2
| KEEP x, z
;

x:integer | z:integer
1 | 2
;

ignoreUnusedEvaledValue_AndInlineStats3
required_capability: inlinestats_v8
// fails with expected [keys] to be non-empty

from employees
| inlinestats max(salary)
@@ -615,6 +641,36 @@ from employees
74999
;

ignoreUnusedEvaledValue_AndInlineStats4
required_capability: inlinestats_v8

from employees
| inlinestats max(salary), m = min(salary) by gender
| eval y = concat(gender, "")
| keep emp_no
| sort emp_no desc
| limit 1
;

emp_no:integer
10100
;

ignoreUnusedEvaledValue_AndInlineStats5
required_capability: inlinestats_v8

from employees
| inlinestats max(salary), m = min(salary) by gender
| eval y = m / 2
| keep emp_no
| sort emp_no desc
| limit 1
;

emp_no:integer
10100
;

byConstant
required_capability: inlinestats_v8

@@ -691,7 +747,6 @@ abbrev:keyword | scalerank:integer | location:geo_point
;

byTwoCalculatedSecondOverwrites
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
required_capability: inlinestats_v8

@@ -713,7 +768,6 @@ abbrev:keyword | scalerank:integer | location:geo_point
;

byTwoCalculatedSecondOverwritesReferencingFirst
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
required_capability: inlinestats_v8

@@ -737,7 +791,6 @@ abbrev:keyword | scalerank:integer | location:geo_point


groupShadowsAgg
required_capability: join_planning_v1
required_capability: stats_alias_collision_warnings
required_capability: inlinestats_v8

@@ -1128,6 +1181,48 @@ salary:integer |gender:keyword
25976 |F
;

doubleShadowing_WithIntertwinedFilters
required_capability: inlinestats_v8

FROM employees
| WHERE salary > 30000
| INLINESTATS salary = min(salary) BY gender
| WHERE salary > 31000
| INLINESTATS salary = max(salary) BY gender
| WHERE salary < 31500
| KEEP salary, gender
| SORT salary DESC, gender
;

salary:integer |gender:keyword
31120 |null
31120 |null
31120 |null
31120 |null
31120 |null
31120 |null
31120 |null
31120 |null
31120 |null
;

shadowingAggregateByNextGrouping
required_capability: inlinestats_v8

FROM employees
| KEEP gender, languages, emp_no, salary
| INLINESTATS gender = count_distinct(gender) BY languages
| INLINESTATS avg(salary) BY gender
| SORT emp_no
| LIMIT 3
;

emp_no:integer |salary:integer |languages:integer|avg(salary):double|gender:long
10001 |57305 |2 |48248.55 |2
10002 |56371 |5 |48248.55 |2
10003 |61805 |4 |48248.55 |2
;

doubleShadowingWithEval
required_capability: inlinestats_v8

@@ -41,7 +41,7 @@ public LogicalPlan apply(LogicalPlan plan) {
// track used references
var used = plan.outputSet().asBuilder();
// track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the
// inline join won't have it's columns pruned due to the lack of "visibility" into the right hand side output/Attributes
// inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes
var inlineJoinRightOutput = new ArrayList<Attribute>();
Contributor

I'm confused how this works in general. Expressions in the inline join's left side generally cannot depend on the stats output, i.e. the inline join's right hand side. The dependency goes the other way - the IJ's right hand side depends on expressions in the left, because it will normally contain a stub.

Also, LOOKUP JOIN doesn't require handling the right hand side output attributes specially, which is suspicious.

Maybe I'm just not seeing it and need an example, though :)

The complication that I would expect: expressions on the left may get pruned because they don't appear in used. Without having tried, something like:

EVAL x = 2*y | INLINE STATS x = avg(x)

The first x is only required in the stats in the right hand side of the inline join, so we may optimize this away by accident if PruneColumns first descends into the left hand side and thus hasn't yet added the x to the used attributes.

I didn't check if this actually happens, or if we maybe descend first into the right after all. Still, probably worth a test.
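
The ordering concern can be sketched in Java with a heavily simplified model. This is an assumption-laden illustration, not the actual `PruneColumns` logic: attributes are plain strings, and `Eval`, `Stats`, `pruneLeft`, and `visitRightFirst` are hypothetical names. The two `x` columns are written as `x#1` (from the EVAL) and `x#2` (from the stats) to tell them apart.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of "EVAL x = 2*y | INLINE STATS x = avg(x)".
final class PruneOrderSketch {
    record Eval(String defines, String reads) {}

    record Stats(String defines, String reads) {}

    // Returns the left-hand-side evals that survive pruning.
    static List<Eval> pruneLeft(List<Eval> leftEvals, Stats right, Set<String> finalOutput, boolean visitRightFirst) {
        Set<String> used = new LinkedHashSet<>(finalOutput);
        if (visitRightFirst && used.contains(right.defines())) {
            // The stats output is needed, so whatever the stats read is needed too.
            used.add(right.reads());
        }
        List<Eval> kept = new ArrayList<>();
        // Walk the left side top-down: the last command in the query is visited first.
        for (int i = leftEvals.size() - 1; i >= 0; i--) {
            Eval e = leftEvals.get(i);
            if (used.contains(e.defines())) {
                kept.add(0, e);
                used.add(e.reads());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Eval> left = List.of(new Eval("x#1", "y"));
        Stats right = new Stats("x#2", "x#1");
        Set<String> output = Set.of("y", "x#2");
        System.out.println("left first:  " + pruneLeft(left, right, output, false));
        System.out.println("right first: " + pruneLeft(left, right, output, true));
    }
}
```

In this model, descending into the left side first drops the EVAL that the stats still need. Collecting the right side's references first keeps it.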

Holder<Boolean> forkPresent = new Holder<>(false);

@@ -104,6 +104,17 @@ public LogicalPlan apply(LogicalPlan plan) {
p = aggregate.with(aggregate.groupings(), remaining);
}
}
} else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal
var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput);
Contributor

Aren't all of the attributes in ij.right().output() protected because they're contained in inlineJoinRightOutput?

if (remaining != null) {
if (remaining.isEmpty()) {
// remove the InlineJoin altogether
p = ij.left();
recheck = true;
}
// TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs
// if they will not be used anyway
Comment on lines +117 to +118
Contributor

++

}
} else if (p instanceof Eval eval) {
var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput);
Contributor

Some aggs leave downstream EVALs, like AVG. Does this prevent them from being pruned, even if they're not needed downstream at all?

E.g. in INLINESTATS x = avg(y), a = count_distinct(field) BY z | EVAL x = z, I'd expect that making the inline join's right hand side "unpruneable" prevents this rule from correctly pruning unused columns.

// no fields, no eval
@@ -28,7 +28,9 @@
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.ImmediateLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
@@ -65,6 +67,7 @@ public static List<NamedWriteableRegistry.Entry> getNamedWriteables() {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(logical());
entries.addAll(physical());
entries.addAll(others());
return entries;
}

@@ -124,4 +127,8 @@ public static List<NamedWriteableRegistry.Entry> physical() {
TopNExec.ENTRY
);
}

public static List<NamedWriteableRegistry.Entry> others() {
return List.of(CopyingLocalSupplier.ENTRY, ImmediateLocalSupplier.ENTRY);
}
}