Skip to content

Commit f1e5c94

Browse files
authored
Compute Dynamic Filters only when a consumer supports them (#19546)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> Closes #17527 ## Rationale for this change Currently, DataFusion computes bounds for all queries that contain a HashJoinExec node whenever the option enable_dynamic_filter_pushdown is set to true (default). It might make sense to compute these bounds only when we explicitly know there is a consumer that will use them. ## What changes are included in this PR? As suggested in #17527 (comment), this PR adds an is_used() method to DynamicFilterPhysicalExpr that checks if any consumers are holding a reference to the filter using Arc::strong_count(). During filter pushdown, consumers that accept the filter and use it later in execution have to retain a reference to Arc. For example, scan nodes like ParquetSource. ## Are these changes tested? I added a unit test in dynamic_filters.rs (test_is_used) that verifies the Arc reference counting behavior. Existing integration tests in datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs validate the end-to-end behavior. These tests verify that dynamic filters are computed and filled when consumers are present. ## Are there any user-facing changes? new is_used() function
1 parent 4e45c19 commit f1e5c94

File tree

3 files changed

+188
-3
lines changed

3 files changed

+188
-3
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3512,3 +3512,91 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() {
35123512
",
35133513
);
35143514
}
3515+
3516+
#[tokio::test]
3517+
async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
3518+
use datafusion_common::JoinType;
3519+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
3520+
3521+
// Test both cases: probe side with and without filter pushdown support
3522+
for (probe_supports_pushdown, expected_is_used) in [(false, false), (true, true)] {
3523+
let build_side_schema = Arc::new(Schema::new(vec![
3524+
Field::new("a", DataType::Utf8, false),
3525+
Field::new("b", DataType::Utf8, false),
3526+
]));
3527+
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
3528+
.with_support(true)
3529+
.with_batches(vec![
3530+
record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"]))
3531+
.unwrap(),
3532+
])
3533+
.build();
3534+
3535+
let probe_side_schema = Arc::new(Schema::new(vec![
3536+
Field::new("a", DataType::Utf8, false),
3537+
Field::new("b", DataType::Utf8, false),
3538+
]));
3539+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
3540+
.with_support(probe_supports_pushdown)
3541+
.with_batches(vec![
3542+
record_batch!(
3543+
("a", Utf8, ["aa", "ab", "ac", "ad"]),
3544+
("b", Utf8, ["ba", "bb", "bc", "bd"])
3545+
)
3546+
.unwrap(),
3547+
])
3548+
.build();
3549+
3550+
let on = vec![
3551+
(
3552+
col("a", &build_side_schema).unwrap(),
3553+
col("a", &probe_side_schema).unwrap(),
3554+
),
3555+
(
3556+
col("b", &build_side_schema).unwrap(),
3557+
col("b", &probe_side_schema).unwrap(),
3558+
),
3559+
];
3560+
let plan = Arc::new(
3561+
HashJoinExec::try_new(
3562+
build_scan,
3563+
probe_scan,
3564+
on,
3565+
None,
3566+
&JoinType::Inner,
3567+
None,
3568+
PartitionMode::CollectLeft,
3569+
datafusion_common::NullEquality::NullEqualsNothing,
3570+
)
3571+
.unwrap(),
3572+
) as Arc<dyn ExecutionPlan>;
3573+
3574+
// Apply filter pushdown optimization
3575+
let mut config = ConfigOptions::default();
3576+
config.execution.parquet.pushdown_filters = true;
3577+
config.optimizer.enable_dynamic_filter_pushdown = true;
3578+
let plan = FilterPushdown::new_post_optimization()
3579+
.optimize(plan, &config)
3580+
.unwrap();
3581+
3582+
// Get the HashJoinExec to check the dynamic filter
3583+
let hash_join = plan
3584+
.as_any()
3585+
.downcast_ref::<HashJoinExec>()
3586+
.expect("Plan should be HashJoinExec");
3587+
3588+
// Verify that a dynamic filter was created
3589+
let dynamic_filter = hash_join
3590+
.dynamic_filter_for_test()
3591+
.expect("Dynamic filter should be created");
3592+
3593+
// Verify that is_used() returns the expected value based on probe side support.
3594+
// When probe_supports_pushdown=false: no consumer holds a reference (is_used=false)
3595+
// When probe_supports_pushdown=true: probe side holds a reference (is_used=true)
3596+
assert_eq!(
3597+
dynamic_filter.is_used(),
3598+
expected_is_used,
3599+
"is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}"
3600+
);
3601+
}
3602+
}

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,17 @@ impl DynamicFilterPhysicalExpr {
278278

279279
/// Wait asynchronously until this dynamic filter is marked as complete.
280280
///
281-
/// This method returns immediately if the filter is already complete.
281+
/// This method returns immediately if the filter is already complete or if the filter
282+
/// is not being used by any consumers.
282283
/// Otherwise, it waits until [`Self::mark_complete`] is called.
283284
///
284285
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
285286
/// the filter is fully complete with no more updates expected.
286-
pub async fn wait_complete(&self) {
287+
pub async fn wait_complete(self: &Arc<Self>) {
288+
if !self.is_used() {
289+
return;
290+
}
291+
287292
if self.inner.read().is_complete {
288293
return;
289294
}
@@ -294,6 +299,22 @@ impl DynamicFilterPhysicalExpr {
294299
.await;
295300
}
296301

302+
/// Check if this dynamic filter is being actively used by any consumers.
303+
///
304+
/// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
305+
/// that created the filter). This is useful to avoid computing expensive filter
306+
/// expressions when no consumer will actually use them.
307+
///
308+
/// Note: We check the inner Arc's strong_count, not the outer Arc's count, because
309+
/// when filters are transformed (e.g., via reassign_expr_columns during filter pushdown),
310+
/// new outer Arc instances are created via with_new_children(), but they all share the
311+
/// same inner `Arc<RwLock<Inner>>`. This is what allows filter updates to propagate to
312+
/// consumers even after transformation.
313+
pub fn is_used(self: &Arc<Self>) -> bool {
314+
// Strong count > 1 means at least one consumer is holding a reference beyond the producer.
315+
Arc::strong_count(&self.inner) > 1
316+
}
317+
297318
fn render(
298319
&self,
299320
f: &mut std::fmt::Formatter<'_>,
@@ -691,4 +712,45 @@ mod test {
691712
"Expected b + d = [1010, 2020, 3030], got {arr_2:?}",
692713
);
693714
}
715+
716+
#[test]
717+
fn test_is_used() {
718+
let filter = Arc::new(DynamicFilterPhysicalExpr::new(
719+
vec![],
720+
lit(true) as Arc<dyn PhysicalExpr>,
721+
));
722+
723+
// Initially, only one reference to the inner Arc exists
724+
assert!(
725+
!filter.is_used(),
726+
"Filter should not be used with only one inner reference"
727+
);
728+
729+
// Simulate a consumer created via transformation (what happens during filter pushdown).
730+
// When filters are pushed down and transformed via reassign_expr_columns/transform_down,
731+
// with_new_children() is called which creates a new outer Arc but clones the inner Arc.
732+
let consumer1_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
733+
let _consumer1 = consumer1_expr
734+
.as_any()
735+
.downcast_ref::<DynamicFilterPhysicalExpr>()
736+
.expect("Should be DynamicFilterPhysicalExpr");
737+
738+
// Now the inner Arc is shared (inner_count = 2)
739+
assert!(
740+
filter.is_used(),
741+
"Filter should be used when inner Arc is shared with transformed consumer"
742+
);
743+
744+
// Create another transformed consumer
745+
let consumer2_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
746+
let _consumer2 = consumer2_expr
747+
.as_any()
748+
.downcast_ref::<DynamicFilterPhysicalExpr>()
749+
.expect("Should be DynamicFilterPhysicalExpr");
750+
751+
assert!(
752+
filter.is_used(),
753+
"Filter should still be used with multiple consumers"
754+
);
755+
}
694756
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,17 @@ impl HashJoinExec {
508508
self.null_equality
509509
}
510510

511+
/// Get the dynamic filter expression for testing purposes.
512+
/// Returns `None` if no dynamic filter has been set.
513+
///
514+
/// This method is intended for testing only and should not be used in production code.
515+
#[doc(hidden)]
516+
pub fn dynamic_filter_for_test(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
517+
self.dynamic_filter
518+
.as_ref()
519+
.map(|df| Arc::clone(&df.filter))
520+
}
521+
511522
/// Calculate order preservation flags for this hash join.
512523
fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
513524
vec![
@@ -921,7 +932,21 @@ impl ExecutionPlan for HashJoinExec {
921932
consider using CoalescePartitionsExec or the EnforceDistribution rule"
922933
);
923934

924-
let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
935+
// Only enable dynamic filter pushdown if:
936+
// - The session config enables dynamic filter pushdown
937+
// - A dynamic filter exists
938+
// - At least one consumer is holding a reference to it, this avoids expensive filter
939+
// computation when disabled or when no consumer will use it.
940+
let enable_dynamic_filter_pushdown = context
941+
.session_config()
942+
.options()
943+
.optimizer
944+
.enable_join_dynamic_filter_pushdown
945+
&& self
946+
.dynamic_filter
947+
.as_ref()
948+
.map(|df| df.filter.is_used())
949+
.unwrap_or(false);
925950

926951
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
927952
let left_fut = match self.mode {
@@ -4610,6 +4635,11 @@ mod tests {
46104635
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
46114636
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
46124637

4638+
// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
4639+
let _consumer = Arc::clone(&dynamic_filter)
4640+
.with_new_children(vec![])
4641+
.unwrap();
4642+
46134643
// Create HashJoinExec with the dynamic filter
46144644
let mut join = HashJoinExec::try_new(
46154645
left,
@@ -4658,6 +4688,11 @@ mod tests {
46584688
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
46594689
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
46604690

4691+
// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
4692+
let _consumer = Arc::clone(&dynamic_filter)
4693+
.with_new_children(vec![])
4694+
.unwrap();
4695+
46614696
// Create HashJoinExec with the dynamic filter
46624697
let mut join = HashJoinExec::try_new(
46634698
left,

0 commit comments

Comments
 (0)