Skip to content

Commit ebc401a

Browse files
committed
Compute Dynamic Filters only when a consumer supports them
1 parent 769f367 commit ebc401a

File tree

2 files changed

+69
-1
lines changed

2 files changed

+69
-1
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,17 @@ impl DynamicFilterPhysicalExpr {
294294
.await;
295295
}
296296

297+
/// Check if this dynamic filter is being actively used by any consumers.
298+
///
299+
/// Returns `true` if the `Arc` has more than one strong reference, i.e. something other
300+
/// than the producer (e.g., the HashJoinExec that created the filter) holds a clone. This
301+
/// is useful to avoid computing expensive filter expressions when no consumer will use them.
302+
pub fn is_used(self: &Arc<Self>) -> bool {
303+
// Strong count > 1 means at least one consumer (e.g., ParquetExec, FilterExec)
304+
// is holding a reference beyond the producer.
305+
Arc::strong_count(self) > 1
306+
}
307+
297308
fn render(
298309
&self,
299310
f: &mut std::fmt::Formatter<'_>,
@@ -607,4 +618,47 @@ mod test {
607618
// wait_complete should return immediately
608619
dynamic_filter.wait_complete().await;
609620
}
621+
622+
#[test]
623+
fn test_is_used() {
624+
// Create a dynamic filter
625+
let filter = Arc::new(DynamicFilterPhysicalExpr::new(
626+
vec![],
627+
lit(true) as Arc<dyn PhysicalExpr>,
628+
));
629+
630+
// Initially, only one reference exists (the `filter` handle, standing in for the producer)
631+
assert!(
632+
!filter.is_used(),
633+
"Filter should not be used with only one reference"
634+
);
635+
636+
// Simulate a consumer holding a reference (e.g., ParquetExec)
637+
let consumer1 = Arc::clone(&filter);
638+
assert!(
639+
filter.is_used(),
640+
"Filter should be used with a consumer reference"
641+
);
642+
643+
// Multiple consumers
644+
let consumer2 = Arc::clone(&filter);
645+
assert!(
646+
filter.is_used(),
647+
"Filter should still be used with multiple consumers"
648+
);
649+
650+
// Drop one consumer
651+
drop(consumer1);
652+
assert!(
653+
filter.is_used(),
654+
"Filter should still be used with remaining consumer"
655+
);
656+
657+
// Drop all consumers
658+
drop(consumer2);
659+
assert!(
660+
!filter.is_used(),
661+
"Filter should not be used after all consumers dropped"
662+
);
663+
}
610664
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,21 @@ impl ExecutionPlan for HashJoinExec {
910910
consider using CoalescePartitionsExec or the EnforceDistribution rule"
911911
);
912912

913-
let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
913+
// Only enable dynamic filter pushdown if:
914+
// - The session config enables dynamic filter pushdown
915+
// - A dynamic filter exists
916+
// - At least one consumer is holding a reference to it.
917+
// This avoids expensive filter computation when pushdown is disabled or unused.
918+
let enable_dynamic_filter_pushdown = context
919+
.session_config()
920+
.options()
921+
.optimizer
922+
.enable_join_dynamic_filter_pushdown
923+
&& self
924+
.dynamic_filter
925+
.as_ref()
926+
.map(|df| df.filter.is_used())
927+
.unwrap_or(false);
914928

915929
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
916930
let left_fut = match self.mode {

0 commit comments

Comments
 (0)