6 changes: 1 addition & 5 deletions datafusion/core/tests/dataframe/mod.rs
@@ -2507,13 +2507,9 @@ async fn verify_join_output_partitioning() -> Result<()> {
         | JoinType::RightSemi
         | JoinType::RightAnti
         | JoinType::RightMark => {
-            let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
-                Arc::new(Column::new_with_schema("c2_c1", &join_schema)?),
-                Arc::new(Column::new_with_schema("c2_c2", &join_schema)?),
-            ];
             assert_eq!(
                 out_partitioning,
-                &Partitioning::Hash(right_exprs, default_partition_count)
+                &Partitioning::RoundRobinBatch(default_partition_count)
             );
         }
         JoinType::Full => {
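Note: for the right-preserving join types above, the expected output partitioning weakens from a hash partitioning on the right-side keys to plain round-robin. A minimal sketch of the two expectations, assuming the `datafusion` crate's public `Partitioning` and `Column` types; the column names mirror the deleted test code, and the column indices here are illustrative only:

    use std::sync::Arc;

    use datafusion::physical_expr::expressions::Column;
    use datafusion::physical_plan::{Partitioning, PhysicalExpr};

    /// Sketch: the old and new expected partitionings, side by side.
    fn expected_partitionings(partition_count: usize) -> (Partitioning, Partitioning) {
        // Old expectation: output hash-partitioned on the right-side join keys.
        let right_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
            Arc::new(Column::new("c2_c1", 0)),
            Arc::new(Column::new("c2_c2", 1)),
        ];
        let old = Partitioning::Hash(right_exprs, partition_count);
        // New expectation: only a round-robin spread across partitions.
        let new = Partitioning::RoundRobinBatch(partition_count);
        (old, new)
    }

Hash partitioning promises co-location of equal keys; RoundRobinBatch only balances batch counts across partitions, so the new assertion is strictly weaker.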
117 changes: 60 additions & 57 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -708,15 +708,15 @@ fn multi_hash_joins() -> Result<()> {
     // Should include 4 RepartitionExecs
     _ => {
         assert_plan!(plan_distrib, @r"
-        HashJoinExec: mode=Partitioned, join_type=..., on=[(b1@6, c@2)]
-          RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10
+        HashJoinExec: mode=Partitioned, join_type=..., on=[(a@0, c@2)]
+          RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
             HashJoinExec: mode=Partitioned, join_type=..., on=[(a@0, b1@1)]
               RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
                 DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-              RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1
+              RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
                 ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]
                   DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-          RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1
+          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
             DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
         ");

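The recurring edit in this file swaps a Hash repartition on one join input for RoundRobinBatch. A hedged sketch of constructing that node, assuming the public `RepartitionExec::try_new` API and a placeholder `input` plan:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::repartition::RepartitionExec;
    use datafusion::physical_plan::{ExecutionPlan, Partitioning};

    /// Sketch: wrap `input` in a round-robin repartition to 10 partitions,
    /// the shape that now appears in the expected plans above.
    fn round_robin_ten(input: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
        // RoundRobinBatch cycles whole record batches across output
        // partitions without evaluating or hashing any column.
        Ok(Arc::new(RepartitionExec::try_new(
            input,
            Partitioning::RoundRobinBatch(10),
        )?))
    }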
@@ -776,13 +776,14 @@ fn multi_joins_after_alias() -> Result<()> {
         plan_distrib,
         @r"
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]
-          ProjectionExec: expr=[a@0 as a1, a@0 as a2]
-            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
-              RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
-                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-              RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1
-                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-          RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1
+          RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10
+            ProjectionExec: expr=[a@0 as a1, a@0 as a2]
+              HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
+                RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
+                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+                RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
             DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
         "
     );
@@ -804,13 +805,14 @@ fn multi_joins_after_alias() {
         plan_distrib,
         @r"
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]
-          ProjectionExec: expr=[a@0 as a1, a@0 as a2]
-            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
-              RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
-                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-              RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1
-                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-          RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1
+          RepartitionExec: partitioning=Hash([a2@1], 10), input_partitions=10
+            ProjectionExec: expr=[a@0 as a1, a@0 as a2]
+              HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
+                RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
+                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+                RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
             DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
         "
     );
@@ -863,9 +865,9 @@ fn multi_joins_after_multi_alias() -> Result<()> {
             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
               RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
                 DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-              RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1
+              RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
                 DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-          RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1
+          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
             DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
         "
     );
@@ -1085,17 +1087,18 @@ fn multi_hash_join_key_ordering() -> Result<()> {
     @r"
     FilterExec: c@6 > 1
       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]
-        ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
-          HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]
-            RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1
-              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-            RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1
-              ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
+        RepartitionExec: partitioning=Hash([B@2, C@3, AA@1], 10), input_partitions=10
+          ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
+            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]
+              RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1
+                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+              RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+                ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
                   DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]
           RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1
             DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-          RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1
+          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
             ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
               DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
     "
@@ -1221,21 +1224,21 @@ fn reorder_join_keys_to_left_input() -> Result<()> {
     assert_eq!(captured_join_type, join_type.to_string());

     insta::allow_duplicates! {insta::assert_snapshot!(modified_plan, @r"
-    HashJoinExec: mode=Partitioned, join_type=..., on=[(AA@1, a1@5), (B@2, b1@6), (C@3, c@2)]
-      ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
-        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]
-          RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=1
-            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-          RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=1
-            ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
-              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
-        RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
-          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-        RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1
-          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
-            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-    ");}
+    HashJoinExec: mode=Partitioned, join_type=..., on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]
+      ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
+        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]
+          RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=1
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+            ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
+              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
+        RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
+          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+    ");}
     }

     Ok(())
@@ -1349,21 +1352,21 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
     let (_, plan_str) =
         hide_first(reordered.as_ref(), r"join_type=(\w+)", "join_type=...");
     insta::allow_duplicates! {insta::assert_snapshot!(plan_str, @r"
-    HashJoinExec: mode=Partitioned, join_type=..., on=[(C@3, c@2), (B@2, b1@6), (AA@1, a1@5)]
-      ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
-        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]
-          RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=1
-            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-          RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=1
-            ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
-              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
-        RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
-          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-        RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1
-          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
-            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
-    ");}
+    HashJoinExec: mode=Partitioned, join_type=..., on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]
+      ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
+        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]
+          RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=1
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+            ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
+              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
+        RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
+          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
+    ");}
     }

     Ok(())
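All of these expectations are insta inline snapshots, so the optimizer change lands as edits to the @r"..." blocks above. A minimal sketch of the mechanics, with a hypothetical render_plan helper standing in for the tests' plan formatting:

    /// Sketch: stand-in for the tests' plan rendering.
    fn render_plan() -> String {
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1".to_owned()
    }

    #[test]
    fn snapshot_example() {
        // `allow_duplicates!` lets the same inline snapshot be asserted
        // repeatedly (the real tests loop over join types); the @"..."
        // literal is the stored expectation, refreshed via `cargo insta review`.
        insta::allow_duplicates! {
            insta::assert_snapshot!(
                render_plan(),
                @"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1"
            );
        }
    }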
@@ -1610,7 +1610,7 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
     - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ]
-    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= ce AND d@0 IN (SET) ([ca, cb, cc, cd, ce]) ]
     "
     );
 }
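The widened bounds here (d@0 <= cb becoming d@0 <= ce, and the IN set growing from two to five values) are plausibly a consequence of the partitioning change: when the input is no longer split by hash bucket, each dynamic filter observes a wider range of build-side keys, so the aggregate min/max and IN list cover more values. A sketch of how such bounds could be derived; the helper and the values are hypothetical and not DataFusion's actual filter code:

    use std::collections::BTreeSet;

    /// Sketch: min/max plus an IN-list summary over observed join keys.
    fn filter_parts(values: &[&str]) -> Option<(String, String, BTreeSet<String>)> {
        let min = values.iter().min()?.to_string();
        let max = values.iter().max()?.to_string();
        let set = values.iter().map(|v| v.to_string()).collect();
        Some((min, max, set))
    }

    // filter_parts(&["ca", "cb", "cc", "cd", "ce"]) yields min "ca",
    // max "ce", and the five-value set shown in the new snapshot.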
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
@@ -804,7 +804,7 @@ async fn test_physical_plan_display_indent_multi_children() {
     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
       RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
         DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
-      RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
+      RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
         DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
     "
     );
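The 9000 printed by these RepartitionExec nodes is this test's target partition count, not a default. A sketch of producing such a setup, assuming the standard SessionConfig API; the helper name is illustrative:

    use datafusion::prelude::{SessionConfig, SessionContext};

    /// Sketch: a context whose planner fans out to 9000 partitions, matching
    /// the partition count shown in the plan above.
    fn context_with_9000_partitions() -> SessionContext {
        let config = SessionConfig::new().with_target_partitions(9000);
        SessionContext::new_with_config(config)
    }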
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/lib.rs
@@ -58,7 +58,7 @@ pub use analysis::{AnalysisContext, ExprBoundaries, analyze};
 pub use equivalence::{
     AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union,
 };
-pub use partitioning::{Distribution, Partitioning};
+pub use partitioning::{Distribution, Partitioning, PartitioningSatisfaction};
 pub use physical_expr::{
     add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,
     create_ordering, create_physical_sort_expr, create_physical_sort_exprs,
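The library-side change is a single new re-export; the type's definition is not part of this diff, so all that can be shown safely is the import path it enables:

    // Sketch: after this change the new name resolves from the crate root
    // alongside the existing partitioning types. Its shape (enum vs struct,
    // variants, methods) is not visible in this diff.
    use datafusion_physical_expr::{Distribution, Partitioning, PartitioningSatisfaction};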