Skip to content

Commit e3a59b7

Browse files
committed
feat: write nested struct layouts
Part of the push to fix deeply nested data in Vortex. Signed-off-by: Andrew Duffy <[email protected]>
1 parent 60ce4b5 commit e3a59b7

File tree

6 files changed

+197
-85
lines changed

6 files changed

+197
-85
lines changed

vortex-file/src/strategy.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::sync::Arc;
88
use vortex_layout::LayoutStrategy;
99
use vortex_layout::layouts::buffered::BufferedStrategy;
1010
use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
11+
use vortex_layout::layouts::collect::CollectStrategy;
1112
use vortex_layout::layouts::compressed::{CompressingStrategy, CompressorPlugin};
1213
use vortex_layout::layouts::dict::writer::DictStrategy;
1314
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
@@ -100,7 +101,7 @@ impl WriteStrategyBuilder {
100101
// 2. calculate stats for each row group
101102
let stats = ZonedStrategy::new(
102103
dict,
103-
compress_then_flat,
104+
compress_then_flat.clone(),
104105
ZonedLayoutOptions {
105106
block_size: self.row_block_size,
106107
..Default::default()
@@ -120,6 +121,8 @@ impl WriteStrategyBuilder {
120121
);
121122

122123
// 0. start with splitting columns
123-
Arc::new(StructStrategy::new(repartition))
124+
let validity_strategy = CollectStrategy::new(compress_then_flat);
125+
126+
Arc::new(StructStrategy::new(repartition, validity_strategy))
124127
}
125128
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use async_stream::try_stream;
7+
use async_trait::async_trait;
8+
use futures::{StreamExt, pin_mut};
9+
use vortex_array::ArrayContext;
10+
use vortex_array::arrays::ChunkedArray;
11+
use vortex_error::{VortexExpect, VortexResult};
12+
use vortex_io::runtime::Handle;
13+
14+
use crate::segments::SegmentSinkRef;
15+
use crate::sequence::{
16+
SendableSequentialStream, SequencePointer, SequentialStream, SequentialStreamAdapter,
17+
};
18+
use crate::{LayoutRef, LayoutStrategy};
19+
20+
/// A strategy that collects all chunks and turns them into a single array chunk to pass into
21+
/// a child strategy.
22+
pub struct CollectStrategy {
23+
child: Arc<dyn LayoutStrategy>,
24+
}
25+
26+
impl CollectStrategy {
27+
pub fn new<S: LayoutStrategy>(child: S) -> CollectStrategy {
28+
CollectStrategy {
29+
child: Arc::new(child),
30+
}
31+
}
32+
}
33+
34+
#[async_trait]
35+
impl LayoutStrategy for CollectStrategy {
36+
async fn write_stream(
37+
&self,
38+
ctx: ArrayContext,
39+
segment_sink: SegmentSinkRef,
40+
stream: SendableSequentialStream,
41+
eof: SequencePointer,
42+
handle: Handle,
43+
) -> VortexResult<LayoutRef> {
44+
// Read the whole stream, then write one Chunked stream to the inner thing
45+
let dtype = stream.dtype().clone();
46+
47+
let _dtype = dtype.clone();
48+
let collected_stream = try_stream! {
49+
pin_mut!(stream);
50+
51+
let mut chunks = Vec::new();
52+
let mut latest_sequence_id = None;
53+
while let Some(chunk) = stream.next().await {
54+
let (sequence_id, chunk) = chunk?;
55+
latest_sequence_id = Some(sequence_id);
56+
chunks.push(chunk);
57+
}
58+
59+
let collected = ChunkedArray::try_new(chunks, _dtype)?.to_array();
60+
yield (latest_sequence_id.vortex_expect("must have visited at least one chunk"), collected);
61+
};
62+
63+
let adapted = Box::pin(SequentialStreamAdapter::new(dtype, collected_stream));
64+
65+
self.child
66+
.write_stream(ctx, segment_sink, adapted, eof, handle)
67+
.await
68+
}
69+
70+
fn buffered_bytes(&self) -> u64 {
71+
todo!()
72+
}
73+
}

vortex-layout/src/layouts/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use vortex_error::SharedVortexResult;
99

1010
pub mod buffered;
1111
pub mod chunked;
12+
pub mod collect;
1213
#[cfg(feature = "zstd")]
1314
pub mod compact;
1415
pub mod compressed;

vortex-layout/src/layouts/struct_/mod.rs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::sync::Arc;
88

99
use reader::StructReader;
1010
use vortex_array::{ArrayContext, DeserializeMetadata, EmptyMetadata};
11-
use vortex_dtype::{DType, Field, FieldMask, StructFields};
11+
use vortex_dtype::{DType, Field, FieldMask, Nullability, StructFields};
1212
use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic};
1313

1414
use crate::children::{LayoutChildren, OwnedLayoutChildren};
@@ -52,24 +52,43 @@ impl VTable for StructVTable {
5252
layout.struct_fields().nfields()
5353
}
5454

55-
fn child(layout: &Self::Layout, idx: usize) -> VortexResult<LayoutRef> {
56-
layout.children.child(
57-
idx,
58-
&layout
55+
fn child(layout: &Self::Layout, index: usize) -> VortexResult<LayoutRef> {
56+
let schema_index = if layout.dtype.is_nullable() {
57+
index.saturating_sub(1)
58+
} else {
59+
index
60+
};
61+
62+
let child_dtype = if index == 0 && layout.dtype.is_nullable() {
63+
DType::Bool(Nullability::NonNullable)
64+
} else {
65+
layout
5966
.struct_fields()
60-
.field_by_index(idx)
61-
.ok_or_else(|| vortex_err!("Missing field {idx}"))?,
62-
)
67+
.field_by_index(schema_index)
68+
.ok_or_else(|| vortex_err!("Missing field {schema_index}"))?
69+
};
70+
71+
layout.children.child(index, &child_dtype)
6372
}
6473

6574
fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType {
66-
LayoutChildType::Field(
67-
layout
68-
.struct_fields()
69-
.field_name(idx)
70-
.vortex_expect("Field index out of bounds")
71-
.clone(),
72-
)
75+
let schema_index = if layout.dtype.is_nullable() {
76+
idx.saturating_sub(1)
77+
} else {
78+
idx
79+
};
80+
81+
if idx == 0 && layout.dtype.is_nullable() {
82+
LayoutChildType::Auxiliary("validity".into())
83+
} else {
84+
LayoutChildType::Field(
85+
layout
86+
.struct_fields()
87+
.field_name(schema_index)
88+
.vortex_expect("Field index out of bounds")
89+
.clone(),
90+
)
91+
}
7392
}
7493

7594
fn new_reader(

vortex-layout/src/layouts/struct_/reader.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ mod tests {
277277
let ctx = ArrayContext::empty();
278278
let segments = Arc::new(TestSegments::default());
279279
let (ptr, eof) = SequenceId::root().split();
280-
let strategy = StructStrategy::new(FlatLayoutStrategy::default());
280+
let strategy =
281+
StructStrategy::new(FlatLayoutStrategy::default(), FlatLayoutStrategy::default());
281282
let layout = block_on(|handle| {
282283
strategy.write_stream(
283284
ctx,

0 commit comments

Comments
 (0)