Skip to content

Commit 9d0430a

Browse files
committed
More operator rules
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 8ebf37a commit 9d0430a

File tree

9 files changed

+202
-18
lines changed

9 files changed

+202
-18
lines changed

encodings/fastlanes/src/for/vtable/rules.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@
33

44
use vortex_array::Array;
55
use vortex_array::ArrayRef;
6+
use vortex_array::ArrayVisitor;
67
use vortex_array::IntoArray;
8+
use vortex_array::arrays::ConstantVTable;
9+
use vortex_array::arrays::ExactScalarFn;
710
use vortex_array::arrays::FilterArray;
811
use vortex_array::arrays::FilterVTable;
12+
use vortex_array::arrays::ScalarFnArrayView;
13+
use vortex_array::expr::Binary;
914
use vortex_array::matchers::Exact;
15+
use vortex_array::matchers::Matcher;
1016
use vortex_array::optimizer::rules::ArrayParentReduceRule;
1117
use vortex_array::optimizer::rules::ParentRuleSet;
1218
use vortex_error::VortexResult;

encodings/runend/src/array.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ impl RunEndArray {
283283
ends.scalar_at(ends.len() - 1).as_ref().try_into()?
284284
};
285285

286+
if values.len() == 1 {
287+
vortex_panic!("RunEndArray CONSTANT");
288+
}
289+
286290
Self::try_new_offset_length(ends, values, 0, length)
287291
}
288292

@@ -297,6 +301,10 @@ impl RunEndArray {
297301
) -> VortexResult<Self> {
298302
Self::validate(&ends, &values, offset, length)?;
299303

304+
if values.len() == 1 {
305+
vortex_panic!("RunEndArray CONSTANT");
306+
}
307+
300308
Ok(Self {
301309
ends,
302310
values,
@@ -320,6 +328,9 @@ impl RunEndArray {
320328
offset: usize,
321329
length: usize,
322330
) -> Self {
331+
if values.len() == 1 {
332+
vortex_panic!("RunEndArray CONSTANT");
333+
}
323334
Self {
324335
ends,
325336
values,

vortex-array/src/arrays/dict/vtable/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::DeserializeMetadata;
1919
use crate::ProstMetadata;
2020
use crate::SerializeMetadata;
2121
use crate::VectorExecutor;
22+
use crate::arrays::vtable::rules::PARENT_RULES;
2223
use crate::executor::ExecutionCtx;
2324
use crate::serde::ArrayChildren;
2425
use crate::vtable;
@@ -32,6 +33,7 @@ mod array;
3233
mod canonical;
3334
mod encode;
3435
mod operations;
36+
mod rules;
3537
mod validity;
3638
mod visitor;
3739

@@ -134,4 +136,12 @@ impl VTable for DictVTable {
134136
let codes = array.codes().execute(ctx)?.into_primitive();
135137
Ok(values.take(&codes))
136138
}
139+
140+
fn reduce_parent(
141+
array: &Self::Array,
142+
parent: &ArrayRef,
143+
child_idx: usize,
144+
) -> VortexResult<Option<ArrayRef>> {
145+
PARENT_RULES.evaluate(array, parent, child_idx)
146+
}
137147
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::VortexResult;
5+
6+
use crate::Array;
7+
use crate::ArrayEq;
8+
use crate::ArrayRef;
9+
use crate::IntoArray;
10+
use crate::Precision;
11+
use crate::arrays::AnyScalarFn;
12+
use crate::arrays::ConstantArray;
13+
use crate::arrays::ConstantVTable;
14+
use crate::arrays::DictArray;
15+
use crate::arrays::DictVTable;
16+
use crate::arrays::ScalarFnArray;
17+
use crate::optimizer::ArrayOptimizer;
18+
use crate::optimizer::rules::ArrayParentReduceRule;
19+
use crate::optimizer::rules::ParentRuleSet;
20+
21+
pub(super) const PARENT_RULES: ParentRuleSet<DictVTable> = ParentRuleSet::new(&[
22+
ParentRuleSet::lift(&DictionaryScalarFnValuesPushDownRule),
23+
ParentRuleSet::lift(&DictionaryScalarFnCodesPullUpRule),
24+
]);
25+
26+
/// Push down a scalar function to run only over the values of a dictionary array.
27+
#[derive(Debug)]
28+
struct DictionaryScalarFnValuesPushDownRule;
29+
30+
impl ArrayParentReduceRule<DictVTable> for DictionaryScalarFnValuesPushDownRule {
31+
type Parent = AnyScalarFn;
32+
33+
fn parent(&self) -> Self::Parent {
34+
AnyScalarFn
35+
}
36+
37+
fn reduce_parent(
38+
&self,
39+
array: &DictArray,
40+
parent: &ScalarFnArray,
41+
child_idx: usize,
42+
) -> VortexResult<Option<ArrayRef>> {
43+
// Check that the scalar function can actually be pushed down.
44+
let sig = parent.scalar_fn().signature();
45+
46+
// If the scalar function is fallible, we cannot push it down since it may fail over a
47+
// value that isn't referenced by any code.
48+
if !array.all_values_referenced && sig.is_fallible() {
49+
tracing::trace!(
50+
"Not pushing down fallible scalar function {} over dictionary with sparse codes {}",
51+
parent.scalar_fn(),
52+
array.display_tree(),
53+
);
54+
return Ok(None);
55+
}
56+
57+
// Check that all siblings are constant
58+
// TODO(ngates): we can also support other dictionaries if the values are the same!
59+
if !parent
60+
.children()
61+
.iter()
62+
.enumerate()
63+
.all(|(idx, c)| idx == child_idx || c.is::<ConstantVTable>())
64+
{
65+
return Ok(None);
66+
}
67+
68+
// If the scalar function is null-sensitive, then we cannot push it down to values if
69+
// we have any nulls in the codes.
70+
if array.codes.dtype().is_nullable() && !array.codes.all_valid() && sig.is_null_sensitive()
71+
{
72+
tracing::trace!(
73+
"Not pushing down null-sensitive scalar function {} over dictionary with null codes {}",
74+
parent.scalar_fn(),
75+
array.display_tree(),
76+
);
77+
return Ok(None);
78+
}
79+
80+
// Now we push the parent scalar function into the dictionary values.
81+
let values_len = array.values().len();
82+
let mut new_children = Vec::with_capacity(parent.children().len());
83+
for (idx, child) in parent.children().iter().enumerate() {
84+
if idx == child_idx {
85+
new_children.push(array.values().clone());
86+
} else {
87+
let scalar = child.as_::<ConstantVTable>().scalar().clone();
88+
new_children.push(ConstantArray::new(scalar, values_len).into_array());
89+
}
90+
}
91+
92+
let new_values =
93+
ScalarFnArray::try_new(parent.scalar_fn().clone(), new_children, values_len)?
94+
.into_array()
95+
.optimize()?;
96+
97+
let new_dict =
98+
unsafe { DictArray::new_unchecked(array.codes().clone(), new_values) }.into_array();
99+
100+
Ok(Some(new_dict))
101+
}
102+
}
103+
104+
#[derive(Debug)]
105+
struct DictionaryScalarFnCodesPullUpRule;
106+
107+
impl ArrayParentReduceRule<DictVTable> for DictionaryScalarFnCodesPullUpRule {
108+
type Parent = AnyScalarFn;
109+
110+
fn parent(&self) -> Self::Parent {
111+
AnyScalarFn
112+
}
113+
114+
fn reduce_parent(
115+
&self,
116+
array: &DictArray,
117+
parent: &ScalarFnArray,
118+
child_idx: usize,
119+
) -> VortexResult<Option<ArrayRef>> {
120+
// Check that all siblings are dictionaries with the same codes as us.
121+
if !parent.children().iter().enumerate().all(|(idx, c)| {
122+
idx == child_idx
123+
|| c.as_opt::<DictVTable>().is_some_and(|c| {
124+
c.values().len() == array.values().len()
125+
&& c.codes().array_eq(array.codes(), Precision::Value)
126+
})
127+
}) {
128+
return Ok(None);
129+
}
130+
131+
let mut new_children = Vec::with_capacity(parent.children().len());
132+
for (idx, child) in parent.children().iter().enumerate() {
133+
if idx == child_idx {
134+
new_children.push(array.values().clone());
135+
} else {
136+
new_children.push(child.as_::<DictVTable>().values().clone());
137+
}
138+
}
139+
140+
let new_values =
141+
ScalarFnArray::try_new(parent.scalar_fn().clone(), new_children, array.values.len())?
142+
.into_array()
143+
.optimize()?;
144+
145+
let new_dict =
146+
unsafe { DictArray::new_unchecked(array.codes().clone(), new_values) }.into_array();
147+
148+
Ok(Some(new_dict))
149+
}
150+
}

vortex-array/src/arrays/scalar_fn/vtable/mod.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,20 +140,16 @@ impl VTable for ScalarFnVTable {
140140
}
141141

142142
fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
143-
let datums: Vec<_> = array
144-
.children()
145-
.iter()
146-
.map(|child| match child.as_opt::<ConstantVTable>() {
147-
None => child.execute(ctx).map(Datum::Vector),
148-
Some(constant) => Ok(Datum::Scalar(constant.scalar().to_vector_scalar())),
149-
})
150-
.try_collect()?;
151-
152-
let input_dtypes: Vec<_> = array
153-
.children()
154-
.iter()
155-
.map(|child| child.dtype().clone())
156-
.collect();
143+
// NOTE: we don't use iterators here to make the profiles easier to read!
144+
let mut datums = Vec::with_capacity(array.children.len());
145+
let mut input_dtypes = Vec::with_capacity(array.children.len());
146+
for child in array.children.iter() {
147+
match child.as_opt::<ConstantVTable>() {
148+
None => datums.push(child.execute(ctx).map(Datum::Vector)?),
149+
Some(constant) => datums.push(Datum::Scalar(constant.scalar().to_vector_scalar())),
150+
}
151+
input_dtypes.push(child.dtype().clone());
152+
}
157153

158154
let args = ExecutionArgs {
159155
datums,

vortex-array/src/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl VectorExecutor for ArrayRef {
6767
}
6868

6969
let mut ctx = ExecutionCtx::new(session.clone());
70-
tracing::debug!("Executing array:\n{}", self.display_tree());
70+
tracing::debug!("Executing array {}:\n{}", self, self.display_tree());
7171
Ok(Datum::Vector(self.execute(&mut ctx)?))
7272
}
7373

vortex-array/src/expr/exprs/list_contains.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ impl VTable for ListContains {
199199
fn is_null_sensitive(&self, _instance: &Self::Options) -> bool {
200200
true
201201
}
202+
203+
fn is_fallible(&self, _options: &Self::Options) -> bool {
204+
false
205+
}
202206
}
203207

204208
/// Creates an expression that checks if a value is contained in a list.

vortex-array/src/optimizer/rules.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub trait ArrayParentReduceRule<V: VTable>: Debug + Send + Sync + 'static {
3838
/// - `Err(e)` if an error occurred
3939
fn reduce_parent(
4040
&self,
41-
child: &V::Array,
41+
array: &V::Array,
4242
parent: <Self::Parent as Matcher>::View<'_>,
4343
child_idx: usize,
4444
) -> VortexResult<Option<ArrayRef>>;

vortex-layout/src/layouts/dict/reader.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@ use vortex_array::Array;
1515
use vortex_array::ArrayRef;
1616
use vortex_array::IntoArray;
1717
use vortex_array::MaskFuture;
18+
use vortex_array::VectorExecutor;
1819
use vortex_array::arrays::DictArray;
1920
use vortex_array::compute::MinMaxResult;
2021
use vortex_array::compute::min_max;
2122
use vortex_array::compute::take;
2223
use vortex_array::expr::Expression;
2324
use vortex_array::expr::root;
2425
use vortex_array::mask::MaskExecutor;
26+
use vortex_array::optimizer::ArrayOptimizer;
27+
use vortex_array::vectors::VectorIntoArray;
2528
use vortex_dtype::DType;
2629
use vortex_dtype::FieldMask;
2730
use vortex_error::VortexError;
@@ -107,13 +110,16 @@ impl DictReader {
107110
}
108111

109112
fn values_eval(&self, expr: Expression) -> SharedArrayFuture {
113+
let session = self.session.clone();
110114
self.values_evals
111115
.entry(expr.clone())
112116
.or_insert_with(|| {
113117
self.values_array()
114118
.map(move |array| {
115119
if *USE_VORTEX_OPERATORS {
116-
array?.apply(&expr).map_err(Arc::new)
120+
let array = array?.apply(&expr)?;
121+
// We execute the array to avoid re-evaluating for every split.
122+
Ok(array.execute_vector(&session)?.into_array(array.dtype()))
117123
} else {
118124
expr.evaluate(&array?).map_err(Arc::new)
119125
}
@@ -245,7 +251,8 @@ impl LayoutReader for DictReader {
245251
DictArray::new_unchecked(codes, values)
246252
.set_all_values_referenced(all_values_referenced)
247253
}
248-
.to_array();
254+
.to_array()
255+
.optimize()?;
249256

250257
if *USE_VORTEX_OPERATORS {
251258
array.apply(&expr)

0 commit comments

Comments
 (0)