Skip to content

Commit 1c01fae

Browse files
authored
feat: write nested struct layouts (#4942)
Part of #4889 To better support writing nested data, here we update our builtin StructLayout in a backwards-compatible fashion. - `StructStrategy` will shred struct fields into their own StructLayout recurseively - StructLayout can support nullable structs now. It does this by writing a new child layout containing the validity buffer for nullable arrays I use the `RealNest` dataset to evaluate, which contains a copy of ~200k github pull request webhook events. Nested struct layout reduces file size over the previous strategy by about ~10%, and also makes pushdown into the nested columns possible. Some open questions * The validity child requires some extra handling. It seems like the validity handling is very dependent on the expression being pushed down. For example if I'm doing a simple project of a child field, then adding the validity to the result is a simple masking operation. If I'm pushing down an `UNNEST` or something else that increases the result size, it is hard to map the validity buffer onto the projection_eval result * I collect all validity chunks into a single buffer at write time. The idea being that it's better to access the struct validity as a single unit since it is much smaller than the data size. Assuming an 8MB target segment size, this lets us comfortably fit ~64mm rows into a single segment. Another alternative is to bring back roaring, or enable some other boolean compressors. --------- Signed-off-by: Andrew Duffy <[email protected]>
1 parent 2c31a75 commit 1c01fae

File tree

19 files changed

+596
-198
lines changed

19 files changed

+596
-198
lines changed

bench-vortex/src/realnest/gharchive.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ impl Benchmark for GithubArchive {
230230
fn data_url(&self) -> &Url {
231231
&self.data_url
232232
}
233+
234+
fn expected_row_counts(&self) -> Option<&[usize]> {
235+
Some(&[1, 2, 100, 10, 82468])
236+
}
233237
}
234238

235239
pub async fn register_table(

vortex-cxx/cpp/tests/basic_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ TEST_F(VortexTest, ScanBuilderWithRowRangeWithIncludeByIndex) {
242242

243243
RunScanBuilderTest(
244244
[&include_by_index](vortex::ScanBuilder &scan_builder) {
245-
return std::move(scan_builder.WithRowRange(2, 6).WithIncludeByIndex(include_by_index.data(),
245+
return std::move(scan_builder.WithRowRange(2, 5).WithIncludeByIndex(include_by_index.data(),
246246
include_by_index.size()))
247247
.IntoStream();
248248
},

vortex-file/src/file.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ impl VortexFile {
137137
}
138138

139139
pub fn splits(&self) -> VortexResult<Vec<Range<u64>>> {
140+
let reader = self.layout_reader()?;
140141
Ok(SplitBy::Layout
141-
.splits(self.layout_reader()?.as_ref(), &[FieldMask::All])?
142+
.splits(reader.as_ref(), &(0..reader.row_count()), &[FieldMask::All])?
142143
.into_iter()
143144
.tuple_windows()
144145
.map(|(start, end)| start..end)

vortex-file/src/strategy.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::sync::Arc;
88
use vortex_layout::LayoutStrategy;
99
use vortex_layout::layouts::buffered::BufferedStrategy;
1010
use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11+
use vortex_layout::layouts::collect::CollectStrategy;
1112
use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
1213
use vortex_layout::layouts::dict::writer::DictStrategy;
1314
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
@@ -100,7 +101,7 @@ impl WriteStrategyBuilder {
100101
// 2. calculate stats for each row group
101102
let stats = ZonedStrategy::new(
102103
dict,
103-
compress_then_flat,
104+
compress_then_flat.clone(),
104105
ZonedLayoutOptions {
105106
block_size: self.row_block_size,
106107
..Default::default()
@@ -120,6 +121,8 @@ impl WriteStrategyBuilder {
120121
);
121122

122123
// 0. start with splitting columns
123-
Arc::new(StructStrategy::new(repartition))
124+
let validity_strategy = CollectStrategy::new(compress_then_flat);
125+
126+
Arc::new(StructStrategy::new(repartition, validity_strategy))
124127
}
125128
}

vortex-layout/src/layouts/chunked/reader.rs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::sync::Arc;
88
use futures::future::BoxFuture;
99
use futures::stream::FuturesOrdered;
1010
use futures::{FutureExt, TryStreamExt};
11+
use itertools::Itertools;
1112
use vortex_array::arrays::ChunkedArray;
1213
use vortex_array::{ArrayRef, MaskFuture};
1314
use vortex_dtype::{DType, FieldMask};
@@ -43,7 +44,12 @@ impl ChunkedReader {
4344
}
4445
chunk_offsets[nchildren] = layout.row_count();
4546

46-
let lazy_children = LazyReaderChildren::new(layout.children.clone(), segment_source);
47+
let dtypes = vec![layout.dtype.clone(); nchildren];
48+
let names = (0..nchildren)
49+
.map(|idx| Arc::from(format!("{name}.[{idx}]")))
50+
.collect();
51+
let lazy_children =
52+
LazyReaderChildren::new(layout.children.clone(), dtypes, names, segment_source);
4753

4854
Self {
4955
layout,
@@ -55,11 +61,7 @@ impl ChunkedReader {
5561

5662
/// Return the [`LayoutReader`] for the given chunk.
5763
fn chunk_reader(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
58-
self.lazy_children.get(
59-
idx,
60-
self.layout.dtype(),
61-
&format!("{}.[{}]", self.name, idx).into(),
62-
)
64+
self.lazy_children.get(idx)
6365
}
6466

6567
fn chunk_offset(&self, idx: usize) -> u64 {
@@ -148,16 +150,35 @@ impl LayoutReader for ChunkedReader {
148150
fn register_splits(
149151
&self,
150152
field_mask: &[FieldMask],
151-
row_offset: u64,
153+
row_range: &Range<u64>,
152154
splits: &mut BTreeSet<u64>,
153155
) -> VortexResult<()> {
154-
let mut offset = row_offset;
155-
for i in 0..self.layout.nchildren() {
156-
let child = self.chunk_reader(i)?;
157-
child.register_splits(field_mask, offset, splits)?;
158-
offset += self.layout.child(i)?.row_count();
159-
splits.insert(offset);
156+
for (index, (&start, &end)) in self
157+
.chunk_offsets
158+
.iter()
159+
.tuple_windows::<(_, _)>()
160+
.enumerate()
161+
{
162+
if end < row_range.start {
163+
continue;
164+
}
165+
166+
if start >= row_range.end {
167+
break;
168+
}
169+
170+
// Child overlaps in whole or in part with split
171+
let child = self.chunk_reader(index)?;
172+
let child_range =
173+
std::cmp::max(row_range.start, start)..std::cmp::min(row_range.end, end);
174+
175+
// Register any splits from the child
176+
child.register_splits(field_mask, &child_range, splits)?;
177+
178+
// Register the split indicating the end of this chunk
179+
splits.insert(child_range.end);
160180
}
181+
161182
Ok(())
162183
}
163184

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use async_stream::try_stream;
7+
use async_trait::async_trait;
8+
use futures::{StreamExt, pin_mut};
9+
use vortex_array::ArrayContext;
10+
use vortex_array::arrays::ChunkedArray;
11+
use vortex_error::{VortexExpect, VortexResult};
12+
use vortex_io::runtime::Handle;
13+
14+
use crate::segments::SegmentSinkRef;
15+
use crate::sequence::{
16+
SendableSequentialStream, SequencePointer, SequentialStream, SequentialStreamAdapter,
17+
};
18+
use crate::{LayoutRef, LayoutStrategy};
19+
20+
/// A strategy that collects all chunks and turns them into a single array chunk to pass into
21+
/// a child strategy.
22+
pub struct CollectStrategy {
23+
child: Arc<dyn LayoutStrategy>,
24+
}
25+
26+
impl CollectStrategy {
27+
pub fn new<S: LayoutStrategy>(child: S) -> CollectStrategy {
28+
CollectStrategy {
29+
child: Arc::new(child),
30+
}
31+
}
32+
}
33+
34+
#[async_trait]
35+
impl LayoutStrategy for CollectStrategy {
36+
async fn write_stream(
37+
&self,
38+
ctx: ArrayContext,
39+
segment_sink: SegmentSinkRef,
40+
stream: SendableSequentialStream,
41+
eof: SequencePointer,
42+
handle: Handle,
43+
) -> VortexResult<LayoutRef> {
44+
// Read the whole stream, then write one Chunked stream to the inner thing
45+
let dtype = stream.dtype().clone();
46+
47+
let _dtype = dtype.clone();
48+
let collected_stream = try_stream! {
49+
pin_mut!(stream);
50+
51+
let mut chunks = Vec::new();
52+
let mut latest_sequence_id = None;
53+
while let Some(chunk) = stream.next().await {
54+
let (sequence_id, chunk) = chunk?;
55+
latest_sequence_id = Some(sequence_id);
56+
chunks.push(chunk);
57+
}
58+
59+
let collected = ChunkedArray::try_new(chunks, _dtype)?.to_array();
60+
yield (latest_sequence_id.vortex_expect("must have visited at least one chunk"), collected);
61+
};
62+
63+
let adapted = Box::pin(SequentialStreamAdapter::new(dtype, collected_stream));
64+
65+
self.child
66+
.write_stream(ctx, segment_sink, adapted, eof, handle)
67+
.await
68+
}
69+
70+
fn buffered_bytes(&self) -> u64 {
71+
todo!()
72+
}
73+
}

vortex-layout/src/layouts/dict/reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,10 @@ impl LayoutReader for DictReader {
111111
fn register_splits(
112112
&self,
113113
field_mask: &[FieldMask],
114-
row_offset: u64,
114+
row_range: &Range<u64>,
115115
splits: &mut BTreeSet<u64>,
116116
) -> VortexResult<()> {
117-
self.codes.register_splits(field_mask, row_offset, splits)
117+
self.codes.register_splits(field_mask, row_range, splits)
118118
}
119119

120120
fn pruning_evaluation(

vortex-layout/src/layouts/flat/reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ impl LayoutReader for FlatReader {
8484
fn register_splits(
8585
&self,
8686
_field_mask: &[FieldMask],
87-
row_offset: u64,
87+
row_range: &Range<u64>,
8888
splits: &mut BTreeSet<u64>,
8989
) -> VortexResult<()> {
90-
splits.insert(row_offset + self.layout.row_count());
90+
splits.insert(row_range.start + self.layout.row_count);
9191
Ok(())
9292
}
9393

vortex-layout/src/layouts/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use vortex_error::SharedVortexResult;
99

1010
pub mod buffered;
1111
pub mod chunked;
12+
pub mod collect;
1213
#[cfg(feature = "zstd")]
1314
pub mod compact;
1415
pub mod compressed;

vortex-layout/src/layouts/row_idx/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ impl LayoutReader for RowIdxLayoutReader {
136136
fn register_splits(
137137
&self,
138138
field_mask: &[FieldMask],
139-
row_offset: u64,
139+
row_range: &Range<u64>,
140140
splits: &mut BTreeSet<u64>,
141141
) -> VortexResult<()> {
142-
self.child.register_splits(field_mask, row_offset, splits)
142+
self.child.register_splits(field_mask, row_range, splits)
143143
}
144144

145145
fn pruning_evaluation(

0 commit comments

Comments
 (0)