Skip to content

Commit a3cde9c

Browse files
feat: expr -> operator conversion and use in layouts (#4388)
Signed-off-by: Joe Isaacs <[email protected]>
1 parent 7cccc54 commit a3cde9c

File tree

20 files changed

+343
-143
lines changed

20 files changed

+343
-143
lines changed

encodings/fastlanes/benches/pipeline_bitpacking_compare_scalar.rs

Lines changed: 6 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -3,23 +3,20 @@
33

44
#![allow(clippy::unwrap_used)]
55
#![allow(unexpected_cfgs)]
6-
use std::rc::Rc;
76

87
use arrow_buffer::BooleanBuffer;
98
use divan::Bencher;
109
use mimalloc::MiMalloc;
1110
use rand::prelude::StdRng;
1211
use rand::{Rng, SeedableRng};
13-
use vortex_array::arrays::ConstantOperator;
14-
use vortex_array::compute::{Operator as BinaryOperator, filter};
15-
use vortex_array::pipeline::operators::{CompareOperator, ScalarCompareOperator};
12+
use vortex_array::compute::filter;
1613
use vortex_array::pipeline::{Element, export_canonical_pipeline_expr};
1714
use vortex_array::{Array, ArrayRef, IntoArray, ToCanonical};
1815
use vortex_buffer::BufferMut;
1916
use vortex_dtype::Nullability::NonNullable;
2017
use vortex_dtype::{DType, NativePType};
2118
use vortex_error::VortexResult;
22-
use vortex_expr::{Scope, lit, lt, reduce_operator, root};
19+
use vortex_expr::{Scope, VortexExprExt, lit, lt, root};
2320
use vortex_fastlanes::{FoRArray, bitpack_to_best_bit_width};
2421
use vortex_mask::Mask;
2522
use vortex_scalar::Scalar;
@@ -91,16 +88,8 @@ pub fn pipeline<T: Element + NativePType + Into<Scalar>>(bencher: Bencher, fract
9188
.map(|_| rng.random_bool(fraction_kept))
9289
.collect::<BooleanBuffer>();
9390

94-
// Get the operator from the FoR+BitPacked array
95-
let array_operator = array.to_operator().unwrap().unwrap();
96-
let constant_operator = Rc::new(ConstantOperator::new(T::from_i32(2).unwrap().into()));
97-
98-
// Create scalar compare operator: array < T::from_i32(2)
99-
let operator = Rc::new(CompareOperator::new(
100-
array_operator,
101-
constant_operator,
102-
BinaryOperator::Lt,
103-
));
91+
let expr = lt(root(), lit(T::from_i32(2).unwrap()));
92+
let operator = expr.to_operator_unoptimized(&array).unwrap().unwrap();
10493

10594
bencher
10695
.with_inputs(|| Mask::from_buffer(mask.clone()))
@@ -127,19 +116,8 @@ pub fn pipeline_opt<T: Element + NativePType + Into<Scalar>>(bencher: Bencher, f
127116
.map(|_| rng.random_bool(fraction_kept))
128117
.collect::<BooleanBuffer>();
129118

130-
// Get the operator from the FoR+BitPacked array
131-
let array_operator = array.to_operator().unwrap().unwrap();
132-
133-
// Create scalar compare operator: array < T::from_i32(2)
134-
let compare_scalar = T::from_i32(2).unwrap().into();
135-
let unoptimized_operator = Rc::new(ScalarCompareOperator::new(
136-
array_operator,
137-
BinaryOperator::Lt,
138-
compare_scalar,
139-
));
140-
141-
// Apply optimizations
142-
let operator = reduce_operator(unoptimized_operator).unwrap();
119+
let expr = lt(root(), lit(T::from_i32(2).unwrap()));
120+
let operator = expr.to_operator(&array).unwrap().unwrap();
143121

144122
bencher
145123
.with_inputs(|| (Mask::from_buffer(mask.clone()), operator.clone()))

encodings/fastlanes/src/bitpacking/pipeline/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -6,12 +6,12 @@ mod unaligned_kernel;
66

77
use std::any::Any;
88
use std::hash::{Hash, Hasher};
9-
use std::rc::Rc;
9+
use std::sync::Arc;
1010

1111
use fastlanes::FastLanes;
1212
pub use kernel::BitPackedKernel;
1313
pub use unaligned_kernel::BitPackedUnalignedKernel;
14-
use vortex_array::pipeline::operators::{BindContext, Operator};
14+
use vortex_array::pipeline::operators::{BindContext, Operator, OperatorRef};
1515
use vortex_array::pipeline::{Kernel, PipelineVTable, VType};
1616
use vortex_buffer::Buffer;
1717
use vortex_dtype::{PhysicalPType, match_each_integer_ptype};
@@ -20,7 +20,7 @@ use vortex_error::VortexResult;
2020
use crate::{BitPackedArray, BitPackedVTable};
2121

2222
impl PipelineVTable<BitPackedVTable> for BitPackedVTable {
23-
fn to_operator(array: &BitPackedArray) -> VortexResult<Option<Rc<dyn Operator>>> {
23+
fn to_operator(array: &BitPackedArray) -> VortexResult<Option<OperatorRef>> {
2424
if array.dtype.is_nullable() {
2525
log::trace!("BitPackedVTable does not support nullable arrays");
2626
return Ok(None);
@@ -30,7 +30,7 @@ impl PipelineVTable<BitPackedVTable> for BitPackedVTable {
3030
return Ok(None);
3131
}
3232

33-
Ok(Some(Rc::new(array.clone())))
33+
Ok(Some(Arc::new(array.clone())))
3434
}
3535
}
3636

@@ -43,12 +43,12 @@ impl Operator for BitPackedArray {
4343
VType::Primitive(self.ptype())
4444
}
4545

46-
fn children(&self) -> &[Rc<dyn Operator>] {
46+
fn children(&self) -> &[OperatorRef] {
4747
&[]
4848
}
4949

50-
fn with_children(&self, _children: Vec<Rc<dyn Operator>>) -> Rc<dyn Operator> {
51-
Rc::new(self.clone())
50+
fn with_children(&self, _children: Vec<OperatorRef>) -> OperatorRef {
51+
Arc::new(self.clone())
5252
}
5353

5454
fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {

encodings/fastlanes/src/for/pipeline.rs

Lines changed: 12 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -4,13 +4,15 @@
44
use std::any::Any;
55
use std::hash::Hash;
66
use std::marker::PhantomData;
7-
use std::rc::Rc;
7+
use std::sync::Arc;
88

99
use num_traits::WrappingAdd;
1010
use vortex_array::Array;
1111
use vortex_array::compute::Operator as BinaryOperator;
1212
use vortex_array::pipeline::bits::BitView;
13-
use vortex_array::pipeline::operators::{BindContext, Operator, ScalarCompareOperator};
13+
use vortex_array::pipeline::operators::{
14+
BindContext, Operator, OperatorRef, ScalarCompareOperator,
15+
};
1416
use vortex_array::pipeline::vec::VectorId;
1517
use vortex_array::pipeline::view::ViewMut;
1618
use vortex_array::pipeline::{Element, Kernel, KernelContext, PipelineVTable, VType};
@@ -21,11 +23,11 @@ use vortex_scalar::Scalar;
2123
use crate::{FoRArray, FoRVTable};
2224

2325
impl PipelineVTable<FoRVTable> for FoRVTable {
24-
fn to_operator(array: &FoRArray) -> VortexResult<Option<Rc<dyn Operator>>> {
26+
fn to_operator(array: &FoRArray) -> VortexResult<Option<OperatorRef>> {
2527
let Some(op) = array.encoded.to_operator()? else {
2628
return Ok(None);
2729
};
28-
Ok(Some(Rc::new(FoROperator {
30+
Ok(Some(Arc::new(FoROperator {
2931
child: [op],
3032
reference: array.reference.clone(),
3133
ptype: array.ptype(),
@@ -36,7 +38,7 @@ impl PipelineVTable<FoRVTable> for FoRVTable {
3638

3739
#[derive(Debug, Hash)]
3840
pub struct FoROperator {
39-
child: [Rc<dyn Operator>; 1],
41+
child: [OperatorRef; 1],
4042
reference: Scalar,
4143
ptype: PType,
4244
encoded_ptype: PType,
@@ -51,13 +53,13 @@ impl Operator for FoROperator {
5153
VType::Primitive(self.ptype)
5254
}
5355

54-
fn children(&self) -> &[Rc<dyn Operator>] {
56+
fn children(&self) -> &[OperatorRef] {
5557
&self.child
5658
}
5759

58-
fn with_children(&self, mut children: Vec<Rc<dyn Operator>>) -> Rc<dyn Operator> {
60+
fn with_children(&self, mut children: Vec<OperatorRef>) -> OperatorRef {
5961
assert_eq!(children.len(), 1);
60-
Rc::new(FoROperator {
62+
Arc::new(FoROperator {
6163
child: [children.remove(0)],
6264
reference: self.reference.clone(),
6365
ptype: self.ptype,
@@ -90,7 +92,7 @@ impl Operator for FoROperator {
9092
})
9193
}
9294

93-
fn reduce_parent(&self, parent: Rc<dyn Operator>) -> Option<Rc<dyn Operator>> {
95+
fn reduce_parent(&self, parent: OperatorRef) -> Option<OperatorRef> {
9496
let compare = parent.as_any().downcast_ref::<ScalarCompareOperator>()?;
9597
if compare.op != BinaryOperator::Eq && compare.op != BinaryOperator::NotEq {
9698
return None;
@@ -111,7 +113,7 @@ impl Operator for FoROperator {
111113
Scalar::from(compare.wrapping_sub(reference))
112114
});
113115

114-
Some(Rc::new(ScalarCompareOperator::new(
116+
Some(Arc::new(ScalarCompareOperator::new(
115117
self.children()[0].clone(),
116118
compare.op,
117119
new_ref,

vortex-array/src/array/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@ mod visitor;
77
use std::any::Any;
88
use std::fmt::{Debug, Formatter};
99
use std::ops::Range;
10-
use std::rc::Rc;
1110
use std::sync::Arc;
1211

1312
pub use visitor::*;
@@ -23,7 +22,7 @@ use crate::arrays::{
2322
};
2423
use crate::builders::ArrayBuilder;
2524
use crate::compute::{ComputeFn, Cost, InvocationArgs, IsConstantOpts, Output, is_constant_opts};
26-
use crate::pipeline::{Operator, PipelineVTable};
25+
use crate::pipeline::{OperatorRef, PipelineVTable};
2726
use crate::serde::ArrayChildren;
2827
use crate::stats::{Precision, Stat, StatsProviderExt, StatsSetRef};
2928
use crate::vtable::{
@@ -155,7 +154,7 @@ pub trait Array: 'static + private::Sealed + Send + Sync + Debug + ArrayVisitor
155154
/// Convert the array to a pipeline operator if supported by the encoding.
156155
///
157156
/// Returns `None` if the encoding does not support pipeline operations.
158-
fn to_operator(&self) -> VortexResult<Option<Rc<dyn Operator>>>;
157+
fn to_operator(&self) -> VortexResult<Option<OperatorRef>>;
159158
}
160159

161160
impl Array for Arc<dyn Array> {
@@ -243,7 +242,7 @@ impl Array for Arc<dyn Array> {
243242
self.as_ref().invoke(compute_fn, args)
244243
}
245244

246-
fn to_operator(&self) -> VortexResult<Option<Rc<dyn Operator>>> {
245+
fn to_operator(&self) -> VortexResult<Option<OperatorRef>> {
247246
self.as_ref().to_operator()
248247
}
249248
}
@@ -618,7 +617,7 @@ impl<V: VTable> Array for ArrayAdapter<V> {
618617
<V::ComputeVTable as ComputeVTable<V>>::invoke(&self.0, compute_fn, args)
619618
}
620619

621-
fn to_operator(&self) -> VortexResult<Option<Rc<dyn Operator>>> {
620+
fn to_operator(&self) -> VortexResult<Option<OperatorRef>> {
622621
<V::PipelineVTable as PipelineVTable<V>>::to_operator(&self.0)
623622
}
624623
}

vortex-array/src/arrays/constant/compute/pipeline.rs

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -3,22 +3,21 @@
33

44
use std::any::Any;
55
use std::hash::Hash;
6-
use std::rc::Rc;
6+
use std::sync::Arc;
77

88
use vortex_dtype::{DType, NativePType, match_each_native_ptype};
99
use vortex_error::{VortexExpect, VortexResult};
1010
use vortex_scalar::Scalar;
1111

1212
use crate::arrays::{ConstantArray, ConstantVTable};
1313
use crate::pipeline::bits::BitView;
14-
use crate::pipeline::operators::{BindContext, Operator};
14+
use crate::pipeline::operators::{BindContext, Operator, OperatorRef};
1515
use crate::pipeline::view::ViewMut;
1616
use crate::pipeline::{Element, Kernel, KernelContext, PipelineVTable, VType};
1717

1818
impl PipelineVTable<ConstantVTable> for ConstantVTable {
19-
fn to_operator(array: &ConstantArray) -> VortexResult<Option<Rc<dyn Operator>>> {
20-
Ok(ConstantOperator::maybe_new(array.scalar.clone())
21-
.map(|c| Rc::new(c) as Rc<dyn Operator>))
19+
fn to_operator(array: &ConstantArray) -> VortexResult<Option<OperatorRef>> {
20+
Ok(ConstantOperator::maybe_new(array.scalar.clone()).map(|c| Arc::new(c) as OperatorRef))
2221
}
2322
}
2423

@@ -56,12 +55,12 @@ impl Operator for ConstantOperator {
5655
}
5756
}
5857

59-
fn children(&self) -> &[Rc<dyn Operator>] {
58+
fn children(&self) -> &[OperatorRef] {
6059
&[]
6160
}
6261

63-
fn with_children(&self, _children: Vec<Rc<dyn Operator>>) -> Rc<dyn Operator> {
64-
Rc::new(ConstantOperator::new(self.scalar.clone()))
62+
fn with_children(&self, _children: Vec<OperatorRef>) -> OperatorRef {
63+
Arc::new(ConstantOperator::new(self.scalar.clone()))
6564
}
6665

6766
fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {

vortex-array/src/arrays/primitive/compute/pipeline.rs

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -2,27 +2,27 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::any::Any;
5-
use std::rc::Rc;
5+
use std::sync::Arc;
66

7+
use log::info;
78
use vortex_buffer::{Buffer, ByteBuffer};
89
use vortex_dtype::{NativePType, PType, match_each_native_ptype};
9-
use vortex_error::{VortexResult, vortex_bail};
10+
use vortex_error::VortexResult;
1011

1112
use crate::arrays::{PrimitiveArray, PrimitiveVTable};
1213
use crate::pipeline::bits::BitView;
13-
use crate::pipeline::operators::{BindContext, Operator};
14+
use crate::pipeline::operators::{BindContext, Operator, OperatorRef};
1415
use crate::pipeline::view::ViewMut;
1516
use crate::pipeline::{Element, Kernel, KernelContext, N, PipelineVTable, VType};
1617
use crate::vtable::ValidityHelper;
1718

1819
impl PipelineVTable<PrimitiveVTable> for PrimitiveVTable {
19-
fn to_operator(array: &PrimitiveArray) -> VortexResult<Option<Rc<dyn Operator>>> {
20+
fn to_operator(array: &PrimitiveArray) -> VortexResult<Option<OperatorRef>> {
2021
if !array.validity().all_valid() {
21-
vortex_bail!(
22-
"PipelineVTable::to_operator is not supported for arrays with invalid values"
23-
);
22+
info!("PipelineVTable::to_operator is not supported for arrays with invalid values");
23+
return Ok(None);
2424
}
25-
Ok(Some(Rc::new(PrimitiveOperator::new(
25+
Ok(Some(Arc::new(PrimitiveOperator::new(
2626
array.ptype(),
2727
array.byte_buffer().clone(),
2828
))))
@@ -51,12 +51,12 @@ impl Operator for PrimitiveOperator {
5151
VType::Primitive(self.ptype)
5252
}
5353

54-
fn children(&self) -> &[Rc<dyn Operator>] {
54+
fn children(&self) -> &[OperatorRef] {
5555
&[]
5656
}
5757

58-
fn with_children(&self, _children: Vec<Rc<dyn Operator>>) -> Rc<dyn Operator> {
59-
Rc::new(self.clone())
58+
fn with_children(&self, _children: Vec<OperatorRef>) -> OperatorRef {
59+
Arc::new(self.clone())
6060
}
6161

6262
fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {

vortex-array/src/pipeline/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -35,10 +35,9 @@ pub const N: usize = 1024;
3535
pub const N_WORDS: usize = N / usize::BITS as usize;
3636

3737
use std::cell::RefCell;
38-
use std::rc::Rc;
3938

4039
pub use canonical::*;
41-
pub use operators::Operator;
40+
pub use operators::{Operator, OperatorRef};
4241
pub use types::*;
4342
use vec::{VectorId, VectorRef};
4443
use vortex_error::VortexResult;
@@ -123,11 +122,11 @@ use crate::vtable::{NotSupported, VTable};
123122
pub trait PipelineVTable<V: VTable> {
124123
/// Convert the current array into a [`Operator`].
125124
/// Returns `None` if the array cannot be converted to an operator.
126-
fn to_operator(array: &V::Array) -> VortexResult<Option<Rc<dyn Operator>>>;
125+
fn to_operator(array: &V::Array) -> VortexResult<Option<OperatorRef>>;
127126
}
128127

129128
impl<V: VTable> PipelineVTable<V> for NotSupported {
130-
fn to_operator(_array: &V::Array) -> VortexResult<Option<Rc<dyn Operator>>> {
129+
fn to_operator(_array: &V::Array) -> VortexResult<Option<OperatorRef>> {
131130
Ok(None)
132131
}
133132
}

0 commit comments

Comments
 (0)