102 changes: 52 additions & 50 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -708,15 +708,15 @@ fn multi_hash_joins() -> Result<()> {
// Should include 4 RepartitionExecs
_ => {
assert_plan!(plan_distrib, @r"
HashJoinExec: mode=Partitioned, join_type=..., on=[(b1@6, c@2)]
RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10
HashJoinExec: mode=Partitioned, join_type=..., on=[(a@0, c@2)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
HashJoinExec: mode=Partitioned, join_type=..., on=[(a@0, b1@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
");

@@ -776,13 +776,14 @@ fn multi_joins_after_alias() -> Result<()> {
plan_distrib,
@r"
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]
ProjectionExec: expr=[a@0 as a1, a@0 as a2]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1
RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10
ProjectionExec: expr=[a@0 as a1, a@0 as a2]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
"
);
@@ -863,9 +864,9 @@ fn multi_joins_after_multi_alias() -> Result<()> {
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]
RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
"
);
@@ -1085,17 +1086,18 @@ fn multi_hash_join_key_ordering() -> Result<()> {
@r"
FilterExec: c@6 > 1
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]
ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]
RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
RepartitionExec: partitioning=Hash([B@2, C@3, AA@1], 10), input_partitions=10
ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]
RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]
RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
"
@@ -1221,21 +1223,21 @@ fn reorder_join_keys_to_left_input() -> Result<()> {
assert_eq!(captured_join_type, join_type.to_string());

insta::allow_duplicates! {insta::assert_snapshot!(modified_plan, @r"
HashJoinExec: mode=Partitioned, join_type=..., on=[(AA@1, a1@5), (B@2, b1@6), (C@3, c@2)]
ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]
RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
");}
HashJoinExec: mode=Partitioned, join_type=..., on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]
ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]
RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
");}
}

Ok(())
@@ -1349,21 +1351,21 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
let (_, plan_str) =
hide_first(reordered.as_ref(), r"join_type=(\w+)", "join_type=...");
insta::allow_duplicates! {insta::assert_snapshot!(plan_str, @r"
HashJoinExec: mode=Partitioned, join_type=..., on=[(C@3, c@2), (B@2, b1@6), (AA@1, a1@5)]
ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]
RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
");}
HashJoinExec: mode=Partitioned, join_type=..., on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]
ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]
RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]
RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
");}
}

Ok(())
@@ -1246,7 +1246,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
Contributor:
I guess it's not possible to do filtering during the scan anymore with this approach?

Contributor Author:
I think it should be possible - but somehow it got removed during the AI-based generation ;)

- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
"
);

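Regarding the review thread above: the scan-time pruning being asked about comes from a dynamic filter that the join's build side publishes and the scan then consults. The sketch below illustrates only that general mechanism; SharedDynamicFilter, publish, and evaluate are made-up names for this illustration, not DataFusion APIs.

use std::sync::{Arc, RwLock};

/// Hypothetical shared handle: the hash join's build phase publishes a
/// predicate here once the build side is consumed, and the scan polls it.
#[derive(Clone, Default)]
struct SharedDynamicFilter {
    predicate: Arc<RwLock<Option<Arc<dyn Fn(i64) -> bool + Send + Sync>>>>,
}

impl SharedDynamicFilter {
    /// Called by the build side, e.g. with min/max bounds of the join keys.
    fn publish(&self, pred: Arc<dyn Fn(i64) -> bool + Send + Sync>) {
        *self.predicate.write().unwrap() = Some(pred);
    }

    /// Called by the scan per candidate row (or per batch, on statistics):
    /// until a predicate is published, everything passes.
    fn evaluate(&self, key: i64) -> bool {
        match self.predicate.read().unwrap().as_ref() {
            Some(pred) => pred(key),
            None => true,
        }
    }
}

fn main() {
    let filter = SharedDynamicFilter::default();
    // Before the build side finishes, the scan cannot prune anything.
    assert!(filter.evaluate(42));
    // Build side observed join keys in [10, 20]; publish a range predicate.
    filter.publish(Arc::new(|k: i64| (10..=20).contains(&k)));
    assert!(!filter.evaluate(42));
    assert!(filter.evaluate(15));
}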
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
@@ -804,7 +804,7 @@ async fn test_physical_plan_display_indent_multi_children() {
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)], projection=[c1@0]
RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], file_type=csv, has_header=true
RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1
RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1
DataSourceExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1@0 as c2], file_type=csv, has_header=true
"
);
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/lib.rs
@@ -58,7 +58,7 @@ pub use analysis::{AnalysisContext, ExprBoundaries, analyze};
pub use equivalence::{
AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union,
};
pub use partitioning::{Distribution, Partitioning};
pub use partitioning::{Distribution, Partitioning, PartitioningSatisfaction};
pub use physical_expr::{
add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,
create_ordering, create_physical_sort_expr, create_physical_sort_exprs,
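The lib.rs change re-exports a new PartitioningSatisfaction type alongside Distribution and Partitioning, which suggests satisfaction checks now return more than a plain bool. Below is a minimal sketch of what such a three-way answer could look like and how a rule might branch on it; the variant names and the plan_child helper are assumptions for illustration, not the actual DataFusion definition.

/// Hypothetical three-way result for "does this child's partitioning satisfy
/// the required hash distribution?" (the real enum may differ).
#[derive(Debug, PartialEq)]
enum PartitioningSatisfaction {
    /// Exactly hash-partitioned on the required keys.
    Exact,
    /// Equivalent under known expression equivalences (e.g. an alias such as
    /// `c1 AS c2`), so no hash repartition is needed.
    Equivalent,
    /// Not satisfied: a repartition must be inserted.
    None,
}

fn plan_child(satisfaction: PartitioningSatisfaction) -> &'static str {
    match satisfaction {
        // Both arms avoid a Hash repartition, matching the pattern in the
        // updated plan snapshots above, where Hash([...]) repartitions were
        // replaced by RoundRobinBatch or dropped.
        PartitioningSatisfaction::Exact | PartitioningSatisfaction::Equivalent => {
            "keep child; add RoundRobinBatch only for parallelism"
        }
        PartitioningSatisfaction::None => "insert RepartitionExec with Hash(keys, n)",
    }
}

fn main() {
    assert_eq!(
        plan_child(PartitioningSatisfaction::Equivalent),
        "keep child; add RoundRobinBatch only for parallelism"
    );
}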
35 changes: 25 additions & 10 deletions datafusion/physical-plan/src/joins/array_map.rs
@@ -238,6 +238,7 @@ impl ArrayMap {
pub fn get_matched_indices_with_limit_offset(
&self,
prob_side_keys: &[ArrayRef],
indices: Option<&[u32]>,
limit: usize,
current_offset: MapOffset,
probe_indices: &mut Vec<u32>,
@@ -256,6 +257,7 @@
lookup_and_get_indices,
self,
array,
indices,
limit,
current_offset,
probe_indices,
@@ -267,6 +269,7 @@
fn lookup_and_get_indices<T: ArrowNumericType>(
&self,
array: &ArrayRef,
indices: Option<&[u32]>,
limit: usize,
current_offset: MapOffset,
probe_indices: &mut Vec<u32>,
@@ -283,11 +286,14 @@
let have_null = arr.null_count() > 0;

if self.next.is_empty() {
for prob_idx in current_offset.0..arr.len() {
let num_rows = indices.map_or(arr.len(), |v| v.len());
for i in current_offset.0..num_rows {
if build_indices.len() == limit {
return Ok(Some((prob_idx, None)));
return Ok(Some((i, None)));
}

let prob_idx = indices.map_or(i, |v| v[i] as usize);

// short circuit
if have_null && arr.is_null(prob_idx) {
continue;
@@ -307,6 +313,7 @@
Ok(None)
} else {
let mut remaining_output = limit;
let num_rows = indices.map_or(arr.len(), |v| v.len());
let to_skip = match current_offset {
// None `initial_next_idx` indicates that `initial_idx` processing hasn't been started
(idx, None) => idx,
@@ -316,10 +323,11 @@
// Otherwise, process remaining `initial_idx` matches by traversing `next_chain`,
// to start with the next index
(idx, Some(next_idx)) => {
let is_last = idx == arr.len() - 1;
let prob_idx = indices.map_or(idx, |v| v[idx] as usize);
let is_last = idx == num_rows - 1;
if let Some(next_offset) = traverse_chain(
&self.next,
idx,
prob_idx,
next_idx as u32,
&mut remaining_output,
probe_indices,
Expand All @@ -332,19 +340,21 @@ impl ArrayMap {
}
};

for prob_side_idx in to_skip..arr.len() {
for i in to_skip..num_rows {
if remaining_output == 0 {
return Ok(Some((prob_side_idx, None)));
return Ok(Some((i, None)));
}

if arr.is_null(prob_side_idx) {
let prob_idx = indices.map_or(i, |v| v[i] as usize);

if arr.is_null(prob_idx) {
continue;
}

let is_last = prob_side_idx == arr.len() - 1;
let is_last = i == num_rows - 1;

// SAFETY: prob_idx is guaranteed to be within bounds by the loop range.
let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_();
let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_();
let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize;
if idx_in_build_side >= self.data.len()
|| self.data[idx_in_build_side] == 0
Expand All @@ -356,7 +366,7 @@ impl ArrayMap {

if let Some(offset) = traverse_chain(
&self.next,
prob_side_idx,
prob_idx,
build_idx,
&mut remaining_output,
probe_indices,
@@ -430,6 +440,7 @@ mod tests {
while let Some(o) = next {
next = map.get_matched_indices_with_limit_offset(
&probe,
None,
1,
o,
&mut prob_idx,
@@ -457,6 +468,7 @@
// Skip 10, find 1, next is 2
let next = map.get_matched_indices_with_limit_offset(
&probe,
None,
1,
(0, None),
&mut p_idx,
@@ -469,6 +481,7 @@
// Find 2, end
let next = map.get_matched_indices_with_limit_offset(
&probe,
None,
1,
next.unwrap(),
&mut p_idx,
@@ -494,6 +507,7 @@
// batch_size=3, should get 2 matches from first '1' and 1 match from second '1'
let result_offset = array_map.get_matched_indices_with_limit_offset(
&prob_side_keys,
None,
3,
(0, None),
&mut prob_indices,
@@ -525,6 +539,7 @@
// Call once to get all matches
let result_offset = array_map.get_matched_indices_with_limit_offset(
&prob_side_keys,
None,
10, // A batch size larger than number of probes
(0, None),
&mut prob_indices,
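The array_map.rs change threads an optional selection vector (indices: Option<&[u32]>) through get_matched_indices_with_limit_offset: the loops now run over logical positions and resolve each to a physical row via indices.map_or(i, |v| v[i] as usize), so resume offsets stay in the logical index space. A standalone sketch of that indirection pattern (probe_rows is a made-up helper, not the ArrayMap code):

/// Iterate logical positions, optionally mapping each through a selection
/// vector to a physical row. Offsets used for resumption stay logical, so
/// they remain valid regardless of which rows were selected.
fn probe_rows(values: &[u64], indices: Option<&[u32]>, start: usize) -> Vec<u64> {
    // Logical row count: the selection vector's length if present,
    // otherwise the full array.
    let num_rows = indices.map_or(values.len(), |v| v.len());
    let mut out = Vec::new();
    for i in start..num_rows {
        // Logical position `i` resolves to a physical row only here.
        let row = indices.map_or(i, |v| v[i] as usize);
        out.push(values[row]);
    }
    out
}

fn main() {
    let values = [10, 20, 30, 40];
    // No selection vector: physical == logical, all rows visited.
    assert_eq!(probe_rows(&values, None, 0), vec![10, 20, 30, 40]);
    // Selection vector picks rows 3 and 1; a resume offset of 1 means
    // "continue at the second *selected* row", i.e. physical row 1.
    assert_eq!(probe_rows(&values, Some(&[3, 1]), 1), vec![20]);
}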