Skip to content

Commit 93d177d

Browse files
authored
Extend dynamic filter to joins that preserve probe side ON (#20447)
The dynamic filter from HashJoinExec was previously gated to Inner joins only. PR #20192 refactored the join filter pushdown infrastructure, which makes extending self-generated filters to join types that preserve probe side on as defined by by `on_lr_is_preserved` function trivial. This PR promotes that function to the `JoinType` and uses it to determine what self-generated dynamic join filter to push down. This enables dynamic filter for hash joins to Left, LeftSemi, RightSemi, LeftAnti and LeftMark. ## Which issue does this PR close? This PR makes progress on #16973 ## Rationale for this change The self-generated dynamic filter in HashJoinExec filters the probe side using build-side values. For Left and LeftSemi joins, the right-hand probe side has the same filtering semantics as Inner. Relaxing the gate using `on_lr_is_preserved`. ## Are these changes tested? Yes. ## Are there any user-facing changes? No.
1 parent 1f37a33 commit 93d177d

File tree

6 files changed

+391
-40
lines changed

6 files changed

+391
-40
lines changed

datafusion/common/src/join_type.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,35 @@ impl JoinType {
9797
}
9898
}
9999

100+
/// Whether each side of the join is preserved for ON-clause filter pushdown.
101+
///
102+
/// It is only correct to push ON-clause filters below a join for preserved
103+
/// inputs.
104+
///
105+
/// # "Preserved" input definition
106+
///
107+
/// A join side is preserved if the join returns all or a subset of the rows
108+
/// from that side, such that each output row directly maps to an input row.
109+
/// If a side is not preserved, the join can produce extra null rows that
110+
/// don't map to any input row.
111+
///
112+
/// # Return Value
113+
///
114+
/// A tuple of booleans - (left_preserved, right_preserved).
115+
pub fn on_lr_is_preserved(&self) -> (bool, bool) {
116+
match self {
117+
JoinType::Inner => (true, true),
118+
JoinType::Left => (false, true),
119+
JoinType::Right => (true, false),
120+
JoinType::Full => (false, false),
121+
JoinType::LeftSemi | JoinType::RightSemi => (true, true),
122+
JoinType::LeftAnti => (false, true),
123+
JoinType::RightAnti => (true, false),
124+
JoinType::LeftMark => (false, true),
125+
JoinType::RightMark => (true, false),
126+
}
127+
}
128+
100129
/// Does the join type support swapping inputs?
101130
pub fn supports_swap(&self) -> bool {
102131
matches!(

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4086,3 +4086,277 @@ async fn test_filter_with_projection_pushdown() {
40864086
];
40874087
assert_batches_eq!(expected, &result);
40884088
}
4089+
4090+
#[tokio::test]
4091+
async fn test_hashjoin_dynamic_filter_pushdown_left_join() {
4092+
use datafusion_common::JoinType;
4093+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
4094+
4095+
// Create build side with limited values
4096+
let build_batches = vec![
4097+
record_batch!(
4098+
("a", Utf8, ["aa", "ab"]),
4099+
("b", Utf8, ["ba", "bb"]),
4100+
("c", Float64, [1.0, 2.0])
4101+
)
4102+
.unwrap(),
4103+
];
4104+
let build_side_schema = Arc::new(Schema::new(vec![
4105+
Field::new("a", DataType::Utf8, false),
4106+
Field::new("b", DataType::Utf8, false),
4107+
Field::new("c", DataType::Float64, false),
4108+
]));
4109+
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
4110+
.with_support(true)
4111+
.with_batches(build_batches)
4112+
.build();
4113+
4114+
// Create probe side with more values (some won't match)
4115+
let probe_batches = vec![
4116+
record_batch!(
4117+
("a", Utf8, ["aa", "ab", "ac", "ad"]),
4118+
("b", Utf8, ["ba", "bb", "bc", "bd"]),
4119+
("e", Float64, [1.0, 2.0, 3.0, 4.0])
4120+
)
4121+
.unwrap(),
4122+
];
4123+
let probe_side_schema = Arc::new(Schema::new(vec![
4124+
Field::new("a", DataType::Utf8, false),
4125+
Field::new("b", DataType::Utf8, false),
4126+
Field::new("e", DataType::Float64, false),
4127+
]));
4128+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
4129+
.with_support(true)
4130+
.with_batches(probe_batches)
4131+
.build();
4132+
4133+
// Create HashJoinExec with Left join and CollectLeft mode
4134+
let on = vec![
4135+
(
4136+
col("a", &build_side_schema).unwrap(),
4137+
col("a", &probe_side_schema).unwrap(),
4138+
),
4139+
(
4140+
col("b", &build_side_schema).unwrap(),
4141+
col("b", &probe_side_schema).unwrap(),
4142+
),
4143+
];
4144+
let plan = Arc::new(
4145+
HashJoinExec::try_new(
4146+
build_scan,
4147+
Arc::clone(&probe_scan),
4148+
on,
4149+
None,
4150+
&JoinType::Left,
4151+
None,
4152+
PartitionMode::CollectLeft,
4153+
datafusion_common::NullEquality::NullEqualsNothing,
4154+
false,
4155+
)
4156+
.unwrap(),
4157+
) as Arc<dyn ExecutionPlan>;
4158+
4159+
// Expect the dynamic filter predicate to be pushed down into the probe side DataSource
4160+
insta::assert_snapshot!(
4161+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
4162+
@r"
4163+
OptimizationTest:
4164+
input:
4165+
- HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
4166+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4167+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
4168+
output:
4169+
Ok:
4170+
- HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
4171+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4172+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
4173+
",
4174+
);
4175+
4176+
// Actually apply the optimization and execute the plan
4177+
let mut config = ConfigOptions::default();
4178+
config.execution.parquet.pushdown_filters = true;
4179+
config.optimizer.enable_dynamic_filter_pushdown = true;
4180+
let plan = FilterPushdown::new_post_optimization()
4181+
.optimize(plan, &config)
4182+
.unwrap();
4183+
4184+
// Test that dynamic filter linking survives with_new_children
4185+
let children = plan.children().into_iter().map(Arc::clone).collect();
4186+
let plan = plan.with_new_children(children).unwrap();
4187+
4188+
let config = SessionConfig::new().with_batch_size(10);
4189+
let session_ctx = SessionContext::new_with_config(config);
4190+
session_ctx.register_object_store(
4191+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
4192+
Arc::new(InMemory::new()),
4193+
);
4194+
let state = session_ctx.state();
4195+
let task_ctx = state.task_ctx();
4196+
let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
4197+
.await
4198+
.unwrap();
4199+
4200+
// After execution, verify the dynamic filter was populated with bounds and IN-list
4201+
insta::assert_snapshot!(
4202+
format!("{}", format_plan_for_test(&plan)),
4203+
@r"
4204+
- HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)]
4205+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4206+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
4207+
"
4208+
);
4209+
4210+
// Verify result correctness: left join preserves all build (left) rows.
4211+
// All build rows match probe rows here, so we get 2 matched rows.
4212+
// The dynamic filter pruned unmatched probe rows (ac, ad) at scan time,
4213+
// which is safe because those probe rows would never match any build row.
4214+
let result = format!("{}", pretty_format_batches(&batches).unwrap());
4215+
insta::assert_snapshot!(
4216+
result,
4217+
@r"
4218+
+----+----+-----+----+----+-----+
4219+
| a | b | c | a | b | e |
4220+
+----+----+-----+----+----+-----+
4221+
| aa | ba | 1.0 | aa | ba | 1.0 |
4222+
| ab | bb | 2.0 | ab | bb | 2.0 |
4223+
+----+----+-----+----+----+-----+
4224+
"
4225+
);
4226+
}
4227+
4228+
#[tokio::test]
4229+
async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
4230+
use datafusion_common::JoinType;
4231+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
4232+
4233+
// Create build side with limited values
4234+
let build_batches = vec![
4235+
record_batch!(
4236+
("a", Utf8, ["aa", "ab"]),
4237+
("b", Utf8, ["ba", "bb"]),
4238+
("c", Float64, [1.0, 2.0])
4239+
)
4240+
.unwrap(),
4241+
];
4242+
let build_side_schema = Arc::new(Schema::new(vec![
4243+
Field::new("a", DataType::Utf8, false),
4244+
Field::new("b", DataType::Utf8, false),
4245+
Field::new("c", DataType::Float64, false),
4246+
]));
4247+
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
4248+
.with_support(true)
4249+
.with_batches(build_batches)
4250+
.build();
4251+
4252+
// Create probe side with more values (some won't match)
4253+
let probe_batches = vec![
4254+
record_batch!(
4255+
("a", Utf8, ["aa", "ab", "ac", "ad"]),
4256+
("b", Utf8, ["ba", "bb", "bc", "bd"]),
4257+
("e", Float64, [1.0, 2.0, 3.0, 4.0])
4258+
)
4259+
.unwrap(),
4260+
];
4261+
let probe_side_schema = Arc::new(Schema::new(vec![
4262+
Field::new("a", DataType::Utf8, false),
4263+
Field::new("b", DataType::Utf8, false),
4264+
Field::new("e", DataType::Float64, false),
4265+
]));
4266+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
4267+
.with_support(true)
4268+
.with_batches(probe_batches)
4269+
.build();
4270+
4271+
// Create HashJoinExec with LeftSemi join and CollectLeft mode
4272+
let on = vec![
4273+
(
4274+
col("a", &build_side_schema).unwrap(),
4275+
col("a", &probe_side_schema).unwrap(),
4276+
),
4277+
(
4278+
col("b", &build_side_schema).unwrap(),
4279+
col("b", &probe_side_schema).unwrap(),
4280+
),
4281+
];
4282+
let plan = Arc::new(
4283+
HashJoinExec::try_new(
4284+
build_scan,
4285+
Arc::clone(&probe_scan),
4286+
on,
4287+
None,
4288+
&JoinType::LeftSemi,
4289+
None,
4290+
PartitionMode::CollectLeft,
4291+
datafusion_common::NullEquality::NullEqualsNothing,
4292+
false,
4293+
)
4294+
.unwrap(),
4295+
) as Arc<dyn ExecutionPlan>;
4296+
4297+
// Expect the dynamic filter predicate to be pushed down into the probe side DataSource
4298+
insta::assert_snapshot!(
4299+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
4300+
@r"
4301+
OptimizationTest:
4302+
input:
4303+
- HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
4304+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4305+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
4306+
output:
4307+
Ok:
4308+
- HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
4309+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4310+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
4311+
",
4312+
);
4313+
4314+
// Actually apply the optimization and execute the plan
4315+
let mut config = ConfigOptions::default();
4316+
config.execution.parquet.pushdown_filters = true;
4317+
config.optimizer.enable_dynamic_filter_pushdown = true;
4318+
let plan = FilterPushdown::new_post_optimization()
4319+
.optimize(plan, &config)
4320+
.unwrap();
4321+
4322+
// Test that dynamic filter linking survives with_new_children
4323+
let children = plan.children().into_iter().map(Arc::clone).collect();
4324+
let plan = plan.with_new_children(children).unwrap();
4325+
4326+
let config = SessionConfig::new().with_batch_size(10);
4327+
let session_ctx = SessionContext::new_with_config(config);
4328+
session_ctx.register_object_store(
4329+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
4330+
Arc::new(InMemory::new()),
4331+
);
4332+
let state = session_ctx.state();
4333+
let task_ctx = state.task_ctx();
4334+
let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
4335+
.await
4336+
.unwrap();
4337+
4338+
// After execution, verify the dynamic filter was populated with bounds and IN-list
4339+
insta::assert_snapshot!(
4340+
format!("{}", format_plan_for_test(&plan)),
4341+
@r"
4342+
- HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)]
4343+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
4344+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
4345+
"
4346+
);
4347+
4348+
// Verify result correctness: left semi join returns only build (left) rows
4349+
// that have at least one matching probe row. Output schema is build-side columns only.
4350+
let result = format!("{}", pretty_format_batches(&batches).unwrap());
4351+
insta::assert_snapshot!(
4352+
result,
4353+
@r"
4354+
+----+----+-----+
4355+
| a | b | c |
4356+
+----+----+-----+
4357+
| aa | ba | 1.0 |
4358+
| ab | bb | 2.0 |
4359+
+----+----+-----+
4360+
"
4361+
);
4362+
}

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -176,27 +176,9 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
176176
}
177177
}
178178

179-
/// For a given JOIN type, determine whether each input of the join is preserved
180-
/// for the join condition (`ON` clause filters).
181-
///
182-
/// It is only correct to push filters below a join for preserved inputs.
183-
///
184-
/// # Return Value
185-
/// A tuple of booleans - (left_preserved, right_preserved).
186-
///
187-
/// See [`lr_is_preserved`] for a definition of "preserved".
179+
/// See [`JoinType::on_lr_is_preserved`] for details.
188180
pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) {
189-
match join_type {
190-
JoinType::Inner => (true, true),
191-
JoinType::Left => (false, true),
192-
JoinType::Right => (true, false),
193-
JoinType::Full => (false, false),
194-
JoinType::LeftSemi | JoinType::RightSemi => (true, true),
195-
JoinType::LeftAnti => (false, true),
196-
JoinType::RightAnti => (true, false),
197-
JoinType::LeftMark => (false, true),
198-
JoinType::RightMark => (true, false),
199-
}
181+
join_type.on_lr_is_preserved()
200182
}
201183

202184
/// Evaluates the columns referenced in the given expression to see if they refer

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -837,9 +837,8 @@ impl HashJoinExec {
837837
}
838838

839839
fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
840-
if self.join_type != JoinType::Inner
841-
|| !config.optimizer.enable_join_dynamic_filter_pushdown
842-
{
840+
let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
841+
if !probe_preserved || !config.optimizer.enable_join_dynamic_filter_pushdown {
843842
return false;
844843
}
845844

0 commit comments

Comments
 (0)