Skip to content

Commit cedcb24

Browse files
Struct layout eval with sub-expression slicing and push down (#1893)
Co-authored-by: Nicholas Gates <[email protected]>
1 parent e8228c0 commit cedcb24

File tree

12 files changed

+236
-28
lines changed

12 files changed

+236
-28
lines changed

vortex-expr/src/transform/partition.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ use crate::{get_item, ident, pack, ExprRef, GetItem, Identity, Select, SelectFie
1313
/// The results of each partition can then be recombined to reproduce the result of the original
1414
/// expression.
1515
///
16+
/// ## Note
17+
///
18+
/// This function currently respects the validity of each field in the scope, but not the validity
19+
/// of the scope itself. The fix would be for the returned `PartitionedExpr` to include a partition
20+
/// expression for computing the validity, or to include that expression as part of the root.
21+
///
22+
/// See <https://github.com/spiraldb/vortex/issues/1907>.
23+
///
1624
// TODO(ngates): document the behaviour of conflicting `Field::Index` and `Field::Name`.
1725
pub fn partition(expr: ExprRef, scope_dtype: &StructDType) -> VortexResult<PartitionedExpr> {
1826
StructFieldExpressionSplitter::split(expr, scope_dtype)

vortex-layout/src/layouts/chunked/eval_expr.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use vortex_scalar::Scalar;
1010
use vortex_scan::RowMask;
1111

1212
use crate::layouts::chunked::reader::ChunkedReader;
13-
use crate::reader::LayoutScanExt;
13+
use crate::reader::LayoutReaderExt;
1414
use crate::ExprEvaluator;
1515

1616
#[async_trait(?Send)]
@@ -128,7 +128,7 @@ mod test {
128128
}
129129

130130
#[test]
131-
fn test_chunked_scan() {
131+
fn test_chunked_evaluator() {
132132
block_on(async {
133133
let (segments, layout) = chunked_layout();
134134

vortex-layout/src/layouts/chunked/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use vortex_error::VortexResult;
1313
use crate::data::LayoutData;
1414
use crate::encoding::{LayoutEncoding, LayoutId};
1515
use crate::layouts::chunked::reader::ChunkedReader;
16-
use crate::reader::{LayoutReader, LayoutScanExt};
16+
use crate::reader::{LayoutReader, LayoutReaderExt};
1717
use crate::segments::AsyncSegmentReader;
1818
use crate::CHUNKED_LAYOUT_ID;
1919

vortex-layout/src/layouts/chunked/reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ impl ChunkedReader {
4242
}
4343

4444
// Construct a lazily-initialized reader slot for each chunk of the layout.
45-
let chunk_scans = (0..nchunks).map(|_| OnceLock::new()).collect();
45+
let chunk_readers = (0..nchunks).map(|_| OnceLock::new()).collect();
4646

4747
Ok(Self {
4848
layout,
4949
ctx,
5050
segments,
5151
stats_table: Arc::new(OnceCell::new()),
52-
chunk_readers: chunk_scans,
52+
chunk_readers,
5353
})
5454
}
5555

vortex-layout/src/layouts/flat/eval_expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use vortex_flatbuffers::{array as fba, FlatBuffer};
1010
use vortex_scan::RowMask;
1111

1212
use crate::layouts::flat::reader::FlatReader;
13-
use crate::reader::LayoutScanExt;
13+
use crate::reader::LayoutReaderExt;
1414
use crate::{ExprEvaluator, LayoutReader};
1515

1616
#[async_trait(?Send)]

vortex-layout/src/layouts/flat/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use vortex_error::VortexResult;
1212

1313
use crate::encoding::{LayoutEncoding, LayoutId};
1414
use crate::layouts::flat::reader::FlatReader;
15-
use crate::reader::{LayoutReader, LayoutScanExt};
15+
use crate::reader::{LayoutReader, LayoutReaderExt};
1616
use crate::segments::AsyncSegmentReader;
1717
use crate::{LayoutData, FLAT_LAYOUT_ID};
1818

Lines changed: 145 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,155 @@
11
use async_trait::async_trait;
2-
use vortex_array::ArrayData;
2+
use futures::future::try_join_all;
3+
use itertools::Itertools;
4+
use vortex_array::array::StructArray;
5+
use vortex_array::validity::Validity;
6+
use vortex_array::{ArrayData, IntoArrayData};
37
use vortex_error::VortexResult;
8+
use vortex_expr::transform::partition::partition;
49
use vortex_expr::ExprRef;
510
use vortex_scan::RowMask;
611

7-
use crate::layouts::struct_::reader::StructScan;
12+
use crate::layouts::struct_::reader::StructReader;
813
use crate::ExprEvaluator;
914

1015
#[async_trait(?Send)]
11-
impl ExprEvaluator for StructScan {
12-
async fn evaluate_expr(&self, _row_mask: RowMask, _expr: ExprRef) -> VortexResult<ArrayData> {
13-
todo!()
16+
impl ExprEvaluator for StructReader {
17+
async fn evaluate_expr(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData> {
18+
// Partition the expression into expressions that can be evaluated over individual fields
19+
let partitioned = partition(expr, self.struct_dtype())?;
20+
let field_readers: Vec<_> = partitioned
21+
.partitions
22+
.iter()
23+
.map(|partition| self.child(&partition.field))
24+
.try_collect()?;
25+
26+
let arrays = try_join_all(
27+
field_readers
28+
.iter()
29+
.zip_eq(partitioned.partitions.iter())
30+
.map(|(reader, partition)| {
31+
reader.evaluate_expr(row_mask.clone(), partition.expr.clone())
32+
}),
33+
)
34+
.await?;
35+
36+
let row_count = row_mask.true_count();
37+
debug_assert!(arrays.iter().all(|a| a.len() == row_count));
38+
39+
let root_scope = StructArray::try_new(
40+
partitioned
41+
.partitions
42+
.iter()
43+
.map(|p| p.name.clone())
44+
.collect::<Vec<_>>()
45+
.into(),
46+
arrays,
47+
row_count,
48+
Validity::NonNullable,
49+
)?
50+
.into_array();
51+
52+
// Recombine the partitioned expressions into a single expression
53+
partitioned.root.evaluate(&root_scope)
54+
}
55+
}
56+
57+
#[cfg(test)]
58+
mod tests {
59+
use std::sync::Arc;
60+
61+
use futures::executor::block_on;
62+
use vortex_array::array::StructArray;
63+
use vortex_array::compute::FilterMask;
64+
use vortex_array::{IntoArrayData, IntoArrayVariant};
65+
use vortex_buffer::buffer;
66+
use vortex_dtype::PType::I32;
67+
use vortex_dtype::{DType, Nullability, StructDType};
68+
use vortex_expr::{get_item, gt, ident};
69+
use vortex_scan::RowMask;
70+
71+
use crate::layouts::flat::writer::FlatLayoutWriter;
72+
use crate::layouts::struct_::writer::StructLayoutWriter;
73+
use crate::segments::test::TestSegments;
74+
use crate::strategies::LayoutWriterExt;
75+
use crate::LayoutData;
76+
77+
/// Create a struct layout with three `i32` fields (`a`, `b`, `c`), each holding three rows.
78+
fn struct_layout() -> (Arc<TestSegments>, LayoutData) {
79+
let mut segments = TestSegments::default();
80+
81+
let layout = StructLayoutWriter::new(
82+
DType::Struct(
83+
StructDType::new(
84+
vec!["a".into(), "b".into(), "c".into()].into(),
85+
vec![I32.into(), I32.into(), I32.into()],
86+
),
87+
Nullability::NonNullable,
88+
),
89+
vec![
90+
Box::new(FlatLayoutWriter::new(I32.into())),
91+
Box::new(FlatLayoutWriter::new(I32.into())),
92+
Box::new(FlatLayoutWriter::new(I32.into())),
93+
],
94+
)
95+
.push_all(
96+
&mut segments,
97+
[StructArray::from_fields(
98+
[
99+
("a", buffer![7, 2, 3].into_array()),
100+
("b", buffer![4, 5, 6].into_array()),
101+
("c", buffer![4, 5, 6].into_array()),
102+
]
103+
.as_slice(),
104+
)
105+
.map(IntoArrayData::into_array)],
106+
)
107+
.unwrap();
108+
(Arc::new(segments), layout)
109+
}
110+
111+
#[test]
112+
fn test_struct_layout() {
113+
let (segments, layout) = struct_layout();
114+
115+
let reader = layout.reader(segments, Default::default()).unwrap();
116+
let expr = gt(get_item("a", ident()), get_item("b", ident()));
117+
let result =
118+
block_on(reader.evaluate_expr(RowMask::new_valid_between(0, 3), expr)).unwrap();
119+
assert_eq!(
120+
vec![true, false, false],
121+
result
122+
.into_bool()
123+
.unwrap()
124+
.boolean_buffer()
125+
.iter()
126+
.collect::<Vec<_>>()
127+
);
128+
}
129+
130+
#[test]
131+
fn test_struct_layout_row_mask() {
132+
let (segments, layout) = struct_layout();
133+
134+
let reader = layout.reader(segments, Default::default()).unwrap();
135+
let expr = gt(get_item("a", ident()), get_item("b", ident()));
136+
let result = block_on(reader.evaluate_expr(
137+
// Take rows 0 and 1, skip row 2, and anything after that
138+
RowMask::new(FilterMask::from_iter([true, true, false]), 0),
139+
expr,
140+
))
141+
.unwrap();
142+
143+
assert_eq!(result.len(), 2);
144+
145+
assert_eq!(
146+
vec![true, false],
147+
result
148+
.into_bool()
149+
.unwrap()
150+
.boolean_buffer()
151+
.iter()
152+
.collect::<Vec<_>>()
153+
);
14154
}
15155
}

vortex-layout/src/layouts/struct_/eval_stats.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ use vortex_array::stats::{Stat, StatsSet};
33
use vortex_dtype::FieldPath;
44
use vortex_error::VortexResult;
55

6-
use crate::layouts::struct_::reader::StructScan;
6+
use crate::layouts::struct_::reader::StructReader;
77
use crate::StatsEvaluator;
88

99
#[async_trait(?Send)]
10-
impl StatsEvaluator for StructScan {
10+
impl StatsEvaluator for StructReader {
1111
async fn evaluate_stats(
1212
&self,
1313
field_paths: &[FieldPath],

vortex-layout/src/layouts/struct_/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ pub mod writer;
66
use std::collections::BTreeSet;
77
use std::sync::Arc;
88

9-
use reader::StructScan;
9+
use reader::StructReader;
1010
use vortex_array::ContextRef;
1111
use vortex_error::VortexResult;
1212

1313
use crate::data::LayoutData;
1414
use crate::encoding::{LayoutEncoding, LayoutId};
15-
use crate::reader::{LayoutReader, LayoutScanExt};
15+
use crate::reader::{LayoutReader, LayoutReaderExt};
1616
use crate::segments::AsyncSegmentReader;
1717
use crate::COLUMNAR_LAYOUT_ID;
1818

@@ -28,9 +28,9 @@ impl LayoutEncoding for StructLayout {
2828
&self,
2929
layout: LayoutData,
3030
ctx: ContextRef,
31-
_segments: Arc<dyn AsyncSegmentReader>,
31+
segments: Arc<dyn AsyncSegmentReader>,
3232
) -> VortexResult<Arc<dyn LayoutReader>> {
33-
Ok(StructScan::try_new(layout, ctx)?.into_arc())
33+
Ok(StructReader::try_new(layout, segments, ctx)?.into_arc())
3434
}
3535

3636
fn register_splits(

vortex-layout/src/layouts/struct_/reader.rs

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,86 @@
1+
use std::sync::{Arc, OnceLock};
2+
3+
use vortex_array::aliases::hash_map::HashMap;
14
use vortex_array::ContextRef;
2-
use vortex_error::{vortex_panic, VortexResult};
5+
use vortex_dtype::{DType, Field, FieldName, StructDType};
6+
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
37

48
use crate::layouts::struct_::StructLayout;
5-
use crate::{LayoutData, LayoutEncoding, LayoutReader};
9+
use crate::segments::AsyncSegmentReader;
10+
use crate::{LayoutData, LayoutEncoding, LayoutReader, LayoutReaderExt};
611

7-
#[derive(Debug)]
8-
pub struct StructScan {
12+
#[derive(Clone)]
13+
pub struct StructReader {
914
layout: LayoutData,
15+
ctx: ContextRef,
16+
17+
segments: Arc<dyn AsyncSegmentReader>,
18+
19+
field_readers: Arc<[OnceLock<Arc<dyn LayoutReader>>]>,
20+
field_lookup: HashMap<FieldName, usize>,
1021
}
1122

12-
impl StructScan {
13-
pub(super) fn try_new(layout: LayoutData, _ctx: ContextRef) -> VortexResult<Self> {
23+
impl StructReader {
24+
pub(super) fn try_new(
25+
layout: LayoutData,
26+
segments: Arc<dyn AsyncSegmentReader>,
27+
ctx: ContextRef,
28+
) -> VortexResult<Self> {
1429
if layout.encoding().id() != StructLayout.id() {
1530
vortex_panic!("Mismatched layout ID")
1631
}
1732

33+
let dtype = layout.dtype();
34+
let DType::Struct(struct_dt, _) = dtype else {
35+
vortex_panic!("Mismatched dtype {} for struct layout", dtype);
36+
};
37+
38+
let field_readers = struct_dt.names().iter().map(|_| OnceLock::new()).collect();
39+
40+
let field_lookup = struct_dt
41+
.names()
42+
.iter()
43+
.enumerate()
44+
.map(|(i, name)| (name.clone(), i))
45+
.collect();
46+
1847
// This is where we need to do some complex things with the scan in order to split it into
1948
// different scans for different fields.
20-
Ok(Self { layout })
49+
Ok(Self {
50+
layout,
51+
ctx,
52+
segments,
53+
field_readers,
54+
field_lookup,
55+
})
56+
}
57+
58+
/// Return the [`StructDType`] of this layout.
59+
pub(crate) fn struct_dtype(&self) -> &StructDType {
60+
self.dtype()
61+
.as_struct()
62+
.vortex_expect("Struct layout must have a struct DType, verified at construction")
63+
}
64+
65+
/// Return the child reader for the given field, creating it on first access.
66+
pub(crate) fn child(&self, field: &Field) -> VortexResult<&Arc<dyn LayoutReader>> {
67+
let idx = match field {
68+
Field::Name(n) => *self
69+
.field_lookup
70+
.get(n)
71+
.ok_or_else(|| vortex_err!("Field {} not found in struct layout", n))?,
72+
Field::Index(idx) => *idx,
73+
};
74+
self.field_readers[idx].get_or_try_init(|| {
75+
let child_layout = self
76+
.layout
77+
.child(idx, self.struct_dtype().field_dtype(idx)?)?;
78+
child_layout.reader(self.segments.clone(), self.ctx.clone())
79+
})
2180
}
2281
}
2382

24-
impl LayoutReader for StructScan {
83+
impl LayoutReader for StructReader {
2584
fn layout(&self) -> &LayoutData {
2685
&self.layout
2786
}

0 commit comments

Comments
 (0)