Skip to content

Commit e501fac

Browse files
committed
Do not buffer if a dynamic filter crossed BufferExec
1 parent 36508a9 commit e501fac

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

datafusion/physical-plan/src/buffer.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,20 @@ impl ExecutionPlan for BufferExec {
262262
child_pushdown_result: ChildPushdownResult,
263263
_config: &ConfigOptions,
264264
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
265-
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
265+
// If there is a dynamic filter being pushed down through this node, we don't want to buffer,
266+
// we prefer to give a chance to the dynamic filter to be populated with something rather
267+
// than eagerly polling data with an empty dynamic filter.
268+
let has_dynamic_filter = child_pushdown_result
269+
.parent_filters
270+
.iter()
271+
.any(|v| is_dynamic_physical_expr(&v.filter));
272+
if has_dynamic_filter {
273+
let mut result = FilterPushdownPropagation::if_all(child_pushdown_result);
274+
result.updated_node = Some(Arc::clone(self.input()));
275+
Ok(result)
276+
} else {
277+
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
278+
}
266279
}
267280

268281
fn try_pushdown_sort(

0 commit comments

Comments
 (0)