Skip to content

Commit 9b5823c

Browse files
committed
End DataFusion stream early if file can be pruned away by dynamic expr
Signed-off-by: Adam Gutglick <[email protected]>
1 parent 6b526d3 commit 9b5823c

File tree

3 files changed

+46
-1
lines changed

3 files changed

+46
-1
lines changed

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod metrics;
99
mod opener;
1010
mod sink;
1111
mod source;
12+
mod stream;
1213

1314
pub use access_plan::VortexAccessPlan;
1415
pub use format::VortexFormat;

vortex-datafusion/src/persistent/opener.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use crate::VortexAccessPlan;
5151
use crate::convert::exprs::ExpressionConvertor;
5252
use crate::convert::exprs::can_be_pushed_down;
5353
use crate::convert::exprs::make_vortex_predicate;
54+
use crate::persistent::stream::PrunableStream;
5455

5556
#[derive(Clone)]
5657
pub(crate) struct VortexOpener {
@@ -336,7 +337,11 @@ impl FileOpener for VortexOpener {
336337
.map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
337338
.boxed();
338339

339-
Ok(stream)
340+
if let Some(file_pruner) = file_pruner {
341+
Ok(PrunableStream::new(file_pruner, stream).boxed())
342+
} else {
343+
Ok(stream)
344+
}
340345
}
341346
.in_current_span()
342347
.boxed())
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use std::pin::Pin;
2+
use std::task::Context;
3+
use std::task::Poll;
4+
5+
use datafusion_common::Result as DFResult;
6+
use datafusion_common::arrow::array::RecordBatch;
7+
use datafusion_pruning::FilePruner;
8+
use futures::Stream;
9+
use futures::StreamExt;
10+
use futures::stream::BoxStream;
11+
12+
/// Utility to end a stream early if its backing [`PartitionFile`] can be pruned away by an updated dynamic expression.
13+
pub(crate) struct PrunableStream {
14+
file_pruner: FilePruner,
15+
stream: BoxStream<'static, DFResult<RecordBatch>>,
16+
}
17+
18+
impl PrunableStream {
19+
pub fn new(file_pruner: FilePruner, stream: BoxStream<'static, DFResult<RecordBatch>>) -> Self {
20+
Self {
21+
file_pruner,
22+
stream,
23+
}
24+
}
25+
}
26+
27+
impl Stream for PrunableStream {
28+
type Item = DFResult<RecordBatch>;
29+
30+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
31+
match self.as_mut().file_pruner.should_prune() {
32+
Ok(true) => return Poll::Ready(None),
33+
Ok(false) => {}
34+
Err(e) => return Poll::Ready(Some(Err(e))),
35+
};
36+
37+
self.stream.poll_next_unpin(cx)
38+
}
39+
}

0 commit comments

Comments
 (0)