Skip to content

Commit ace4717

Browse files
authored
End DataFusion stream early if file can be pruned away by dynamic expr (#5874)
This PR adds an additional pruning check on every poll of the File stream, so if the underlying dynamic expression is updated, the stream will stop. Signed-off-by: Adam Gutglick <[email protected]>
1 parent aac464c commit ace4717

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

vortex-datafusion/src/persistent/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod metrics;
99
mod opener;
1010
mod sink;
1111
mod source;
12+
mod stream;
1213

1314
pub use access_plan::VortexAccessPlan;
1415
pub use format::VortexFormat;

vortex-datafusion/src/persistent/opener.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use crate::VortexAccessPlan;
5151
use crate::convert::exprs::ExpressionConvertor;
5252
use crate::convert::exprs::can_be_pushed_down;
5353
use crate::convert::exprs::make_vortex_predicate;
54+
use crate::persistent::stream::PrunableStream;
5455

5556
#[derive(Clone)]
5657
pub(crate) struct VortexOpener {
@@ -336,7 +337,11 @@ impl FileOpener for VortexOpener {
336337
.map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
337338
.boxed();
338339

339-
Ok(stream)
340+
if let Some(file_pruner) = file_pruner {
341+
Ok(PrunableStream::new(file_pruner, stream).boxed())
342+
} else {
343+
Ok(stream)
344+
}
340345
}
341346
.in_current_span()
342347
.boxed())
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::pin::Pin;
5+
use std::task::Context;
6+
use std::task::Poll;
7+
8+
use datafusion_common::Result as DFResult;
9+
use datafusion_common::arrow::array::RecordBatch;
10+
use datafusion_pruning::FilePruner;
11+
use futures::Stream;
12+
use futures::StreamExt;
13+
use futures::stream::BoxStream;
14+
15+
/// Utility to end a stream early if its backing [`PartitionFile`] can be pruned away by an updated dynamic expression.
16+
pub(crate) struct PrunableStream {
17+
file_pruner: FilePruner,
18+
stream: BoxStream<'static, DFResult<RecordBatch>>,
19+
}
20+
21+
impl PrunableStream {
22+
pub fn new(file_pruner: FilePruner, stream: BoxStream<'static, DFResult<RecordBatch>>) -> Self {
23+
Self {
24+
file_pruner,
25+
stream,
26+
}
27+
}
28+
}
29+
30+
impl Stream for PrunableStream {
31+
type Item = DFResult<RecordBatch>;
32+
33+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34+
if self.as_mut().file_pruner.should_prune()? {
35+
Poll::Ready(None)
36+
} else {
37+
self.stream.poll_next_unpin(cx)
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)