From 20eebb8bd9bdf3737d3b3896916ea5ef7c14fad6 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 6 Jan 2026 18:18:07 +0000 Subject: [PATCH] End DataFusion stream early if file can be pruned away by dynamic expr Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/mod.rs | 1 + vortex-datafusion/src/persistent/opener.rs | 7 +++- vortex-datafusion/src/persistent/stream.rs | 40 ++++++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 vortex-datafusion/src/persistent/stream.rs diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 2e7f419cd99..a409c4a4d27 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -9,6 +9,7 @@ pub mod metrics; mod opener; mod sink; mod source; +mod stream; pub use access_plan::VortexAccessPlan; pub use format::VortexFormat; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 5553ef994f8..b5474f7bc1e 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -51,6 +51,7 @@ use crate::VortexAccessPlan; use crate::convert::exprs::ExpressionConvertor; use crate::convert::exprs::can_be_pushed_down; use crate::convert::exprs::make_vortex_predicate; +use crate::persistent::stream::PrunableStream; #[derive(Clone)] pub(crate) struct VortexOpener { @@ -336,7 +337,11 @@ impl FileOpener for VortexOpener { .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b))) .boxed(); - Ok(stream) + if let Some(file_pruner) = file_pruner { + Ok(PrunableStream::new(file_pruner, stream).boxed()) + } else { + Ok(stream) + } } .in_current_span() .boxed()) diff --git a/vortex-datafusion/src/persistent/stream.rs b/vortex-datafusion/src/persistent/stream.rs new file mode 100644 index 00000000000..52d9b7daecc --- /dev/null +++ b/vortex-datafusion/src/persistent/stream.rs @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: 
Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use datafusion_common::Result as DFResult;
+use datafusion_common::arrow::array::RecordBatch;
+use datafusion_pruning::FilePruner;
+use futures::Stream;
+use futures::StreamExt;
+use futures::stream::BoxStream;
+
+/// Stream adapter that ends early if its backing `PartitionedFile` can be pruned away by an
+/// updated dynamic expression.
+///
+/// The pruner is consulted at the start of every poll, before the wrapped stream is touched.
+pub(crate) struct PrunableStream {
+    /// Asked on every poll whether the stream should be cut short.
+    file_pruner: FilePruner,
+    /// The wrapped stream of record batches that is forwarded while no pruning happens.
+    stream: BoxStream<'static, DFResult<RecordBatch>>,
+}
+
+impl PrunableStream {
+    /// Wraps `stream` so that it terminates as soon as `file_pruner` reports that the file
+    /// should be pruned.
+    pub fn new(file_pruner: FilePruner, stream: BoxStream<'static, DFResult<RecordBatch>>) -> Self {
+        Self {
+            file_pruner,
+            stream,
+        }
+    }
+}
+
+impl Stream for PrunableStream {
+    type Item = DFResult<RecordBatch>;
+
+    /// Polls the pruner first and only then the wrapped stream.
+    ///
+    /// Returns `Poll::Ready(None)` when `should_prune` reports `true`, an error item when
+    /// `should_prune` itself fails, and otherwise whatever the wrapped stream returns.
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // `?` in a function returning `Poll<Option<Result<_, _>>>` turns an `Err(e)` into
+        // `Poll::Ready(Some(Err(e)))`, so a failing pruner check is yielded as a stream item.
+        if self.file_pruner.should_prune()? {
+            Poll::Ready(None)
+        } else {
+            self.stream.poll_next_unpin(cx)
+        }
+    }
+}