Skip to content

Commit 93d658f

Browse files
Split by layout chunks of fields used in query (#2022)
Co-authored-by: Nicholas Gates <[email protected]>
1 parent b1ebd68 commit 93d658f

File tree

22 files changed

+351
-94
lines changed

22 files changed

+351
-94
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use vortex_error::VortexResult;
1515
use vortex_expr::datafusion::convert_expr_to_vortex;
1616
use vortex_expr::transform::simplify_typed::simplify_typed;
1717
use vortex_expr::{and, ident, lit, select, ExprRef};
18-
use vortex_file::{ExecutionMode, Scan, SplitBy, VortexOpenOptions};
18+
use vortex_file::{ExecutionMode, Scan, VortexOpenOptions};
1919
use vortex_io::ObjectStoreReadAt;
2020

2121
use super::cache::FileLayoutCache;
@@ -52,7 +52,7 @@ impl VortexFileOpener {
5252
.reduce(and)
5353
.unwrap_or_else(|| lit(true));
5454

55-
simplify_typed(expr, dtype)
55+
simplify_typed(expr, &dtype)
5656
})
5757
.transpose()?;
5858

@@ -88,8 +88,6 @@ impl FileOpener for VortexFileOpener {
8888
.try_get(&file_meta.object_meta, this.object_store.clone())
8989
.await?,
9090
)
91-
// Create larger splits in so that each chunk has more rows
92-
.with_split_by(SplitBy::RowCount(2 << 15))
9391
.with_execution_mode(ExecutionMode::TokioRuntime(Handle::current()))
9492
.open(read_at)
9593
.await?;

vortex-dtype/src/field.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,17 +127,27 @@ impl FieldPath {
127127
}
128128

129129
/// Pushes a new field selector to the end of this path
130-
pub fn push<F: Into<Field>>(&mut self, field: F) {
130+
pub fn push<F: Into<Field>>(mut self, field: F) -> Self {
131131
self.0.push(field.into());
132+
self
133+
}
134+
135+
/// Whether the path starts with the given field name
136+
/// TODO(joe): handle asserts better.
137+
pub fn starts_with_field(&self, field: &Field) -> bool {
138+
assert!(matches!(field, Field::Name(_)));
139+
let first = self.0.first();
140+
assert!(matches!(first, Some(Field::Name(_))));
141+
first.is_some_and(|f| f == field)
132142
}
133143

134144
/// Steps into the next field in the path
135-
pub fn step_into(mut self) -> VortexResult<Self> {
145+
pub fn step_into(mut self) -> Option<Self> {
136146
if self.0.is_empty() {
137-
return Err(vortex_err!("Cannot step into root path"));
147+
return None;
138148
}
139149
self.0 = self.0.iter().skip(1).cloned().collect();
140-
Ok(self)
150+
Some(self)
141151
}
142152
}
143153

@@ -171,9 +181,7 @@ mod tests {
171181

172182
#[test]
173183
fn test_field_path() {
174-
let mut path = FieldPath::from_name("A");
175-
path.push("B");
176-
path.push("C");
184+
let path = FieldPath::from_name("A").push("B").push("C");
177185
assert_eq!(path.to_string(), "$A.$B.$C");
178186

179187
let fields = vec!["A", "B", "C"]

vortex-dtype/src/field_mask.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//! Field mask represents a field projection, which leads to a set of field paths under a given layout.
2+
3+
use vortex_error::{vortex_bail, VortexResult};
4+
5+
use crate::{Field, FieldPath};
6+
7+
/// Represents a field mask, which is a projection of fields under a layout.
8+
#[derive(Debug, Clone, PartialEq, Eq)]
9+
pub enum FieldMask {
10+
/// Select all fields in the layout
11+
All,
12+
/// Select all with the `FieldPath` prefix
13+
Prefix(FieldPath),
14+
/// Select a field matching exactly the `FieldPath`
15+
Exact(FieldPath),
16+
}
17+
18+
impl FieldMask {
19+
/// Creates a new field mask stepping one level into the layout structure.
20+
pub fn step_into(self) -> VortexResult<Self> {
21+
match self {
22+
FieldMask::All => Ok(FieldMask::All),
23+
FieldMask::Prefix(fp) => {
24+
let Some(stepped_fp) = fp.step_into() else {
25+
return Ok(FieldMask::All);
26+
};
27+
if stepped_fp.is_root() {
28+
Ok(FieldMask::All)
29+
} else {
30+
Ok(FieldMask::Prefix(stepped_fp))
31+
}
32+
}
33+
FieldMask::Exact(fp) => {
34+
if let Some(stepped_fp) = fp.step_into() {
35+
Ok(FieldMask::Exact(stepped_fp))
36+
} else {
37+
vortex_bail!("Cannot step into exact root field path");
38+
}
39+
}
40+
}
41+
}
42+
43+
/// Returns the first field explicit select mask, if there is one, failing if mask = `All`.
44+
pub fn starting_field(&self) -> VortexResult<Option<&Field>> {
45+
match self {
46+
FieldMask::All => vortex_bail!("Cannot get starting field from All mask"),
47+
// We know that fp is non-empty
48+
FieldMask::Prefix(fp) | FieldMask::Exact(fp) => Ok(fp.path().first()),
49+
}
50+
}
51+
52+
/// True iff all fields are selected (including self).
53+
pub fn matches_all(&self) -> bool {
54+
match self {
55+
FieldMask::All => true,
56+
FieldMask::Prefix(path) => path.is_root(),
57+
FieldMask::Exact(_) => false,
58+
}
59+
}
60+
61+
/// True if the mask matches the root field.
62+
pub fn matches_root(&self) -> bool {
63+
match self {
64+
FieldMask::All => true,
65+
FieldMask::Prefix(path) | FieldMask::Exact(path) => path.is_root(),
66+
}
67+
}
68+
}

vortex-dtype/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
pub use dtype::*;
1010
pub use extension::*;
1111
pub use field::*;
12+
pub use field_mask::*;
1213
pub use half;
1314
pub use nullability::*;
1415
pub use ptype::*;
@@ -19,6 +20,7 @@ mod arbitrary;
1920
mod dtype;
2021
mod extension;
2122
mod field;
23+
mod field_mask;
2224
mod nullability;
2325
mod ptype;
2426
mod serde;

vortex-dtype/src/struct_.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,14 @@ impl StructDType {
238238
&self.names
239239
}
240240

241+
/// Find the index of a field.
242+
pub fn find(&self, field: &Field) -> Option<usize> {
243+
match field {
244+
Field::Name(name) => self.find_name(name),
245+
Field::Index(idx) => Some(*idx),
246+
}
247+
}
248+
241249
/// Find the index of a field by name
242250
/// Returns `None` if the field is not found
243251
pub fn find_name(&self, name: &str) -> Option<usize> {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use vortex_array::aliases::hash_set::HashSet;
2+
use vortex_dtype::{DType, Field, FieldPath};
3+
use vortex_error::{vortex_bail, VortexResult};
4+
5+
use crate::traversal::{FoldUp, Folder, Node};
6+
use crate::{ExprRef, GetItem, Identity, Select};
7+
8+
/// Returns the field mask for the given expression.
9+
///
10+
/// This defines a mask over the scope of the fields that are accessed by the expression.
11+
pub fn field_mask(expr: &ExprRef, scope_dtype: &DType) -> VortexResult<HashSet<FieldPath>> {
12+
// I know it's unused now, but we will for sure need the scope DType for future expressions.
13+
let DType::Struct(_scope_dtype, _) = scope_dtype else {
14+
vortex_bail!("Mismatched dtype {} for struct layout", scope_dtype);
15+
};
16+
17+
Ok(match expr.accept_with_context(&mut FieldMaskFolder, ())? {
18+
FoldUp::Abort(out) => out,
19+
FoldUp::Continue(out) => out,
20+
})
21+
}
22+
23+
struct FieldMaskFolder;
24+
25+
impl<'a> Folder<'a> for FieldMaskFolder {
26+
type NodeTy = ExprRef;
27+
type Out = HashSet<FieldPath>;
28+
type Context = ();
29+
30+
fn visit_up(
31+
&mut self,
32+
node: &'a Self::NodeTy,
33+
_context: Self::Context,
34+
children: Vec<Self::Out>,
35+
) -> VortexResult<FoldUp<Self::Out>> {
36+
// The identity returns a field path covering the root.
37+
if node.as_any().is::<Identity>() {
38+
return Ok(FoldUp::Continue([FieldPath::root()].into()));
39+
}
40+
41+
// GetItem pushes an element to each field path
42+
if let Some(getitem) = node.as_any().downcast_ref::<GetItem>() {
43+
let fields = children
44+
.into_iter()
45+
.flat_map(|field_mask| field_mask.into_iter())
46+
.map(|field_path| field_path.push(Field::Name(getitem.field().clone())))
47+
.collect();
48+
return Ok(FoldUp::Continue(fields));
49+
}
50+
51+
if node.as_any().is::<Select>() {
52+
vortex_bail!("Expression must be simplified")
53+
}
54+
55+
// Otherwise, return the field paths from the children
56+
Ok(FoldUp::Continue(children.into_iter().flatten().collect()))
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod test {
62+
use std::iter;
63+
64+
use itertools::Itertools;
65+
use vortex_dtype::Nullability::NonNullable;
66+
use vortex_dtype::{DType, FieldPath, PType, StructDType};
67+
68+
use crate::transform::field_mask::field_mask;
69+
use crate::{get_item, ident};
70+
71+
fn dtype() -> DType {
72+
DType::Struct(
73+
StructDType::new(
74+
["A".into(), "B".into(), "C".into()].into(),
75+
iter::repeat(DType::Primitive(PType::I32, NonNullable))
76+
.take(3)
77+
.collect(),
78+
),
79+
NonNullable,
80+
)
81+
}
82+
83+
#[test]
84+
fn field_mask_ident() {
85+
let mask = field_mask(&ident(), &dtype())
86+
.unwrap()
87+
.into_iter()
88+
.collect_vec();
89+
assert_eq!(mask.as_slice(), &[FieldPath::root()]);
90+
}
91+
92+
#[test]
93+
fn field_mask_get_item() {
94+
let mask = field_mask(&get_item("A", ident()), &dtype())
95+
.unwrap()
96+
.into_iter()
97+
.collect_vec();
98+
assert_eq!(mask.as_slice(), &[FieldPath::from_name("A")]);
99+
}
100+
101+
#[test]
102+
fn field_mask_get_item_nested() {
103+
let mask = field_mask(&get_item("B", get_item("A", ident())), &dtype())
104+
.unwrap()
105+
.into_iter()
106+
.collect_vec();
107+
assert_eq!(mask.as_slice(), &[FieldPath::from_name("A").push("B")]);
108+
}
109+
}

vortex-expr/src/transform/immediate_access.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use itertools::Itertools;
22
use vortex_array::aliases::hash_map::HashMap;
33
use vortex_array::aliases::hash_set::HashSet;
44
use vortex_dtype::{FieldName, StructDType};
5-
use vortex_error::VortexResult;
5+
use vortex_error::{vortex_err, VortexResult};
66

77
use crate::traversal::{Node, NodeVisitor, TraversalOrder};
88
use crate::{ExprRef, GetItem, Identity, Select};
@@ -23,6 +23,19 @@ pub fn immediate_scope_accesses<'a>(
2323
ImmediateScopeAccessesAnalysis::<'a>::analyze(expr, scope_dtype)
2424
}
2525

26+
/// This returns the immediate scope_access (as explained `immediate_scope_accesses`) for `expr`.
27+
pub fn immediate_scope_access<'a>(
28+
expr: &'a ExprRef,
29+
scope_dtype: &'a StructDType,
30+
) -> VortexResult<HashSet<FieldName>> {
31+
ImmediateScopeAccessesAnalysis::<'a>::analyze(expr, scope_dtype)?
32+
.get(expr)
33+
.ok_or_else(|| {
34+
vortex_err!("Expression missing from scope accesses, this is a internal bug")
35+
})
36+
.cloned()
37+
}
38+
2639
struct ImmediateScopeAccessesAnalysis<'a> {
2740
sub_expressions: FieldAccesses<'a>,
2841
scope_dtype: &'a StructDType,

vortex-expr/src/transform/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! A collection of transformations that can be applied to a [`crate::ExprRef`].
2-
mod immediate_access;
2+
pub mod field_mask;
3+
pub mod immediate_access;
34
pub mod partition;
45
pub(crate) mod remove_select;
56
pub mod simplify;

vortex-expr/src/transform/partition.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl<'a> StructFieldExpressionSplitter<'a> {
114114
};
115115
VortexResult::Ok(Partition {
116116
name,
117-
expr: simplify_typed(expr, field_dtype)?,
117+
expr: simplify_typed(expr, &field_dtype)?,
118118
})
119119
})
120120
.try_collect()?;
@@ -131,7 +131,7 @@ impl<'a> StructFieldExpressionSplitter<'a> {
131131
.transform(&mut ReplaceAccessesWithChild(remove_accesses))?;
132132

133133
Ok(PartitionedExpr {
134-
root: simplify_typed(split.result, dtype.clone())?,
134+
root: simplify_typed(split.result, dtype)?,
135135
partitions: partitions.into_boxed_slice(),
136136
})
137137
}
@@ -376,7 +376,7 @@ mod tests {
376376
get_item("b", get_item("a", ident())),
377377
select(vec!["a".into(), "b".into()], ident()),
378378
);
379-
let expr = simplify_typed(expr, dtype.clone()).unwrap();
379+
let expr = simplify_typed(expr, &dtype).unwrap();
380380
let partitioned = StructFieldExpressionSplitter::split(expr, &dtype).unwrap();
381381

382382
// One for id.a and id.b

vortex-expr/src/transform/remove_select.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,24 @@ use crate::{get_item, pack, ExprRef, Select};
66

77
/// Select is a useful expression, however it can be defined in terms of get_item & pack,
88
/// once the expression type is known, this simplifications pass removes the select expression.
9-
pub fn remove_select(e: ExprRef, scope_dt: DType) -> VortexResult<ExprRef> {
10-
let mut transform = RemoveSelectTransform::new(scope_dt);
9+
pub fn remove_select(e: ExprRef, scope_dt: &DType) -> VortexResult<ExprRef> {
10+
let mut transform = RemoveSelectTransform {
11+
scope_dtype: scope_dt,
12+
};
1113
e.transform(&mut transform).map(|e| e.result)
1214
}
1315

14-
struct RemoveSelectTransform {
15-
ident_dtype: DType,
16+
struct RemoveSelectTransform<'a> {
17+
scope_dtype: &'a DType,
1618
}
1719

18-
impl RemoveSelectTransform {
19-
fn new(ident_dtype: DType) -> Self {
20-
Self { ident_dtype }
21-
}
22-
}
23-
24-
impl MutNodeVisitor for RemoveSelectTransform {
20+
impl MutNodeVisitor for RemoveSelectTransform<'_> {
2521
type NodeTy = ExprRef;
2622

2723
fn visit_up(&mut self, node: ExprRef) -> VortexResult<TransformResult<Self::NodeTy>> {
2824
if let Some(select) = node.as_any().downcast_ref::<Select>() {
2925
let child = select.child();
30-
let child_dtype = child.return_dtype(&self.ident_dtype)?;
26+
let child_dtype = child.return_dtype(self.scope_dtype)?;
3127
let child_dtype = child_dtype.as_struct().ok_or_else(|| {
3228
vortex_err!(
3329
"Select child must return a struct dtype, however it was a {}",
@@ -76,7 +72,7 @@ mod tests {
7672
NonNullable,
7773
);
7874
let e = select(["a".into(), "b".into()], ident());
79-
let e = remove_select(e, dtype).unwrap();
75+
let e = remove_select(e, &dtype).unwrap();
8076

8177
assert!(e.as_any().is::<Pack>());
8278
}

0 commit comments

Comments
 (0)