Skip to content

Commit b706f21

Browse files
authored
Segment Alignment (#1883)
Instead of storing the alignments in the segment map, we could store them in the Array flatbuffer and then pass them into `SegmentReader::get(id, alignment)`. The problem is that this would require two round trips to load a FlatLayout: one to fetch the array flatbuffer, which tells us the alignment of the data buffers, and a second to fetch the data buffers themselves. Unless, of course, we store the data buffer alignment in the FlatLayout metadata? That could actually work... Follow-up (FLUP): move the ArrayData flatbuffer into the IPC definition as an ArrayMessage. Move row_count out of ArrayParts. Open questions: * Who should control / configure compression + encryption? I don't think this should be handled inside the segment writer, since it doesn't know what type of segment it is (data, flatbuffer, etc.). Instead, if the layouts themselves request encryption / compression, then we can configure different properties for different layouts, e.g. a different encryption key for each column.
1 parent aeadc23 commit b706f21

File tree

17 files changed

+487
-311
lines changed

17 files changed

+487
-311
lines changed

vortex-array/src/parts.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use std::fmt::{Debug, Formatter};
22

3-
use flatbuffers::Follow;
3+
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
4+
use itertools::Itertools;
45
use vortex_buffer::ByteBuffer;
56
use vortex_dtype::DType;
6-
use vortex_error::{vortex_panic, VortexResult};
7-
use vortex_flatbuffers::{array as fba, FlatBuffer};
7+
use vortex_error::{vortex_panic, VortexExpect, VortexResult};
8+
use vortex_flatbuffers::{
9+
array as fba, FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt,
10+
};
811

12+
use crate::stats::ArrayStatistics;
913
use crate::{ArrayData, ContextRef};
1014

1115
/// [`ArrayParts`] represents the information from an [`ArrayData`] that makes up the serialized
@@ -14,6 +18,7 @@ use crate::{ArrayData, ContextRef};
1418
///
1519
/// An [`ArrayParts`] can be fully decoded into an [`ArrayData`] using the `decode` function.
1620
pub struct ArrayParts {
21+
// TODO(ngates): I think we should remove this. It's not required in the serialized form.
1722
row_count: usize,
1823
// Typed as fb::Array
1924
flatbuffer: FlatBuffer,
@@ -74,3 +79,99 @@ impl ArrayParts {
7479
)
7580
}
7681
}
82+
83+
/// Convert an [`ArrayData`] into [`ArrayParts`].
///
/// The array tree is serialized into a flatbuffer with buffer indices starting at zero, and
/// the byte buffers of every array yielded by `depth_first_traversal` are collected in
/// traversal order.
84+
impl From<ArrayData> for ArrayParts {
85+
fn from(array: ArrayData) -> Self {
// Serialize the root array; its first buffer is assigned index 0.
86+
let flatbuffer = ArrayPartsFlatBuffer {
87+
array: &array,
88+
buffer_idx: 0,
89+
}
90+
.write_flatbuffer_bytes();
// Gather the data buffers of the root and all of its descendants.
// NOTE(review): this order presumably has to match the buffer indices assigned while writing
// the flatbuffer (own buffers first, then each child's subtree) — confirm that
// `depth_first_traversal` visits arrays in that order.
91+
let mut buffers: Vec<ByteBuffer> = vec![];
92+
for child in array.depth_first_traversal() {
93+
for buffer in child.byte_buffers() {
94+
buffers.push(buffer);
95+
}
96+
}
97+
Self {
98+
row_count: array.len(),
99+
flatbuffer,
100+
flatbuffer_loc: 0,
101+
buffers,
102+
}
103+
}
104+
}
105+
106+
/// A utility struct for creating an [`fba::Array`] flatbuffer.
///
/// Pairs an array with the index to assign to that array's first buffer.
107+
pub struct ArrayPartsFlatBuffer<'a> {
// The array node to serialize.
108+
array: &'a ArrayData,
// Index given to this array's first buffer; its remaining buffers follow consecutively.
109+
buffer_idx: u16,
110+
}
111+
112+
impl<'a> ArrayPartsFlatBuffer<'a> {
/// Creates a flatbuffer writer for `array` whose buffer indices start at zero.
113+
pub fn new(array: &'a ArrayData) -> Self {
114+
Self {
115+
array,
116+
buffer_idx: 0,
117+
}
118+
}
119+
}
120+
121+
// Empty impl with no methods. NOTE(review): presumably this marks the type as a valid root
// message so it can be written out as a standalone flatbuffer — confirm against
// `vortex_flatbuffers`.
impl FlatBufferRoot for ArrayPartsFlatBuffer<'_> {}
122+
123+
// Serializes one array node, and recursively its children, as an `fba::Array` table.
impl WriteFlatBuffer for ArrayPartsFlatBuffer<'_> {
124+
type Target<'t> = fba::Array<'t>;
125+
/// Writes this array's encoding code, metadata, children, buffer indices and statistics
/// into `fbb` and returns the offset of the resulting `fba::Array`.
///
/// ## Panics
///
/// Panics if the array has no metadata bytes, or if a buffer count does not fit in a `u16`.
126+
fn write_flatbuffer<'fb>(
127+
&self,
128+
fbb: &mut FlatBufferBuilder<'fb>,
129+
) -> WIPOffset<Self::Target<'fb>> {
130+
let encoding = self.array.encoding().id().code();
131+
let metadata = self
132+
.array
133+
.metadata_bytes()
134+
.vortex_expect("IPCArray is missing metadata during serialization");
135+
let metadata = Some(fbb.create_vector(metadata.as_ref()));
136+
137+
// This array owns indices `buffer_idx..buffer_idx + nbuffers`; child buffers start after them.
138+
let nbuffers = u16::try_from(self.array.nbuffers())
139+
.vortex_expect("Array can have at most u16::MAX buffers");
// NOTE(review): this addition is unchecked, unlike the `checked_add` used for the children
// below, so it can overflow `u16` when `buffer_idx + nbuffers` exceeds `u16::MAX`.
140+
let child_buffer_idx = self.buffer_idx + nbuffers;
141+
142+
let children = self
143+
.array
144+
.children()
145+
.iter()
// The scan state is the index to hand to the next child's first buffer.
146+
.scan(child_buffer_idx, |buffer_idx, child| {
147+
// Serialize the child starting at the current index, then advance the index past every
// buffer in the child's subtree.
148+
let msg = ArrayPartsFlatBuffer {
149+
array: child,
150+
buffer_idx: *buffer_idx,
151+
}
152+
.write_flatbuffer(fbb);
153+
*buffer_idx = u16::try_from(child.cumulative_nbuffers())
154+
.ok()
155+
.and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
156+
.vortex_expect("Too many buffers (u16) for ArrayData");
157+
Some(msg)
158+
})
159+
.collect_vec();
160+
let children = Some(fbb.create_vector(&children));
161+
// Indices of this array's own buffers: `nbuffers` consecutive values starting at `buffer_idx`.
162+
let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
163+
164+
let stats = Some(self.array.statistics().write_flatbuffer(fbb));
165+
166+
fba::Array::create(
167+
fbb,
168+
&fba::ArrayArgs {
169+
encoding,
170+
metadata,
171+
children,
172+
buffers,
173+
stats,
174+
},
175+
)
176+
}
177+
}

vortex-buffer/src/alignment.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,22 @@ impl Alignment {
5858
// of trailing zeros in the binary representation of the alignment is greater or equal.
5959
self.0.trailing_zeros() >= other.0.trailing_zeros()
6060
}
61+
62+
/// Returns the log2 of the alignment.
///
/// Computed as the number of trailing zero bits of the stored value, which equals log2 when
/// the value is a power of two.
63+
pub fn exponent(&self) -> u8 {
64+
u8::try_from(self.0.trailing_zeros())
65+
.vortex_expect("alignment fits into u16, so exponent fits in u7")
66+
}
67+
68+
/// Create from the log2 exponent of the alignment.
69+
///
70+
/// ## Panics
71+
///
72+
/// Panics if the resulting alignment, `1 << exponent`, is greater than `u16::MAX`.
73+
#[inline]
74+
pub const fn from_exponent(exponent: u8) -> Self {
// `1 << exponent` is a power of two by construction; `new` performs the validation.
75+
Self::new(1 << exponent)
76+
}
6177
}
6278

6379
impl Display for Alignment {
@@ -120,6 +136,13 @@ mod test {
120136
Alignment::new(3);
121137
}
122138

139+
// Checks that `exponent` and `from_exponent` round-trip: 1024 == 1 << 10.
#[test]
140+
fn alignment_exponent() {
141+
let alignment = Alignment::new(1024);
142+
assert_eq!(alignment.exponent(), 10);
143+
assert_eq!(Alignment::from_exponent(10), alignment);
144+
}
145+
123146
#[test]
124147
fn is_aligned_to() {
125148
assert!(Alignment::new(1).is_aligned_to(Alignment::new(1)));

vortex-file/src/v2/footer/file_layout.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,7 @@ impl WriteFlatBuffer for FileLayout {
2222
fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
2323
) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
2424
let root_layout = self.root_layout.write_flatbuffer(fbb);
25-
26-
let segments = self
27-
.segments
28-
.iter()
29-
.map(|segment| segment.write_flatbuffer(fbb))
30-
.collect::<Vec<_>>();
31-
let segments = fbb.create_vector(&segments);
32-
25+
let segments = fbb.create_vector_from_iter(self.segments.iter().map(fb::Segment::from));
3326
fb::FileLayout::create(
3427
fbb,
3528
&fb::FileLayoutArgs {

vortex-file/src/v2/footer/postscript.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ impl WriteFlatBuffer for Postscript {
1919
&self,
2020
fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
2121
) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
22-
let dtype = self.dtype.write_flatbuffer(fbb);
23-
let file_layout = self.file_layout.write_flatbuffer(fbb);
22+
let dtype = fb::Segment::from(&self.dtype);
23+
let file_layout = fb::Segment::from(&self.file_layout);
2424
fb::Postscript::create(
2525
fbb,
2626
&fb::PostscriptArgs {
27-
dtype: Some(dtype),
28-
file_layout: Some(file_layout),
27+
dtype: Some(&dtype),
28+
file_layout: Some(&file_layout),
2929
},
3030
)
3131
}
@@ -39,12 +39,12 @@ impl ReadFlatBuffer for Postscript {
3939
fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
4040
) -> Result<Self, Self::Error> {
4141
Ok(Self {
42-
dtype: Segment::read_flatbuffer(
43-
&fb.dtype()
42+
dtype: Segment::try_from(
43+
fb.dtype()
4444
.ok_or_else(|| vortex_err!("Postscript missing dtype segment"))?,
4545
)?,
46-
file_layout: Segment::read_flatbuffer(
47-
&fb.file_layout()
46+
file_layout: Segment::try_from(
47+
fb.file_layout()
4848
.ok_or_else(|| vortex_err!("Postscript missing file_layout segment"))?,
4949
)?,
5050
})
Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,29 @@
1-
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
2-
use vortex_error::{vortex_err, VortexError};
3-
use vortex_flatbuffers::{footer2 as fb, ReadFlatBuffer, WriteFlatBuffer};
1+
use vortex_buffer::Alignment;
2+
use vortex_error::VortexError;
3+
use vortex_flatbuffers::footer2 as fb;
44

55
/// The location of a segment within a Vortex file.
66
#[derive(Clone, Debug)]
77
pub(crate) struct Segment {
88
pub(crate) offset: u64,
9-
pub(crate) length: usize,
9+
pub(crate) length: u32,
10+
pub(crate) alignment: Alignment,
1011
}
1112

12-
impl WriteFlatBuffer for Segment {
13-
type Target<'a> = fb::Segment<'a>;
14-
15-
fn write_flatbuffer<'fb>(
16-
&self,
17-
fbb: &mut FlatBufferBuilder<'fb>,
18-
) -> WIPOffset<Self::Target<'fb>> {
19-
fb::Segment::create(
20-
fbb,
21-
&fb::SegmentArgs {
22-
offset: self.offset,
23-
length: self.length as u64,
24-
},
25-
)
13+
impl From<&Segment> for fb::Segment {
14+
fn from(value: &Segment) -> Self {
15+
fb::Segment::new(value.offset, value.length, value.alignment.exponent(), 0, 0)
2616
}
2717
}
2818

29-
impl ReadFlatBuffer for Segment {
30-
type Source<'a> = fb::Segment<'a>;
19+
impl TryFrom<&fb::Segment> for Segment {
3120
type Error = VortexError;
3221

33-
fn read_flatbuffer<'buf>(
34-
fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
35-
) -> Result<Self, Self::Error> {
22+
fn try_from(value: &fb::Segment) -> Result<Self, Self::Error> {
3623
Ok(Self {
37-
offset: fb.offset(),
38-
length: usize::try_from(fb.length())
39-
.map_err(|_| vortex_err!("segment length exceeds maximum usize"))?,
24+
offset: value.offset(),
25+
length: value.length(),
26+
alignment: Alignment::from_exponent(value.alignment_exponent()),
4027
})
4128
}
4229
}

vortex-file/src/v2/open/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ impl OpenOptions {
183183
) -> VortexResult<DType> {
184184
let offset = usize::try_from(dtype.offset - initial_offset)?;
185185
let sliced_buffer =
186-
FlatBuffer::align_from(initial_read.slice(offset..offset + dtype.length));
186+
FlatBuffer::align_from(initial_read.slice(offset..offset + (dtype.length as usize)));
187187
let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;
188188

189189
DType::try_from_view(fbd_dtype, sliced_buffer.clone())
@@ -198,7 +198,7 @@ impl OpenOptions {
198198
dtype: DType,
199199
) -> VortexResult<FileLayout> {
200200
let offset = usize::try_from(segment.offset - initial_offset)?;
201-
let bytes = initial_read.slice(offset..offset + segment.length);
201+
let bytes = initial_read.slice(offset..offset + (segment.length as usize));
202202

203203
let fb = root::<fb::FileLayout>(&bytes)?;
204204
let fb_root_layout = fb
@@ -226,10 +226,7 @@ impl OpenOptions {
226226
let fb_segments = fb
227227
.segments()
228228
.ok_or_else(|| vortex_err!("FileLayout missing segments"))?;
229-
let segments = fb_segments
230-
.iter()
231-
.map(|s| Segment::read_flatbuffer(&s))
232-
.try_collect()?;
229+
let segments = fb_segments.iter().map(Segment::try_from).try_collect()?;
233230

234231
Ok(FileLayout {
235232
root_layout,
@@ -253,9 +250,9 @@ impl OpenOptions {
253250
let segment_id = SegmentId::from(u32::try_from(idx)?);
254251

255252
let offset = usize::try_from(segment.offset - initial_offset)?;
256-
let bytes = initial_read.slice(offset..offset + segment.length);
253+
let buffer = initial_read.slice(offset..offset + (segment.length as usize));
257254

258-
segments.set(segment_id, bytes.into_inner())?;
255+
segments.set(segment_id, buffer)?;
259256
}
260257
Ok(())
261258
}

vortex-file/src/v2/segments/cache.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
use std::sync::{Arc, RwLock};
44

55
use async_trait::async_trait;
6-
use bytes::Bytes;
76
use futures::channel::oneshot;
87
use futures_util::future::try_join_all;
8+
use futures_util::TryFutureExt;
99
use itertools::Itertools;
1010
use vortex_array::aliases::hash_map::HashMap;
11+
use vortex_buffer::ByteBuffer;
1112
use vortex_error::{vortex_err, VortexResult};
1213
use vortex_io::VortexReadAt;
1314
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};
@@ -17,7 +18,7 @@ use crate::v2::footer::Segment;
1718
pub(crate) struct SegmentCache<R> {
1819
read: R,
1920
segments: Arc<[Segment]>,
20-
inflight: RwLock<HashMap<SegmentId, Vec<oneshot::Sender<Bytes>>>>,
21+
inflight: RwLock<HashMap<SegmentId, Vec<oneshot::Sender<ByteBuffer>>>>,
2122
}
2223

2324
impl<R> SegmentCache<R> {
@@ -29,7 +30,7 @@ impl<R> SegmentCache<R> {
2930
}
3031
}
3132

32-
pub fn set(&mut self, _segment_id: SegmentId, _bytes: Bytes) -> VortexResult<()> {
33+
pub fn set(&mut self, _segment_id: SegmentId, _bytes: ByteBuffer) -> VortexResult<()> {
3334
// Do nothing for now
3435
Ok(())
3536
}
@@ -55,6 +56,7 @@ impl<R: VortexReadAt> SegmentCache<R> {
5556
let segment = &self.segments[**id as usize];
5657
self.read
5758
.read_byte_range(segment.offset, segment.length as u64)
59+
.map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment))
5860
}))
5961
.await?;
6062

@@ -77,7 +79,7 @@ impl<R: VortexReadAt> SegmentCache<R> {
7779

7880
#[async_trait]
7981
impl<R: VortexReadAt> AsyncSegmentReader for SegmentCache<R> {
80-
async fn get(&self, id: SegmentId) -> VortexResult<Bytes> {
82+
async fn get(&self, id: SegmentId) -> VortexResult<ByteBuffer> {
8183
let (send, recv) = oneshot::channel();
8284
self.inflight
8385
.write()

0 commit comments

Comments
 (0)