-
Couldn't load subscription status.
- Fork 25.6k
ESQL: INLINESTATS implementation with multiple LogicalPlan updates #128917
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ESQL: INLINESTATS implementation with multiple LogicalPlan updates #128917
Conversation
coordinated from the EsqlSession.
|
Hi @astefan, I've created a changelog YAML for you. |
the inlinestats JOIN
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
| | INLINESTATS min_scalerank=MIN(scalerank) BY type | ||
| | MV_EXPAND type | ||
| | WHERE scalerank == MV_MIN(scalerank); | ||
| | EVAL mvMin_scalerank = MV_MIN(scalerank) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know the original intention with this test, so I've updated it to make some kind of sense and to also keep what I thought to be its original purpose.
| // as first columns in the output followed by whatever the right hand side of join adds in this order: aggregates first, | ||
| // followed by groupings (this order should be preserved inside the rightFields() output) | ||
| output = mergeOutputAttributes(right, leftOutputWithoutMatchFields); | ||
| List<Attribute> leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is taken from @alex-spies great suggestion here
| throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]"); | ||
| } | ||
|
|
||
| if (join instanceof InlineJoin) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This temporary section has been removed; it was introduced with the first PR about reviving inlinestats, and it was also questioned about its usefulness. After changing the approach to use LogicalPlan rebuild (instead of PhysicalPlans) this part was not needed anymore.
…support_multi_inlinestats_logicalPlan_approach
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Second round - focused on the execution in EsqlSession this time.
Looks like we're going back in the direction of the initial approach to inline stats, where each phase had a full optimizer run. That's the right approach IMO.
I want to have another look next week to better understand the changes to the optimizer rules. But please go ahead and merge whenever you're happy (I see Bogdan already 👍'd the PR); if I find something I find important, I'll leave a comment after the merge.
| } | ||
|
|
||
| private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} | ||
| private record LogicalPlanTuple(LogicalPlan nonStubbedSubPlan, LogicalPlan originalSubPlan) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: a comment explaining the purpose wouldn't hurt; it's not clear without reading more what the non stubbed plan is, and the name LogicalPlanTuple is also rather generic.
| }); | ||
| LogicalPlan newLogicalPlan = optimizedPlan.transformUp( | ||
| InlineJoin.class, | ||
| ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I think it's important we use object equality here - regular equality will not suffice because it ignores e.g. name ids.
Maybe worth a comment?
| InlineJoin.class, | ||
| ij -> ij.right() == subPlans.originalSubPlan ? InlineJoin.inlineData(ij, resultWrapper) : ij | ||
| ); | ||
| newLogicalPlan.setOptimized(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is surprising. Shouldn't we re-optimize the plan now that we were able to replace the stub with an actual result?
After the stub was replaced, we can actually do more stuff, like push down limits (which before that would be wrong as it would have affected the stats).
If I understand correctly, this is something we might want to improve later, right? If so, let's leave a comment; maybe a TODO to make the intention clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, a TODO is right.
I had my doubts with this thing. I feel like there are things here that were left hanging (things do still seem they can be optimized further), but at this stage of the inlinestats progress, I think it's worth ignoring it. But TODO is needed, because it's something we need to think about a little.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I thought this was sufficient. The approach basically gradually executes righthand sides of (inline) joins, always resulting in a LocalRelation. Ending up with a join with whatever was before on the left and this local relation on the right. Not sure if we can further optimise this (and haven't done it before - the LIMIT makes it past the InlineJoin into the lefthand side already, since InlineJoin preserves the rows count).
But a TODO can re-eval things later, 👍 .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, we can further optimize it alright :)
The limit being pushed/copied down into the left hand side is a bug - that should only happen in subsequent passes. See my comment here.
Also, if the INLINESTATS has no BY clause, it can be turned into an EVAL with literals in subsequent phases, which can trigger more optimizations (like constant folding, filter pushdown, optimizing away checks against constant keyword fields). Some of this could probably be somehow hacked into the first optimization pass, but currently I think it's more natural (or at least easier) to just have another optimizer pass per query phase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The limit being pushed/copied down into the left hand side is a bug - that should only happen in subsequent passes.
Actually, very true!
Also, if the INLINESTATS has no BY clause, it can be turned into an EVAL with literals in subsequent phases
I see. Yes, this could be done, but not with the current shape of the planning (i.e. rerunning the optimiser as it is now won't replan). But yes, you're right, this could still be evolved.
Aggregation. Move two methods from EsqlSession to InlineJoin. Address reviews Add more comments
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heya, all done now.
This is a nice advancement of INLINE STATS, let's go! I think there are a couple things to be looked at in follow ups, but this is fine to merge as-is IMO; we can continue iterating in follow-ups as the changes here do not seem to affect the code health of the non-INLINE STATS code paths.
Most notable follow-up items IMO:
- Correctness of optimization rules w.r.t. stubs, esp. the limit pushdown.
- More logical plan optimizer tests for PruneColumns and a good hard look at PruneColumns (maybe I'm misreading things though, see comments below)
| /** | ||
| * Replaces the stubbed source with the actual source. | ||
| */ | ||
| public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we add a note that this will replace all stubs with the new source? In case of a plan with 2 stubs (dual inlinestats in the query), this method should normally be avoided to avoid chaos.
| } | ||
|
|
||
| if (p instanceof InlineJoin ij) { | ||
| inlineJoinRightOutput.addAll(ij.right().outputSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have a logical plan optimizer test that demonstrates the necessity for tracking the right outputs of inline stats.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this will be followups. Added TODO
| // However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index, | ||
| // it works differently as we extract all fields (other than the join key) that the EsRelation has. | ||
| var remaining = removeUnused(esr.output(), used); | ||
| var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is odd. The right outputs should be computed from STATS, so the only attribute that could also occur here is a looked up field attr that's later used in the inlinestats' BY clause.
But this should also be contained in the InlineJoin's .references, i.e. should end up in the used set.
| } | ||
| } else if (p instanceof Eval eval) { | ||
| var remaining = removeUnused(eval.fields(), used); | ||
| var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some aggs leave downstream EVALs, like AVG. Does this prevent them from being pruned, even if they're not needed downstream at all?
E.g. in INLINESTATS x = avg(y), a = count_distinct(field) BY z | EVAL x = z I'd expect that making the inline join's right hand side "unpruneable" prevents this rule from correctly pruning unused columns.
| recheck = false; | ||
| if (p instanceof Aggregate aggregate) { | ||
| var remaining = removeUnused(aggregate.aggregates(), used); | ||
| var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to protect the computed aggs in an inlinestats, no matter what?
I don't think this is correct long term; unused aggs in inlinestats need to be pruned, too. If the problematic case is when the inline stats ends up with 0 aggs, then I think we need an extra case that prunes the whole inline join. As inline joins preserve row count, this should be correct.
Can we at least leave a TODO? I think we'll need to revisit this and add a bunch of tests to make sure all works as intended.
x-pack/plugin/esql/qa/testFixtures/src/main/resources/inlinestats.csv-spec
Show resolved
Hide resolved
| // TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs | ||
| // if they will not be used anyway |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
| } | ||
| } | ||
| } else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal | ||
| var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aren't all of the attributes in ij.right().output() protected because they're contained in inlineJoinRightOutput?
| var used = plan.outputSet().asBuilder(); | ||
| // track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the | ||
| // inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes | ||
| var inlineJoinRightOutput = new ArrayList<Attribute>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused how this works in general. Expressions in the inline join's left side generally cannot depend on the stats output, resp. the inline join's right hand side. The dependency is converse - the IJ's right hand side depends on expressions in the left, because it will normally contain a stub.
Also, LOOKUP JOIN doesn't require handling the right hand side output attributes especially, which is suspicious.
Maybe I'm just not seeing it and need an example, though :)
The complication that I would expect: expressions on the left may get pruned because they don't appear in used. Without having tried, something like:
EVAL x = 2*y | INLINE STATS x = avg(x)
The first x is only required in the stats in the right hand side of the inline join, so we may optimize this away on accident if PruneColumns first descends into the left hand side and thus didn't yet add the x to the used attributes.
I didn't check if this actually happens, or if we maybe descend first into the right after all. Still, probably worth a test.
…support_multi_inlinestats_logicalPlan_approach
|
Asking some feedback from @nik9000 as well, especially related to CopyingLocalSupplier and whatever I am doing in there related to Blocks :-). I didn't change the ImmediateLocalSupplier to always behave like CopyingLocalSupplier because I wanted this behavior only for |
…support_multi_inlinestats_logicalPlan_approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some opinions on the serialization change. Compared to the rest of the work they are super minor, but I think we should resolve my questions before merging.
...esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/CopyingLocalSupplier.java
Show resolved
Hide resolved
| if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { | ||
| this.supplier = in.readNamedWriteable(LocalSupplier.class); | ||
| } else { | ||
| this.supplier = LocalSupplier.readFrom((PlanStreamInput) in); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's important this method say that it's for the legacy serialization format somehow. Maybe just rename it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or, what about moving it to right here?
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeVInt(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should probably be empty instead of compatible with the previous serialization.
| if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) { | ||
| out.writeNamedWriteable(supplier); | ||
| } else { | ||
| supplier.writeTo(out); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like it's subtle in a way that folks aren't likely to catch by reading it. I did because you told me to look here. But I don't trust myself to see it six months from now.
You made this work by forcing each variant to serialize in a way that's compatible with the original serialization. I think it'd be less surprising if instead you explicitly made an ImmediateSupplier or even just copied the old serialization code to here.
| new PlanStreamOutput(output, null).writeNamedWriteable(instance); | ||
| } else { | ||
| instance.writeTo(new PlanStreamOutput(output, null)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be more traditional to make this abstract and make a subclass for each LocalSupplier subclass. This works as is though.
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
…support_multi_inlinestats_logicalPlan_approach
|
Would Inline stats provide the result that this issue presents? |
…lastic#128917) Part of elastic#124715 and similar to elastic#128476. Different from elastic#128476 in that it takes a "LogicalPlan" approach to running a sub-query, integrating its result back in the "main" LogicalPlan and continuing running the query.
Part of #124715 and similar to #128476.
Different from #128476 in that it takes a "LogicalPlan" approach to running a sub-query, integrating its result back in the "main" LogicalPlan and continuing running the query.