Skip to content

Commit 4153adf

Browse files
authored
Add independent configs for topk/join dynamic filter (#18090)
* Add independent configs for topk/join dynamic filter * fix ci * update doc * fix typo
1 parent 264030c commit 4153adf

File tree

6 files changed

+374
-5
lines changed

6 files changed

+374
-5
lines changed

datafusion/common/src/config.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -745,11 +745,21 @@ config_namespace! {
745745
/// past window functions, if possible
746746
pub enable_window_limits: bool, default = true
747747

748-
/// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
748+
/// When set to true, the optimizer will attempt to push down TopK dynamic filters
749+
/// into the file scan phase.
750+
pub enable_topk_dynamic_filter_pushdown: bool, default = true
751+
752+
/// When set to true, the optimizer will attempt to push down Join dynamic filters
753+
/// into the file scan phase.
754+
pub enable_join_dynamic_filter_pushdown: bool, default = true
755+
756+
/// When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase.
749757
/// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
750758
/// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
751759
/// This means that if we already have 10 timestamps in the year 2025
752760
/// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
761+
/// The config will suppress `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown`
762+
/// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
753763
pub enable_dynamic_filter_pushdown: bool, default = true
754764

755765
/// When set to true, the optimizer will insert filters before a join between
@@ -1039,6 +1049,20 @@ impl ConfigOptions {
10391049
};
10401050

10411051
if prefix == "datafusion" {
1052+
if key == "optimizer.enable_dynamic_filter_pushdown" {
1053+
let bool_value = value.parse::<bool>().map_err(|e| {
1054+
DataFusionError::Configuration(format!(
1055+
"Failed to parse '{value}' as bool: {e}",
1056+
))
1057+
})?;
1058+
1059+
{
1060+
self.optimizer.enable_dynamic_filter_pushdown = bool_value;
1061+
self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
1062+
self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
1063+
}
1064+
return Ok(());
1065+
}
10421066
return ConfigField::set(self, key, value);
10431067
}
10441068

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,7 @@ impl ExecutionPlan for HashJoinExec {
11371137

11381138
// Add dynamic filters in Post phase if enabled
11391139
if matches!(phase, FilterPushdownPhase::Post)
1140-
&& config.optimizer.enable_dynamic_filter_pushdown
1140+
&& config.optimizer.enable_join_dynamic_filter_pushdown
11411141
{
11421142
// Add actual dynamic filter to right side (probe side)
11431143
let dynamic_filter = Self::create_dynamic_filter(&self.on);

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1355,7 +1355,7 @@ impl ExecutionPlan for SortExec {
13551355
ChildFilterDescription::from_child(&parent_filters, self.input())?;
13561356

13571357
if let Some(filter) = &self.filter {
1358-
if config.optimizer.enable_dynamic_filter_pushdown {
1358+
if config.optimizer.enable_topk_dynamic_filter_pushdown {
13591359
child = child.with_self_filter(filter.read().expr());
13601360
}
13611361
}

0 commit comments

Comments
 (0)