Skip to content

Commit 7674a6c

Browse files
authored
[datafusion] Split scan by layout then flat_map record batches to batch_size (#3157)
1 parent 94c0ef8 commit 7674a6c

File tree

1 file changed

+26
-7
lines changed

1 file changed

+26
-7
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use vortex_error::VortexError;
1414
use vortex_expr::{ExprRef, VortexExpr};
1515
use vortex_file::scan::ScanBuilder;
1616
use vortex_layout::LayoutReader;
17-
use vortex_layout::scan::SplitBy;
1817
use vortex_metrics::VortexMetrics;
1918

2019
use super::cache::VortexFileCache;
@@ -97,19 +96,39 @@ impl FileOpener for VortexFileOpener {
9796
let scan_builder = ScanBuilder::new(layout_reader);
9897
let scan_builder = apply_byte_range(file_meta, vxf.row_count(), scan_builder);
9998

100-
Ok(scan_builder
99+
let stream = scan_builder
101100
.with_tokio_executor(Handle::current())
102101
.with_metrics(metrics)
103102
.with_projection(projection)
104103
.with_some_filter(filter)
105-
// DataFusion likes ~8k row batches. Ideally we would respect the config,
106-
// but at the moment our scanner has too much overhead to process small
107-
// batches efficiently.
108-
.with_split_by(SplitBy::RowCount(8 * batch_size))
109104
.map_to_record_batch(projected_arrow_schema.clone())
110105
.into_stream()?
106+
.map_ok(move |rb| {
107+
// We try and slice the stream into respecting datafusion's configured batch size.
108+
futures::stream::iter(
109+
(0..rb.num_rows().div_ceil(batch_size * 2))
110+
.flat_map(move |block_idx| {
111+
let offset = block_idx * batch_size * 2;
112+
113+
// If we have less than two batches worth of rows left, we keep them together as a single batch.
114+
if rb.num_rows() - offset < 2 * batch_size {
115+
let length = rb.num_rows() - offset;
116+
[Some(rb.slice(offset, length)), None].into_iter()
117+
} else {
118+
let first = rb.slice(offset, batch_size);
119+
let second = rb.slice(offset + batch_size, batch_size);
120+
[Some(first), Some(second)].into_iter()
121+
}
122+
})
123+
.flatten()
124+
.map(Ok),
125+
)
126+
})
111127
.map_err(|e: VortexError| ArrowError::from(e))
112-
.boxed())
128+
.try_flatten()
129+
.boxed();
130+
131+
Ok(stream)
113132
}
114133
.boxed())
115134
}

0 commit comments

Comments
 (0)