Skip to content

Conversation

@dnhatn
Copy link
Member

@dnhatn dnhatn commented Oct 1, 2025

Currently, LocalExecutionPlanner re-generates the intermediate output of a partial aggregation, leading to its intermediate outputs don't match the layout - see the new assertions added to the exchange sink node. This hasn't been an issue because:

  1. For exchange sink, we don't check the layout; we just pass the page to the exchange buffer.

  2. Partial/final aggregations work due to the cache in AggregateMapper.

This becomes a problem when I try to add field extraction after partial aggregation on data nodes in time-series. With this change, we create a layout that matches the output attributes of partial aggregation. This change also removes the cache in AggregateMapper, which was confusing.

@dnhatn dnhatn force-pushed the avoid-regenerate-agg-state branch 4 times, most recently from e2bfc00 to 591ca3f Compare October 2, 2025 04:11
@dnhatn dnhatn force-pushed the avoid-regenerate-agg-state branch from 591ca3f to 3221cc6 Compare October 2, 2025 04:32
@dnhatn dnhatn changed the title Avoid re-generating intermediate states when executing plans Ensure partial aggregation outputs matches layout Oct 2, 2025
@dnhatn dnhatn changed the title Ensure partial aggregation outputs matches layout Ensure partial aggregation outputs match layout Oct 2, 2025
@dnhatn dnhatn requested review from alex-spies and ivancea October 2, 2025 04:34
@dnhatn dnhatn marked this pull request as ready for review October 2, 2025 04:35
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Oct 2, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

Copy link
Contributor

@alex-spies alex-spies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that we don't just re-compute the intermediate attributes anymore, which was confusing. Thanks, Nhat!

if (Assertions.ENABLED) {
List<Attribute> inputAttributes = exchangeSink.child().output();
for (Attribute attr : inputAttributes) {
assert source.layout.get(attr.id()) != null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe throw ISE instead, so that messing up an agg will not kill the node during a full run of the test suite. That makes for easier-to-triage CI issues opened by the CI bot.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be an assertion to verify the invariant. I opened #135862 to handle the spec tests when the test cluster is broken.

if (Assertions.ENABLED) {
List<Attribute> inputAttributes = exchangeSink.child().output();
for (Attribute attr : inputAttributes) {
assert source.layout.get(attr.id()) != null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added check is very reasonable. Maybe it makes sense to add it to the general plan method, as it's an invariant that's not only required for planning exchanges, but for every plan node?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ I tried to add this, but this invariant doesn't hold for ExchangeSinkExec and ExchangeSourceExec. I will address this in a follow-up.

List<Aggregator.Factory> aggregatorFactories = new ArrayList<>();

// append channels to the layout
if (aggregatorMode.isOutputPartial()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, AggregateExec#output has its own isOutputPartial check. I think it is probably correct to remove this if-else and always call layout.append(aggregateExec.output()); the only thing AggregateExec#output does differently in the non-partial case is de-duplicating based on name, but we shouldn't have duplicates to begin with. And if we have, we shouldn't be adding them to the output layout if they're not in the agg's output to begin with.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can use aggregateExec.output() - I pushed c5ab8d8

@dnhatn
Copy link
Member Author

dnhatn commented Oct 2, 2025

Thanks Alex!

@dnhatn dnhatn merged commit c1cce66 into elastic:main Oct 2, 2025
34 checks passed
@dnhatn dnhatn deleted the avoid-regenerate-agg-state branch October 2, 2025 20:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.3.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants