Skip to content

Commit c16e4de

Browse files
authored
Add SplitBy to VortexOpenOptions (#1858)
Although it's not currently respected
1 parent 4982ebe commit c16e4de

File tree

10 files changed

+173
-3
lines changed

10 files changed

+173
-3
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-file/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ futures = { workspace = true, features = ["std"] }
2323
futures-executor = { workspace = true }
2424
futures-util = { workspace = true }
2525
itertools = { workspace = true }
26-
once_cell = { workspace = true }
2726
tokio = { workspace = true, features = ["rt"] }
2827
tracing = { workspace = true, optional = true }
2928
vortex-array = { workspace = true }

vortex-file/src/v2/file.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::io::Read;
2+
use std::ops::Range;
23
use std::sync::Arc;
34

45
use futures_util::stream;
@@ -20,8 +21,13 @@ pub struct VortexFile<R> {
2021
pub(crate) layout: LayoutData,
2122
pub(crate) segments: Vec<Segment>,
2223
pub(crate) segment_cache: SegmentCache,
24+
// TODO(ngates): not yet used by the file reader
25+
#[allow(dead_code)]
26+
pub(crate) splits: Vec<Range<u64>>,
2327
}
2428

29+
// NOTE(review): this inherent impl block is empty and defines no items, so it has no effect;
// it looks like a placeholder — either add the intended methods or remove it.
impl<R> VortexFile<R> {}
30+
2531
/// Async implementation of Vortex File.
2632
impl<R: VortexReadAt> VortexFile<R> {
2733
/// Returns the number of rows in the file.
@@ -36,7 +42,7 @@ impl<R: VortexReadAt> VortexFile<R> {
3642

3743
/// Performs a scan operation over the file.
3844
pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + '_> {
39-
// Create a shared reader
45+
// Create a shared reader for the scan.
4046
let reader: Arc<dyn LayoutReader> = self.layout.reader(self.ctx.clone())?;
4147
let result_dtype = scan.result_dtype(self.dtype())?;
4248

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
mod split_by;
2+
13
use std::io::Read;
4+
use std::ops::Range;
25

36
use flatbuffers::root;
47
use itertools::Itertools;
8+
pub use split_by::*;
59
use vortex_array::ContextRef;
610
use vortex_buffer::{ByteBuffer, ByteBufferMut};
711
use vortex_dtype::DType;
@@ -32,6 +36,7 @@ pub struct OpenOptions {
3236
// additional caching, metrics, or other intercepts, etc. It should support synchronous
3337
// read + write of Map<MessageId, ByteBuffer> or similar.
3438
initial_read_size: u64,
39+
split_by: SplitBy,
3540
}
3641

3742
impl OpenOptions {
@@ -42,6 +47,7 @@ impl OpenOptions {
4247
file_layout: None,
4348
dtype: None,
4449
initial_read_size: INITIAL_READ_SIZE,
50+
split_by: SplitBy::Layout,
4551
}
4652
}
4753

@@ -53,6 +59,14 @@ impl OpenOptions {
5359
self.initial_read_size = initial_read_size;
5460
Ok(self)
5561
}
62+
63+
/// Configure how to split the file into batches for reading.
64+
///
65+
/// Defaults to [`SplitBy::Layout`].
66+
pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
67+
self.split_by = split_by;
68+
self
69+
}
5670
}
5771

5872
impl OpenOptions {
@@ -120,13 +134,17 @@ impl OpenOptions {
120134
&mut segment_cache,
121135
)?;
122136

137+
// Compute the splits of the file.
138+
let splits: Vec<Range<u64>> = self.split_by.splits(&file_layout.root_layout)?;
139+
123140
// Finally, create the VortexFile.
124141
Ok(VortexFile {
125142
read,
126143
ctx: self.ctx.clone(),
127144
layout: file_layout.root_layout,
128145
segments: file_layout.segments,
129146
segment_cache,
147+
splits,
130148
})
131149
}
132150

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::collections::BTreeSet;
2+
use std::ops::Range;
3+
4+
use itertools::Itertools;
5+
use vortex_error::VortexResult;
6+
use vortex_layout::LayoutData;
7+
8+
/// Defines how the Vortex file is split into batches for reading.
///
/// Note that each split must fit into the platform's maximum usize.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum SplitBy {
    /// Splits any time there is a chunk boundary in the file.
    Layout,
    /// Splits every n rows.
    ///
    /// `n` must be greater than zero: a zero-row step cannot make progress through the file.
    RowCount(usize),
    // NOTE(review): a size-based variant, `UncompressedSize(u64)`, is sketched here but is
    // not implemented.
}
19+
20+
impl SplitBy {
21+
/// Compute the splits for the given layout.
22+
pub(crate) fn splits(&self, layout: &LayoutData) -> VortexResult<Vec<Range<u64>>> {
23+
Ok(match *self {
24+
SplitBy::Layout => {
25+
let mut row_splits = BTreeSet::<u64>::new();
26+
// Make sure we always have the first and last row.
27+
row_splits.insert(0);
28+
row_splits.insert(layout.row_count());
29+
// Register the splits for all the layouts.
30+
layout.register_splits(0, &mut row_splits)?;
31+
row_splits
32+
.into_iter()
33+
.tuple_windows()
34+
.map(|(start, end)| start..end)
35+
.collect()
36+
}
37+
SplitBy::RowCount(n) => {
38+
let row_count = layout.row_count();
39+
let mut splits =
40+
Vec::with_capacity(usize::try_from((row_count + n as u64) / n as u64)?);
41+
for start in (0..row_count).step_by(n) {
42+
let end = (start + n as u64).min(row_count);
43+
splits.push(start..end);
44+
}
45+
splits
46+
}
47+
})
48+
}
49+
}
50+
51+
#[cfg(test)]
mod test {
    use vortex_array::IntoArrayData;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_layout::layouts::flat::writer::FlatLayoutWriter;
    use vortex_layout::strategies::LayoutWriterExt;

    use super::*;
    use crate::v2::segments::BufferedSegmentWriter;

    /// Writes a ten-row array through a flat layout writer and returns the splits that
    /// `split_by` computes for the resulting layout.
    fn split_ten_rows(split_by: SplitBy) -> Vec<Range<u64>> {
        let mut segments = BufferedSegmentWriter::default();
        let layout = FlatLayoutWriter::new(DType::Bool(NonNullable))
            .push_one(&mut segments, buffer![1; 10].into_array())
            .unwrap();
        split_by.splits(&layout).unwrap()
    }

    #[test]
    fn test_layout_splits_flat() {
        assert_eq!(split_ten_rows(SplitBy::Layout), vec![0..10]);
    }

    #[test]
    fn test_row_count_splits() {
        assert_eq!(
            split_ten_rows(SplitBy::RowCount(3)),
            vec![0..3, 3..6, 6..9, 9..10]
        );
    }

    #[test]
    fn test_row_count_splits_exact_multiple() {
        // The row count is an exact multiple of the step: no trailing partial range.
        assert_eq!(split_ten_rows(SplitBy::RowCount(5)), vec![0..5, 5..10]);
    }

    #[test]
    fn test_row_count_splits_step_exceeds_rows() {
        // A step larger than the row count yields one range clamped to the row count.
        assert_eq!(split_ten_rows(SplitBy::RowCount(20)), vec![0..10]);
    }
}

vortex-layout/src/data.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::BTreeSet;
12
use std::ops::Deref;
23
use std::sync::Arc;
34

@@ -251,6 +252,11 @@ impl LayoutData {
251252
pub fn reader(&self, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader + 'static>> {
252253
self.encoding().reader(self.clone(), ctx)
253254
}
255+
256+
/// Register splits for this layout.
257+
pub fn register_splits(&self, row_offset: u64, splits: &mut BTreeSet<u64>) -> VortexResult<()> {
258+
self.encoding().register_splits(self, row_offset, splits)
259+
}
254260
}
255261

256262
impl FlatBufferRoot for LayoutData {}

vortex-layout/src/encoding.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::BTreeSet;
12
use std::fmt::{Debug, Display, Formatter};
23
use std::sync::Arc;
34

@@ -23,6 +24,22 @@ pub trait LayoutEncoding: Debug + Send + Sync {
2324
///
2425
/// May panic if the provided `LayoutData` is not the same encoding as this `LayoutEncoding`.
2526
fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>>;
27+
28+
/// Register the row splits for this layout. These represent natural boundaries at which
29+
/// a reader can split the layout for independent processing.
30+
///
31+
/// For example, a ChunkedLayout would register a boundary at the end of every chunk.
32+
///
33+
/// The layout is passed a `row_offset` that identifies the starting row of the layout within
34+
/// the file.
35+
// TODO(ngates): we should check whether this is actually performant enough since we visit
36+
// all nodes of the layout tree, often registering the same splits many times.
37+
fn register_splits(
38+
&self,
39+
layout: &LayoutData,
40+
row_offset: u64,
41+
splits: &mut BTreeSet<u64>,
42+
) -> VortexResult<()>;
2643
}
2744

2845
pub type LayoutEncodingRef = &'static dyn LayoutEncoding;

vortex-layout/src/layouts/chunked/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod reader;
44
pub mod stats_table;
55
pub mod writer;
66

7+
use std::collections::BTreeSet;
78
use std::sync::Arc;
89

910
use vortex_array::ContextRef;
@@ -30,4 +31,21 @@ impl LayoutEncoding for ChunkedLayout {
3031
fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>> {
3132
Ok(ChunkedReader::try_new(layout, ctx)?.into_arc())
3233
}
34+
35+
fn register_splits(
36+
&self,
37+
layout: &LayoutData,
38+
row_offset: u64,
39+
splits: &mut BTreeSet<u64>,
40+
) -> VortexResult<()> {
41+
let nchunks = layout.nchildren() - (if layout.metadata().is_some() { 1 } else { 0 });
42+
let mut offset = row_offset;
43+
for i in 0..nchunks {
44+
let child = layout.child(i, layout.dtype().clone())?;
45+
child.register_splits(offset, splits)?;
46+
offset += child.row_count();
47+
splits.insert(offset);
48+
}
49+
Ok(())
50+
}
3351
}

vortex-layout/src/layouts/flat/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod reader;
33
// mod stats;
44
pub mod writer;
55

6+
use std::collections::BTreeSet;
67
use std::sync::Arc;
78

89
use vortex_array::ContextRef;
@@ -24,4 +25,14 @@ impl LayoutEncoding for FlatLayout {
2425
fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>> {
2526
Ok(FlatReader::try_new(layout, ctx)?.into_arc())
2627
}
28+
29+
fn register_splits(
30+
&self,
31+
layout: &LayoutData,
32+
row_offset: u64,
33+
splits: &mut BTreeSet<u64>,
34+
) -> VortexResult<()> {
35+
splits.insert(row_offset + layout.row_count());
36+
Ok(())
37+
}
2738
}

vortex-layout/src/layouts/struct_/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod scan;
22
pub mod writer;
33

4+
use std::collections::BTreeSet;
45
use std::sync::Arc;
56

67
use vortex_array::ContextRef;
@@ -23,4 +24,17 @@ impl LayoutEncoding for StructLayout {
2324
fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>> {
2425
Ok(StructScan::try_new(layout, ctx)?.into_arc())
2526
}
27+
28+
fn register_splits(
29+
&self,
30+
layout: &LayoutData,
31+
row_offset: u64,
32+
splits: &mut BTreeSet<u64>,
33+
) -> VortexResult<()> {
34+
for child_idx in 0..layout.nchildren() {
35+
let child = layout.child(child_idx, layout.dtype().clone())?;
36+
child.register_splits(row_offset, splits)?;
37+
}
38+
Ok(())
39+
}
2640
}

0 commit comments

Comments
 (0)