-
Notifications
You must be signed in to change notification settings - Fork 844
refactor: support row/bucket shuffle for aggregation #19155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
dqhl76
wants to merge
32
commits into
databendlabs:main
Choose a base branch
from
dqhl76:aggregate-24
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,802
−2,075
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This was referenced Dec 25, 2025
refactor: refactor spiller row shuffle spill tmp save clean pass bucket partial force spill pass tpch basic pass feat: add support for partitioned aggregate serialization and deserialization save remove unused activate_worker refactor: also add stats for final aggregate feat: implement configurable aggregate shuffle mode refactor: add basic bucket level shuffled agg refactor: remove legacy final agg feat: refactor aggregate function in build_aggregate to make it clear
Contributor
Docker Image for PR
|
Contributor
Contributor
Docker Image for PR
|
Contributor
11 tasks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
ci-benchmark
Benchmark: run all test
pr-refactor
this PR changes the code base without new features or bugfix
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/
Summary
This PR included:
Support for Two Shuffle Methods: Added support for both Row and Bucket shuffle modes. Please refer to the diagram below for a visual comparison of the differences.
Recursive Spilling in Final Aggregate: Enabled the final aggregate processor to spill and restore data recursively. This enhancement not introduce performance regression compared to the previous implementation.
Reason for Dual Shuffle Methods:
Background: In our previous implementation, we used a "growth payload" strategy for the partial aggregation stage. While this dynamic approach was flexible and effective for varying data sizes, it necessitated an alignment processor (TransformPartitionBucket) and made implementing spilling support for the final aggregation difficult.
Solution: We moved to a fixed-bucket strategy determined at plan-time (based on
node_numandthread_num). This symmetry between partial and final aggregation simplifies the architecture and enables simpler spilling support.The Trade-off: With high concurrency (many nodes/threads), a purely fixed-bucket approach creates excessive buckets, degrading partial aggregation performance.
The Fix: We introduce a Row Shuffle method for high-concurrency scenarios. This forces the partial aggregation to use a single bucket, followed by a scatter operation to distribute data to the final aggregation processors, avoiding the overhead of managing too many buckets early in the pipeline.
Tests
Type of change
This change is