Skip to content

Commit 5419ff5

Browse files
feat: hash partitioning satisfies subset (#19304)
## Which issue does this PR close? - Closes #19269. ## Rationale for this change See to issue #19269 for deeper rationale. DF did not have the notion that being partitioned on a superset of the required partitioning satisfied the condition. Having this logic will eliminate unnecessary repartitions and in turn other operators like partial aggregations. I introduced this behavior with the `repartition_subset_satisfactions` flag (default false) as there are some cases where repartitioning may still be wanted when we satisfy partitioning via this subset property. In particular, if when partitioned via Hash(a) there is data skew but when partitioned on Hash(a, b) there is better distribution, a user may want to turn this optimization off. I also made it the case such that if we satisfy repartitioning via a subset but the current amount of partitions < target_partitions, then we will still repartition to maintain and increase parallelism in the system when possible. ## What changes are included in this PR? - Modified `satisfy()` logic to check for subsets and return an enum of type of match: exact, subset, none - Do in `EnforceDistribution`, where `satisfy()` is called, do not allow subset logic for partitioned join operators as partitioning on each side much match exactly, thus need to repartition if subset logic is true - Created unit and sqllogictests ## Are these changes tested? - Unit test - sqllogictest - tpch correctness ### Benchmarks I did not see any drastic changes in benches, but the shuffle eliminations will be great improvements for distributed DF. <img width="628" height="762" alt="Screenshot 2025-12-12 at 8 28 15 PM" src="https://github.com/user-attachments/assets/4b42945f-34e0-46c9-a4ce-e7ccdd0c0603" /> <img width="490" height="746" alt="Screenshot 2025-12-12 at 8 30 15 PM" src="https://github.com/user-attachments/assets/846aef1b-8c5d-462d-83e7-7fa1e2a9372e" /> ## Are there any user-facing changes? Yes, users will now have the `repartition_subset_satisfications` option as described in this PR --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent b3d2cb6 commit 5419ff5

File tree

11 files changed

+1352
-234
lines changed

11 files changed

+1352
-234
lines changed

datafusion/common/src/config.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,34 @@ config_namespace! {
10001000
/// ```
10011001
pub repartition_sorts: bool, default = true
10021002

1003+
/// Partition count threshold for subset satisfaction optimization.
1004+
///
1005+
/// When the current partition count is >= this threshold, DataFusion will
1006+
/// skip repartitioning if the required partitioning expression is a subset
1007+
/// of the current partition expression such as Hash(a) satisfies Hash(a, b).
1008+
///
1009+
/// When the current partition count is < this threshold, DataFusion will
1010+
/// repartition to increase parallelism even when subset satisfaction applies.
1011+
///
1012+
/// Set to 0 to always repartition (disable subset satisfaction optimization).
1013+
/// Set to a high value to always use subset satisfaction.
1014+
///
1015+
/// Example (subset_repartition_threshold = 4):
1016+
/// ```text
1017+
/// Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a])
1018+
///
1019+
/// If current partitions (3) < threshold (4), repartition:
1020+
/// AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)]
1021+
/// RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3
1022+
/// AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)]
1023+
/// DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3)
1024+
///
1025+
/// If current partitions (8) >= threshold (4), use subset satisfaction:
1026+
/// AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)]
1027+
/// DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8)
1028+
/// ```
1029+
pub subset_repartition_threshold: usize, default = 4
1030+
10031031
/// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
10041032
/// (i.e. setting `preserve_order` to true on `RepartitionExec` and
10051033
/// using `SortPreservingMergeExec`)

0 commit comments

Comments
 (0)