Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,45 @@ total_duration:long | day:date | days:date
17016205 |2023-10-23T13:00:00.000Z|[2023-10-23T12:00:00.000Z, 2023-10-23T13:00:00.000Z]
;

evalBeforeDoubleInlinestats1
required_capability: inlinestats_v9

FROM employees
| EVAL salaryK = salary/1000
| INLINESTATS count = COUNT(*) BY salaryK
| INLINESTATS min = MIN(MV_COUNT(languages)) BY salaryK
| SORT emp_no
| KEEP emp_no, still_hired, count
| LIMIT 5
;

emp_no:integer |still_hired:boolean|count:long
10001 |true |1
10002 |true |3
10003 |false |2
10004 |true |2
10005 |true |1
;

evalBeforeDoubleInlinestats2
required_capability: inlinestats_v9

FROM employees
| EVAL jobs = MV_COUNT(job_positions)
| INLINESTATS count = COUNT(*) BY jobs
| INLINESTATS min = MIN(MV_COUNT(languages)) BY jobs
| SORT emp_no
| KEEP emp_no, jobs, count, min
| LIMIT 5
;

emp_no:integer |jobs:integer|count:long|min:integer
10001 |2 |18 |1
10002 |1 |27 |1
10003 |null |11 |1
10004 |4 |26 |1
10005 |null |11 |1
;

evalBeforeInlinestatsAndKeepAfter1
required_capability: inlinestats_v9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -37,7 +36,7 @@ public PropagateEmptyRelation() {
@Override
protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) {
LogicalPlan p = plan;
if (plan.child() instanceof LocalRelation local && local.supplier() == EmptyLocalSupplier.EMPTY) {
if (plan.child() instanceof LocalRelation local && local.hasEmptySupplier()) {
// only care about non-grouped aggs might return something (count)
if (plan instanceof Aggregate agg && agg.groupings().isEmpty()) {
List<Block> emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
Expand Down Expand Up @@ -79,7 +78,7 @@ private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder u
recheck.set(false);
p = switch (p) {
case Aggregate agg -> pruneColumnsInAggregate(agg, used, inlineJoin);
case InlineJoin inj -> pruneColumnsInInlineJoin(inj, used, recheck);
case InlineJoin inj -> pruneColumnsInInlineJoinRight(inj, used, recheck);
case Eval eval -> pruneColumnsInEval(eval, used, recheck);
case Project project -> inlineJoin ? pruneColumnsInProject(project, used) : p;
case EsRelation esr -> pruneColumnsInEsRelation(esr, used);
Expand Down Expand Up @@ -148,12 +147,12 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
return p;
}

private static LogicalPlan pruneColumnsInInlineJoin(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
private static LogicalPlan pruneColumnsInInlineJoinRight(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) {
LogicalPlan p = ij;

used.addAll(ij.references());
var right = pruneColumns(ij.right(), used, true);
if (right.output().isEmpty()) {
if (right.output().isEmpty() || isLocalEmptyRelation(right)) {
p = ij.left();
recheck.set(true);
} else if (right != ij.right()) {
Expand Down Expand Up @@ -181,18 +180,19 @@ private static LogicalPlan pruneColumnsInEval(Eval eval, AttributeSet.Builder us
return p;
}

// Note: only run when the Project is a descendent of an InlineJoin.
private static LogicalPlan pruneColumnsInProject(Project project, AttributeSet.Builder used) {
LogicalPlan p = project;

// A project atop a stub relation will only output attributes which are already part of the INLINEJOIN left-hand side output.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels brittle in the sense that there may be other rules that might have a different behavior and mess up with the expected pair of steps/actions (1. transform the aggregate in a Project then 2. look for the exact combination Project -> StubRelation) this pruning expects here.

I've had such a solution in my draft PR here but eventually I changed it after reconsidering what is happening in the life of an InlineJoin (basically I considered the flow of the EsqlSession as well, because there are things that are InlineJoin specific and it expects a StubRelation to exist OR the InlineJoin to be pruned completely) and I ended up with this solution instead: https://github.com/elastic/elasticsearch/pull/132934/files#diff-4c0de47236c79e13849f81a2b9765240c822a6af6e90902ba4912f1548c65082R90-R107

Maybe explore a bit more the pruning idea based on what I mentioned above on the idea that the Project + StubRelation you are expecting there might be coming from other rules and mess up the pruning.

Copy link
Contributor Author

@bpintea bpintea Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Generally:

the Project + StubRelation you are expecting there might be coming from other rules

this simplification only happens when the plan has the pattern: InlineJoin - Project - StubRelation. It'll only be problematic if we'll ever have StubRelations on the left hand-side of the InlineJoin. But I guess the verification can be done earlier in the flow, on Aggregate/Project substitution and make the substitution less pattern-dependent.
I'll give it a try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@astefan, I've taken another approach, simplifying the analysis of the Aggregate iif this is part of an InlineJoin, which allows simplifying the Project analysis downstream too. I think the logic should hold irrespective of InlineJoin being currently only used for INLINESTATS or later modeling other commands too.

if (project.child() instanceof StubRelation) {
// Use an empty relation as a marker for a subsequent pass over the InlineJoin.
return emptyLocalRelation(project);
}

var remaining = pruneUnusedAndAddReferences(project.projections(), used);
if (remaining != null) {
p = remaining.isEmpty() || remaining.stream().allMatch(FieldAttribute.class::isInstance)
? emptyLocalRelation(project)
: new Project(project.source(), project.child(), remaining);
} else if (project.output().stream().allMatch(FieldAttribute.class::isInstance)) {
// Use empty relation as a marker for a subsequent pass, in case the project is only outputting field attributes (which are
// already part of the INLINEJOIN left-hand side output).
p = emptyLocalRelation(project);
p = remaining.isEmpty() ? emptyLocalRelation(project) : new Project(project.source(), project.child(), remaining);
}

return p;
Expand All @@ -216,7 +216,11 @@ private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet

private static LogicalPlan emptyLocalRelation(LogicalPlan plan) {
// create an empty local relation with no attributes
return new LocalRelation(plan.source(), List.of(), EmptyLocalSupplier.EMPTY);
return new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY);
}

private static boolean isLocalEmptyRelation(LogicalPlan plan) {
return plan instanceof LocalRelation local && local.hasEmptySupplier();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
out.writeNamedWriteable(supplier);
} else {
if (supplier == EmptyLocalSupplier.EMPTY) {
if (hasEmptySupplier()) {
out.writeVInt(0);
} else {// here we can only have an ImmediateLocalSupplier as this was the only implementation apart from EMPTY
((ImmediateLocalSupplier) supplier).writeTo(out);
Expand All @@ -77,6 +77,10 @@ public LocalSupplier supplier() {
return supplier;
}

public boolean hasEmptySupplier() {
return supplier == EmptyLocalSupplier.EMPTY;
}

@Override
public boolean expressionsResolved() {
return true;
Expand Down
Loading