Skip to content

Commit a81c606

Browse files
authored
Vortex Layouts File V2 (#1830)
Introduces a V2 file reader/writer based around Vortex Layouts
1 parent db97964 commit a81c606

File tree

22 files changed

+1248
-18
lines changed

22 files changed

+1248
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ vortex-flatbuffers = { version = "0.21.1", path = "./vortex-flatbuffers" }
149149
vortex-fsst = { version = "0.21.1", path = "./encodings/fsst" }
150150
vortex-io = { version = "0.21.1", path = "./vortex-io" }
151151
vortex-ipc = { version = "0.21.1", path = "./vortex-ipc" }
152+
vortex-layout = { version = "0.21.1", path = "./vortex-layout" }
152153
vortex-proto = { version = "0.21.1", path = "./vortex-proto" }
153154
vortex-roaring = { version = "0.21.1", path = "./encodings/roaring" }
154155
vortex-runend = { version = "0.21.1", path = "./encodings/runend" }

vortex-file/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ vortex-expr = { workspace = true }
3434
vortex-flatbuffers = { workspace = true, features = ["file"] }
3535
vortex-io = { workspace = true }
3636
vortex-ipc = { workspace = true }
37+
vortex-layout = { workspace = true }
3738
vortex-scalar = { workspace = true, features = ["flatbuffers"] }
3839

3940
[dev-dependencies]

vortex-file/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ mod write;
7272
mod byte_range;
7373
#[cfg(test)]
7474
mod tests;
75+
pub mod v2;
7576

7677
/// The current version of the Vortex file format
7778
pub const VERSION: u16 = 1;

vortex-file/src/v2/file.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use std::io::Read;
2+
3+
use futures_util::stream;
4+
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
5+
use vortex_array::ContextRef;
6+
use vortex_dtype::DType;
7+
use vortex_error::VortexResult;
8+
use vortex_io::VortexReadAt;
9+
use vortex_layout::scanner::{Poll, Scan};
10+
use vortex_layout::{LayoutData, RowMask};
11+
12+
use crate::v2::footer::Segment;
13+
use crate::v2::segments::SegmentCache;
14+
15+
pub struct VortexFile<R> {
16+
pub(crate) read: R,
17+
pub(crate) ctx: ContextRef,
18+
pub(crate) layout: LayoutData,
19+
pub(crate) segments: Vec<Segment>,
20+
pub(crate) segment_cache: SegmentCache,
21+
}
22+
23+
/// Async implementation of Vortex File.
24+
impl<R: VortexReadAt> VortexFile<R> {
25+
/// Returns the number of rows in the file.
26+
pub fn row_count(&self) -> u64 {
27+
self.layout.row_count()
28+
}
29+
30+
/// Returns the DType of the file.
31+
pub fn dtype(&self) -> &DType {
32+
self.layout.dtype()
33+
}
34+
35+
/// Performs a scan operation over the file.
36+
pub fn scan(&self, scan: Scan) -> VortexResult<impl ArrayStream + '_> {
37+
let layout_scan = self.layout.new_scan(scan, self.ctx.clone())?;
38+
let scan_dtype = layout_scan.dtype().clone();
39+
40+
// TODO(ngates): we could query the layout for splits and then process them in parallel.
41+
// For now, we just scan the entire layout with one mask.
42+
// Note that to implement this we would use stream::try_unfold
43+
let row_mask = RowMask::new_valid_between(0, layout_scan.layout().row_count());
44+
let mut scanner = layout_scan.create_scanner(row_mask)?;
45+
46+
let stream = stream::once(async move {
47+
loop {
48+
match scanner.poll(&self.segment_cache)? {
49+
Poll::Some(array) => return Ok(array),
50+
Poll::NeedMore(segment_ids) => {
51+
for segment_id in segment_ids {
52+
let segment = &self.segments[*segment_id as usize];
53+
let bytes = self
54+
.read
55+
.read_byte_range(segment.offset, segment.length as u64)
56+
.await?;
57+
self.segment_cache.set(segment_id, bytes);
58+
}
59+
}
60+
}
61+
}
62+
});
63+
64+
Ok(ArrayStreamAdapter::new(scan_dtype, stream))
65+
}
66+
}
67+
68+
/// Sync implementation of Vortex File.
69+
impl<R: Read> VortexFile<R> {}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer};
2+
use vortex_layout::LayoutData;
3+
4+
use crate::v2::footer::segment::Segment;
5+
6+
/// Captures the layout information of a Vortex file.
7+
#[derive(Clone)]
8+
pub(crate) struct FileLayout {
9+
pub(crate) root_layout: LayoutData,
10+
pub(crate) segments: Vec<Segment>,
11+
}
12+
13+
impl FlatBufferRoot for FileLayout {}
14+
15+
impl WriteFlatBuffer for FileLayout {
16+
type Target<'a> = fb::FileLayout<'a>;
17+
18+
fn write_flatbuffer<'fb>(
19+
&self,
20+
fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
21+
) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
22+
let root_layout = self.root_layout.write_flatbuffer(fbb);
23+
24+
let segments = self
25+
.segments
26+
.iter()
27+
.map(|segment| segment.write_flatbuffer(fbb))
28+
.collect::<Vec<_>>();
29+
let segments = fbb.create_vector(&segments);
30+
31+
fb::FileLayout::create(
32+
fbb,
33+
&fb::FileLayoutArgs {
34+
root_layout: Some(root_layout),
35+
segments: Some(segments),
36+
},
37+
)
38+
}
39+
}

vortex-file/src/v2/footer/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mod file_layout;
2+
mod postscript;
3+
mod segment;
4+
5+
pub(crate) use file_layout::*;
6+
pub(crate) use postscript::*;
7+
pub(crate) use segment::*;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use flatbuffers::Follow;
2+
use vortex_error::{vortex_err, VortexError};
3+
use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer};
4+
5+
use crate::v2::footer::segment::Segment;
6+
7+
/// Captures the layout information of a Vortex file.
8+
pub(crate) struct Postscript {
9+
pub(crate) dtype: Segment,
10+
pub(crate) file_layout: Segment,
11+
}
12+
13+
impl FlatBufferRoot for Postscript {}
14+
15+
impl WriteFlatBuffer for Postscript {
16+
type Target<'a> = fb::Postscript<'a>;
17+
18+
fn write_flatbuffer<'fb>(
19+
&self,
20+
fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
21+
) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
22+
let dtype = self.dtype.write_flatbuffer(fbb);
23+
let file_layout = self.file_layout.write_flatbuffer(fbb);
24+
fb::Postscript::create(
25+
fbb,
26+
&fb::PostscriptArgs {
27+
dtype: Some(dtype),
28+
file_layout: Some(file_layout),
29+
},
30+
)
31+
}
32+
}
33+
34+
impl ReadFlatBuffer for Postscript {
35+
type Source<'a> = fb::Postscript<'a>;
36+
type Error = VortexError;
37+
38+
fn read_flatbuffer<'buf>(
39+
fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
40+
) -> Result<Self, Self::Error> {
41+
Ok(Self {
42+
dtype: Segment::read_flatbuffer(
43+
&fb.dtype()
44+
.ok_or_else(|| vortex_err!("Postscript missing dtype segment"))?,
45+
)?,
46+
file_layout: Segment::read_flatbuffer(
47+
&fb.file_layout()
48+
.ok_or_else(|| vortex_err!("Postscript missing file_layout segment"))?,
49+
)?,
50+
})
51+
}
52+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
2+
use vortex_error::{vortex_err, VortexError};
3+
use vortex_flatbuffers::{footer2 as fb, ReadFlatBuffer, WriteFlatBuffer};
4+
5+
/// The location of a segment within a Vortex file.
6+
#[derive(Clone, Debug)]
7+
pub(crate) struct Segment {
8+
pub(crate) offset: u64,
9+
pub(crate) length: usize,
10+
}
11+
12+
impl WriteFlatBuffer for Segment {
13+
type Target<'a> = fb::Segment<'a>;
14+
15+
fn write_flatbuffer<'fb>(
16+
&self,
17+
fbb: &mut FlatBufferBuilder<'fb>,
18+
) -> WIPOffset<Self::Target<'fb>> {
19+
fb::Segment::create(
20+
fbb,
21+
&fb::SegmentArgs {
22+
offset: self.offset,
23+
length: self.length as u64,
24+
},
25+
)
26+
}
27+
}
28+
29+
impl ReadFlatBuffer for Segment {
30+
type Source<'a> = fb::Segment<'a>;
31+
type Error = VortexError;
32+
33+
fn read_flatbuffer<'buf>(
34+
fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
35+
) -> Result<Self, Self::Error> {
36+
Ok(Self {
37+
offset: fb.offset(),
38+
length: usize::try_from(fb.length())
39+
.map_err(|_| vortex_err!("segment length exceeds maximum usize"))?,
40+
})
41+
}
42+
}

vortex-file/src/v2/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
mod file;
2+
mod footer;
3+
mod open;
4+
mod segments;
5+
mod strategy;
6+
#[cfg(test)]
7+
mod tests;
8+
mod writer;
9+
10+
pub use file::*;
11+
pub use open::*;
12+
pub use writer::*;

0 commit comments

Comments
 (0)