Skip to content

Commit 5d204a4

Browse files
committed
small cleanups
Signed-off-by: Andrew Duffy <[email protected]>
1 parent b358f0f commit 5d204a4

File tree

2 files changed

+20
-23
lines changed

2 files changed

+20
-23
lines changed

vortex-layout/src/layouts/struct_/reader.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ impl LayoutReader for StructReader {
221221
mask: Mask,
222222
) -> VortexResult<MaskFuture> {
223223
// Partition the expression into expressions that can be evaluated over individual fields
224+
// TODO(aduffy): handle validity expressions such as "IS NULL" that can be executed directly against
225+
// the validity
224226
match &self.partition_expr(expr.clone()) {
225227
Partitioned::Single(name, partition) => self
226228
.field_reader(name)?
@@ -286,6 +288,10 @@ impl LayoutReader for StructReader {
286288

287289
Ok(Box::pin(async move {
288290
if let Some(validity_fut) = validity_fut {
291+
// Apply the validity array back onto the projected rows.
292+
// NOTE: this only works while VortexExpr is linear. Once an expression
293+
// can return a result of different length (e.g. `UNNEST`) then this becomes
294+
// more complicated.
289295
let (validity, projection) = try_join!(validity_fut, projection_fut)?;
290296
vortex_array::compute::mask(
291297
projection.as_ref(),

vortex-layout/src/layouts/struct_/writer.rs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use futures::{StreamExt, TryStreamExt, pin_mut};
99
use itertools::Itertools;
1010
use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray, ToCanonical};
1111
use vortex_dtype::{DType, Nullability};
12-
use vortex_error::{VortexError, VortexExpect as _, VortexResult, vortex_bail};
12+
use vortex_error::{VortexError, VortexResult, vortex_bail};
1313
use vortex_io::kanal_ext::KanalExt;
1414
use vortex_io::runtime::Handle;
1515
use vortex_utils::aliases::DefaultHashBuilder;
@@ -79,8 +79,6 @@ impl LayoutStrategy for StructStrategy {
7979
// stream<struct_chunk> -> stream<vec<column_chunk>>
8080
let is_nullable = dtype.is_nullable();
8181

82-
// Write the entire stream as a single flat layout child
83-
8482
let columns_vec_stream = stream.map(move |chunk| {
8583
let (sequence_id, chunk) = chunk?;
8684
let mut sequence_pointer = sequence_id.descend();
@@ -103,11 +101,10 @@ impl LayoutStrategy for StructStrategy {
103101
Ok(columns)
104102
});
105103

106-
let stream_count = if is_nullable {
107-
struct_dtype.nfields() + 1
108-
} else {
109-
struct_dtype.nfields()
110-
};
104+
let mut stream_count = struct_dtype.nfields();
105+
if is_nullable {
106+
stream_count += 1;
107+
}
111108

112109
let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) =
113110
(0..stream_count).map(|_| kanal::bounded_async(1)).unzip();
@@ -136,23 +133,17 @@ impl LayoutStrategy for StructStrategy {
136133
})
137134
.detach();
138135

139-
let column_dtypes = (0..stream_count).map(move |idx| {
140-
let mut index = idx;
141-
// If struct is nullable, the first column stream is the validity
142-
if is_nullable {
143-
if index == 0 {
144-
return DType::Bool(Nullability::NonNullable);
145-
} else {
146-
index -= 1
147-
}
148-
}
149-
150-
struct_dtype
151-
.field_by_index(index)
152-
.vortex_expect("bound checked")
153-
});
136+
// First child column is the validity, subsequence children are the individual struct fields
137+
let column_dtypes: Vec<DType> = if is_nullable {
138+
std::iter::once(DType::Bool(Nullability::NonNullable))
139+
.chain(struct_dtype.fields())
140+
.collect()
141+
} else {
142+
struct_dtype.fields().collect()
143+
};
154144

155145
let layout_futures: Vec<_> = column_dtypes
146+
.into_iter()
156147
.zip_eq(column_streams_rx)
157148
.enumerate()
158149
.map(move |(index, (dtype, recv))| {

0 commit comments

Comments
 (0)