Skip to content

Commit 5b3b5d7

Browse files
authored
Pass typed flat buffers into layout builders (#1563)
1 parent a1ecb12 commit 5b3b5d7

File tree

10 files changed

+77
-135
lines changed

10 files changed

+77
-135
lines changed

vortex-datafusion/src/persistent/format.rs

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,8 +22,8 @@ use vortex_array::arrow::infer_schema;
2222
use vortex_array::Context;
2323
use vortex_file::metadata::fetch_metadata;
2424
use vortex_file::{
25-
read_initial_bytes, read_layout_from_initial, LayoutContext, LayoutDeserializer,
26-
LayoutMessageCache, RelativeLayoutCache, Scan, VORTEX_FILE_EXTENSION,
25+
read_initial_bytes, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache,
26+
Scan, VORTEX_FILE_EXTENSION,
2727
};
2828
use vortex_io::{IoDispatcher, ObjectStoreReadAt};
2929

@@ -106,9 +106,8 @@ impl FileFormat for VortexFormat {
106106
initial_read.lazy_dtype().into(),
107107
);
108108

109-
let root_layout = read_layout_from_initial(
110-
&initial_read,
111-
&layout_deserializer,
109+
let root_layout = layout_deserializer.read_layout(
110+
initial_read.fb_layout(),
112111
Scan::empty(),
113112
relative_message_cache,
114113
)?;

vortex-file/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -128,8 +128,8 @@
128128
//! example, the VortexFileArrayStream reads an array, evaluates the row filter, and then reads the
129129
//! array again with the filter mask.
130130
//!
131-
//! [`read_layout_from_initial`] produces a [LayoutReader] which assembles one or more Vortex arrays
132-
//! by reading the serialized data and metadata.
131+
//! A [LayoutReader] then assembles one or more Vortex arrays by reading the serialized data
132+
//! and metadata.
133133
//!
134134
//! # Apache Arrow
135135
//!

vortex-file/src/read/builder/initial_read.rs

Lines changed: 1 addition & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,7 @@ use vortex_error::{vortex_bail, vortex_err, VortexResult};
66
use vortex_flatbuffers::{footer, message};
77
use vortex_io::VortexReadAt;
88

9-
use crate::{
10-
LayoutDeserializer, LayoutReader, LazyDType, RelativeLayoutCache, Scan, EOF_SIZE,
11-
INITIAL_READ_SIZE, MAGIC_BYTES, VERSION,
12-
};
9+
use crate::{LazyDType, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION};
1310

1411
#[derive(Debug)]
1512
pub struct InitialRead {
@@ -56,17 +53,6 @@ impl InitialRead {
5653
}
5754
}
5855

59-
pub fn read_layout_from_initial(
60-
initial_read: &InitialRead,
61-
layout_serde: &LayoutDeserializer,
62-
scan: Scan,
63-
message_cache: RelativeLayoutCache,
64-
) -> VortexResult<Box<dyn LayoutReader>> {
65-
let layout_bytes = initial_read.buf.slice(initial_read.fb_layout_byte_range());
66-
let fb_loc = initial_read.fb_layout()._tab.loc();
67-
layout_serde.read_layout(layout_bytes, fb_loc, scan, message_cache)
68-
}
69-
7056
pub async fn read_initial_bytes<R: VortexReadAt>(
7157
read: &R,
7258
file_size: u64,

vortex-file/src/read/builder/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
use std::sync::{Arc, RwLock};
22

3-
use initial_read::{read_initial_bytes, read_layout_from_initial};
3+
use initial_read::read_initial_bytes;
44
use vortex_array::{ArrayDType, ArrayData};
55
use vortex_error::VortexResult;
66
use vortex_expr::Select;
@@ -129,9 +129,8 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
129129
};
130130

131131
let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
132-
let layout_reader = read_layout_from_initial(
133-
&initial_read,
134-
&self.layout_serde,
132+
let layout_reader = self.layout_serde.read_layout(
133+
initial_read.fb_layout(),
135134
Scan::new(match self.projection {
136135
Projection::All => None,
137136
Projection::Flat(p) => Some(Arc::new(Select::include(p))),
@@ -142,9 +141,8 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
142141
let filter_reader = self
143142
.row_filter
144143
.map(|row_filter| {
145-
read_layout_from_initial(
146-
&initial_read,
147-
&self.layout_serde,
144+
self.layout_serde.read_layout(
145+
initial_read.fb_layout(),
148146
Scan::new(Some(Arc::new(row_filter))),
149147
RelativeLayoutCache::new(message_cache.clone(), lazy_dtype),
150148
)

vortex-file/src/read/context.rs

Lines changed: 4 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
use std::fmt::{Debug, Display, Formatter};
22
use std::sync::Arc;
33

4-
use bytes::Bytes;
54
use vortex_array::aliases::hash_map::HashMap;
65
use vortex_array::Context;
76
use vortex_error::{vortex_err, VortexResult};
@@ -24,8 +23,7 @@ pub trait Layout: Debug + Send + Sync {
2423

2524
fn reader(
2625
&self,
27-
fb_bytes: Bytes,
28-
fb_loc: usize,
26+
layout: fb::Layout,
2927
scan: Scan,
3028
layout_serde: LayoutDeserializer,
3129
message_cache: RelativeLayoutCache,
@@ -73,20 +71,15 @@ impl LayoutDeserializer {
7371

7472
pub fn read_layout(
7573
&self,
76-
fb_bytes: Bytes,
77-
fb_loc: usize,
74+
layout: fb::Layout,
7875
scan: Scan,
7976
message_cache: RelativeLayoutCache,
8077
) -> VortexResult<Box<dyn LayoutReader>> {
81-
let fb_layout = unsafe {
82-
let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
83-
fb::Layout::init_from_table(tab)
84-
};
85-
let layout_id = LayoutId(fb_layout.encoding());
78+
let layout_id = LayoutId(layout.encoding());
8679
self.layout_ctx
8780
.lookup_layout(&layout_id)
8881
.ok_or_else(|| vortex_err!("Unknown layout definition {layout_id}"))?
89-
.reader(fb_bytes, fb_loc, scan, self.clone(), message_cache)
82+
.reader(layout, scan, self.clone(), message_cache)
9083
}
9184

9285
pub(crate) fn ctx(&self) -> Arc<Context> {

vortex-file/src/read/layouts/chunked.rs

Lines changed: 18 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
use std::collections::BTreeSet;
22
use std::sync::{Arc, OnceLock, RwLock};
33

4-
use bytes::Bytes;
54
use itertools::Itertools;
65
use vortex_array::aliases::hash_map::HashMap;
76
use vortex_array::array::ChunkedArray;
@@ -11,7 +10,7 @@ use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
1110
use vortex_dtype::{DType, Nullability, StructDType};
1211
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect as _, VortexResult};
1312
use vortex_expr::Select;
14-
use vortex_flatbuffers::footer;
13+
use vortex_flatbuffers::footer as fb;
1514

1615
use crate::layouts::RangedLayoutReader;
1716
use crate::pruning::PruningPredicate;
@@ -36,16 +35,14 @@ impl Layout for ChunkedLayout {
3635

3736
fn reader(
3837
&self,
39-
fb_bytes: Bytes,
40-
fb_loc: usize,
38+
layout: fb::Layout,
4139
scan: Scan,
4240
layout_builder: LayoutDeserializer,
4341
message_cache: RelativeLayoutCache,
4442
) -> VortexResult<Box<dyn LayoutReader>> {
4543
Ok(Box::new(
4644
ChunkedLayoutBuilder {
47-
fb_bytes,
48-
fb_loc,
45+
layout,
4946
scan,
5047
layout_builder,
5148
message_cache,
@@ -61,35 +58,26 @@ const METADATA_LAYOUT_PART_ID: LayoutPartId = 0;
6158
///
6259
/// First child in the list is the metadata table
6360
/// Subsequent children are consecutive chunks of this layout
64-
struct ChunkedLayoutBuilder {
65-
fb_bytes: Bytes,
66-
fb_loc: usize,
61+
struct ChunkedLayoutBuilder<'a> {
62+
layout: fb::Layout<'a>,
6763
scan: Scan,
6864
layout_builder: LayoutDeserializer,
6965
message_cache: RelativeLayoutCache,
7066
}
7167

72-
impl ChunkedLayoutBuilder {
73-
fn flatbuffer(&self) -> footer::Layout {
74-
unsafe {
75-
let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc);
76-
footer::Layout::init_from_table(tab)
77-
}
78-
}
79-
68+
impl ChunkedLayoutBuilder<'_> {
8069
fn metadata_layout(&self) -> VortexResult<Option<Box<dyn LayoutReader>>> {
81-
self.flatbuffer()
70+
self.layout
8271
.metadata()
8372
.map(|m| {
8473
let set_stats = stats_from_bitset_bytes(m.bytes());
8574
let metadata_fb = self
86-
.flatbuffer()
75+
.layout
8776
.children()
8877
.ok_or_else(|| vortex_err!("Must have children if layout has metadata"))?
8978
.get(0);
9079
self.layout_builder.read_layout(
91-
self.fb_bytes.clone(),
92-
metadata_fb._tab.loc(),
80+
metadata_fb,
9381
Scan::new(Some(Arc::new(Select::include(
9482
set_stats.iter().map(|s| s.to_string().into()).collect(),
9583
)))),
@@ -105,13 +93,13 @@ impl ChunkedLayoutBuilder {
10593
.transpose()
10694
}
10795

108-
fn children(&self) -> impl Iterator<Item = (usize, footer::Layout)> {
109-
self.flatbuffer()
96+
fn children(&self) -> impl Iterator<Item = (usize, fb::Layout)> {
97+
self.layout
11098
.children()
11199
.unwrap_or_default()
112100
.iter()
113101
.enumerate()
114-
.skip(if self.flatbuffer().metadata().is_some() {
102+
.skip(if self.layout.metadata().is_some() {
115103
1
116104
} else {
117105
0
@@ -134,8 +122,7 @@ impl ChunkedLayoutBuilder {
134122
.zip_eq(self.children_ranges())
135123
.map(|((i, c), (begin, end))| {
136124
let layout = self.layout_builder.read_layout(
137-
self.fb_bytes.clone(),
138-
c._tab.loc(),
125+
c,
139126
self.scan.clone(),
140127
self.message_cache
141128
.relative(i as u16, self.message_cache.dtype().clone()),
@@ -420,7 +407,7 @@ mod tests {
420407

421408
use arrow_buffer::BooleanBufferBuilder;
422409
use bytes::Bytes;
423-
use flatbuffers::{root_unchecked, FlatBufferBuilder};
410+
use flatbuffers::{root, FlatBufferBuilder};
424411
use futures_util::TryStreamExt;
425412
use vortex_array::array::{BoolArray, ChunkedArray, PrimitiveArray};
426413
use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant};
@@ -474,30 +461,26 @@ mod tests {
474461
let written = writer.into_inner();
475462

476463
let mut fb = FlatBufferBuilder::new();
464+
// FIXME(ngates): impl From<LayoutSpec> for fb::Layout
477465
let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, None);
478466
let flat_buf = chunked_layout.write_flatbuffer(&mut fb);
479467
fb.finish_minimal(flat_buf);
480468
let fb_bytes = Bytes::copy_from_slice(fb.finished_data());
481-
482-
let fb_loc = (unsafe { root_unchecked::<footer::Layout>(&fb_bytes) })
483-
._tab
484-
.loc();
469+
let layout = root::<footer::Layout>(&fb_bytes).unwrap();
485470

486471
let dtype = Arc::new(LazyDType::from_dtype(PType::I32.into()));
487472
let layout_builder = LayoutDeserializer::default();
488473
(
489474
ChunkedLayoutBuilder {
490-
fb_bytes: fb_bytes.clone(),
491-
fb_loc,
475+
layout,
492476
scan,
493477
layout_builder: layout_builder.clone(),
494478
message_cache: RelativeLayoutCache::new(cache.clone(), dtype.clone()),
495479
}
496480
.build()
497481
.unwrap(),
498482
ChunkedLayoutBuilder {
499-
fb_bytes,
500-
fb_loc,
483+
layout,
501484
scan: Scan::new(None),
502485
layout_builder,
503486
message_cache: RelativeLayoutCache::new(cache, dtype),

0 commit comments

Comments
 (0)