Skip to content

Commit ab03bab

Browse files
committed
Add independent configs for topk/join dynamic filter (apache#18090)
* Add independent configs for topk/join dynamic filter * fix ci * update doc * fix typo
1 parent d799780 commit ab03bab

File tree

6 files changed

+374
-5
lines changed

6 files changed

+374
-5
lines changed

datafusion/common/src/config.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,11 +731,21 @@ config_namespace! {
731731
/// past window functions, if possible
732732
pub enable_window_limits: bool, default = true
733733

734-
/// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
734+
/// When set to true, the optimizer will attempt to push down TopK dynamic filters
735+
/// into the file scan phase.
736+
pub enable_topk_dynamic_filter_pushdown: bool, default = true
737+
738+
/// When set to true, the optimizer will attempt to push down Join dynamic filters
739+
/// into the file scan phase.
740+
pub enable_join_dynamic_filter_pushdown: bool, default = true
741+
742+
/// When set to true, attempts to push down dynamic filters generated by operators (TopK & Join) into the file scan phase.
735743
/// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
736744
/// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
737745
/// This means that if we already have 10 timestamps in the year 2025
738746
/// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
747+
/// Setting this config also sets `enable_join_dynamic_filter_pushdown` & `enable_topk_dynamic_filter_pushdown` to the same value, overriding whatever they were set to before.
748+
/// So if you disable `enable_topk_dynamic_filter_pushdown` and then enable `enable_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` will be overridden and set back to enabled.
739749
pub enable_dynamic_filter_pushdown: bool, default = true
740750

741751
/// When set to true, the optimizer will insert filters before a join between
@@ -1025,6 +1035,20 @@ impl ConfigOptions {
10251035
};
10261036

10271037
if prefix == "datafusion" {
1038+
if key == "optimizer.enable_dynamic_filter_pushdown" {
1039+
let bool_value = value.parse::<bool>().map_err(|e| {
1040+
DataFusionError::Configuration(format!(
1041+
"Failed to parse '{value}' as bool: {e}",
1042+
))
1043+
})?;
1044+
1045+
{
1046+
self.optimizer.enable_dynamic_filter_pushdown = bool_value;
1047+
self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
1048+
self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
1049+
}
1050+
return Ok(());
1051+
}
10281052
return ConfigField::set(self, key, value);
10291053
}
10301054

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,7 @@ impl ExecutionPlan for HashJoinExec {
11371137

11381138
// Add dynamic filters in Post phase if enabled
11391139
if matches!(phase, FilterPushdownPhase::Post)
1140-
&& config.optimizer.enable_dynamic_filter_pushdown
1140+
&& config.optimizer.enable_join_dynamic_filter_pushdown
11411141
{
11421142
// Add actual dynamic filter to right side (probe side)
11431143
let dynamic_filter = Self::create_dynamic_filter(&self.on);

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,7 @@ impl ExecutionPlan for SortExec {
13761376
ChildFilterDescription::from_child(&parent_filters, self.input())?;
13771377

13781378
if let Some(filter) = &self.filter {
1379-
if config.optimizer.enable_dynamic_filter_pushdown {
1379+
if config.optimizer.enable_topk_dynamic_filter_pushdown {
13801380
child = child.with_self_filter(filter.read().expr());
13811381
}
13821382
}

0 commit comments

Comments
 (0)