Fluss sink supports dynamic shuffle. #1784
@wuchong @leonardBang, CC
@wuchong, CC. This PR is needed.
Pull request overview
This PR implements dynamic shuffle mode for Fluss sink to address performance bottlenecks in the current bucket-based shuffle approach. The implementation collects data statistics at each checkpoint, aggregates them in a coordinator, and dynamically redistributes records across downstream subtasks based on partition traffic patterns. This approach prevents single-subtask bottlenecks and provides better load balancing, especially for partitioned tables with uneven traffic distribution.
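The aggregation step described above can be illustrated with a minimal sketch. It assumes each subtask reports a map from partition name to observed weight (for example, bytes seen since the last checkpoint) and that the coordinator sums them. The class and method names here are hypothetical and are not the PR's `DataStatistics` or `DataStatisticsCoordinator` API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of coordinator-side aggregation; not the classes added by this PR. */
public class StatsAggregationSketch {

    /**
     * Merges per-subtask partition weights into global totals.
     * Assumption: each local map is keyed by partition name and holds the weight
     * observed by one subtask during one checkpoint interval.
     */
    public static Map<String, Long> aggregate(List<Map<String, Long>> localStats) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStats) {
            for (Map.Entry<String, Long> entry : local.entrySet()) {
                // Sum the weight reported for the same partition by different subtasks.
                global.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return global;
    }

    public static void main(String[] args) {
        // Illustrative partition names and weights only.
        Map<String, Long> subtask0 = Map.of("2024-01-01", 900L, "2024-01-02", 100L);
        Map<String, Long> subtask1 = Map.of("2024-01-01", 700L, "2024-01-03", 50L);
        System.out.println(aggregate(List.of(subtask0, subtask1)));
    }
}
```

In this sketch the global map would then be broadcast back to the subtasks so that every channel computer routes records using the same view of partition traffic.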
Key Changes:
- Introduces `DistributionMode` enum with three modes: NONE, BUCKET_SHUFFLE, and DYNAMIC_SHUFFLE
- Implements statistics collection and coordination infrastructure (DataStatistics, DataStatisticsOperator, DataStatisticsCoordinator)
- Adds weighted assignment strategies (WeightedRandomAssignment, WeightedBucketIdAssignment) for dynamic partition-to-subtask mapping
- Extends FlussSerializationSchema with size calculation method for accurate weight estimation
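To make the idea of weighted partition-to-subtask mapping concrete, here is a minimal sketch of one possible strategy: fill subtasks greedily up to an even target weight, split heavy partitions across several subtasks, then pick a subtask for each record at random in proportion to the split. This is an assumption about the general technique, not the actual logic of `WeightedRandomAssignment` or `WeightedBucketIdAssignment`; all names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch of weighted partition-to-subtask assignment; not the PR's code. */
public class WeightedAssignmentSketch {

    /** The share of one partition's traffic that is routed to one subtask. */
    public static final class Slice {
        public final int subtask;
        public final long weight;

        Slice(int subtask, long weight) {
            this.subtask = subtask;
            this.weight = weight;
        }
    }

    /** Fills subtasks in order up to ceil(total / numSubtasks), splitting heavy partitions. */
    public static Map<String, List<Slice>> assign(Map<String, Long> partitionWeights, int numSubtasks) {
        long total = 0;
        for (long w : partitionWeights.values()) {
            total += w;
        }
        long target = Math.max(1, (total + numSubtasks - 1) / numSubtasks);
        Map<String, List<Slice>> result = new LinkedHashMap<>();
        int subtask = 0;
        long room = target; // capacity left on the current subtask
        for (Map.Entry<String, Long> entry : partitionWeights.entrySet()) {
            long remaining = entry.getValue();
            List<Slice> slices = new ArrayList<>(); // stays empty for zero-weight partitions
            while (remaining > 0) {
                long take = Math.min(remaining, room);
                slices.add(new Slice(subtask, take));
                remaining -= take;
                room -= take;
                if (room == 0) {
                    if (subtask < numSubtasks - 1) {
                        subtask++;
                        room = target;
                    } else {
                        room = Long.MAX_VALUE; // the last subtask absorbs any remainder
                    }
                }
            }
            result.put(entry.getKey(), slices);
        }
        return result;
    }

    /** Picks a subtask for one record, with probability proportional to each slice's weight. */
    public static int select(List<Slice> slices, Random random) {
        if (slices.isEmpty()) {
            throw new IllegalArgumentException("partition has no assigned subtasks");
        }
        long total = 0;
        for (Slice s : slices) {
            total += s.weight;
        }
        long pick = (long) (random.nextDouble() * total);
        for (Slice s : slices) {
            if (pick < s.weight) {
                return s.subtask;
            }
            pick -= s.weight;
        }
        return slices.get(slices.size() - 1).subtask;
    }
}
```

With illustrative weights of 80, 10, and 10 for three partitions and four subtasks, the heavy partition is spread over all four subtasks while the two light ones share the last subtask, so no single subtask receives more than a quarter of the traffic. A caller would need a fallback (for example, hashing) for partitions that have no statistics yet.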
Reviewed changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated 19 comments.
| File | Description |
|---|---|
| DistributionMode.java | New enum defining shuffle distribution modes (NONE, BUCKET_SHUFFLE, DYNAMIC_SHUFFLE) |
| DataStatistics.java | Data structure tracking partition names and their frequency counts |
| DataStatisticsOperator.java | Flink operator collecting local statistics and forwarding records wrapped with statistics |
| DataStatisticsCoordinator.java | Coordinator aggregating statistics from subtasks and broadcasting global statistics |
| StatisticsOrRecordChannelComputer.java | Channel computer implementing dynamic shuffle based on partition statistics |
| WeightedRandomAssignment.java | Partition assignment strategy using weighted random distribution |
| WeightedBucketIdAssignment.java | Bucket-aware partition assignment combining bucketing with weighted distribution |
| RowDataSerializationSchema.java | Extended with size() method to calculate record size for weight estimation |
| FlinkSink.java | Updated to support DYNAMIC_SHUFFLE mode with pre-write topology transformation |
| FlinkTableSink.java | Modified to use DistributionMode and provide DataStreamSinkProvider for dynamic shuffle |
| FlussSinkBuilder.java | Updated API to accept DistributionMode and TypeInformation for shuffle configuration |
| FlinkConnectorOptions.java | Added SINK_DISTRIBUTION_MODE configuration option |
| Test files | Comprehensive unit tests for statistics, channel computer, and partition assignment |
@wuchong, I have rebased this PR. CC
Purpose
Linked issue: close #1789
Brief change log
Tests
API and Format
Documentation