Skip to content

Commit 606b33c

Browse files
authored
Operator Updates (#4725)
This PR changes the API a little for operators as well as creates an executor that performs CSE and pipeline conversion. It breaks many of the old operator tests and benchmarks, many of which are commented out. But to avoid long-lived feature branch @joseph-isaacs and I will continue to merge onto develop. Execution via operators is gated on a runtime environment variable for local development until we get this prototype fleshed out and performant! --------- Signed-off-by: Nicholas Gates <[email protected]>
1 parent 9524c08 commit 606b33c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+4295
-3216
lines changed

Cargo.lock

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ jiff = "0.2.0"
120120
kanal = "0.1.1"
121121
lending-iterator = "0.1.7"
122122
libfuzzer-sys = "0.4"
123-
linked_hash_set = "0.1.5"
124123
log = { version = "0.4.21" }
125124
loom = { version = "0.7", features = ["checkpoint"] }
126125
memmap2 = "0.9.5"

encodings/fastlanes/benches/pipeline_bitpacking.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use mimalloc::MiMalloc;
1010
use rand::prelude::StdRng;
1111
use rand::{Rng, SeedableRng};
1212
use vortex_array::compute::{filter, warm_up_vtables};
13-
use vortex_array::pipeline::{Element, export_canonical_pipeline_expr};
1413
use vortex_array::{IntoArray, ToCanonical};
1514
use vortex_buffer::BufferMut;
1615
use vortex_dtype::NativePType;
@@ -68,32 +67,33 @@ pub fn decompress_bitpacking_late_filter<T: NativePType>(bencher: Bencher, fract
6867
.bench_values(|mask| filter(array.to_canonical().as_ref(), &mask).unwrap());
6968
}
7069

71-
#[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
72-
pub fn decompress_bitpacking_pipeline_filter<T: Element + NativePType>(
73-
bencher: Bencher,
74-
fraction_kept: f64,
75-
) {
76-
let mut rng = StdRng::seed_from_u64(0);
77-
let values = (0..LENGTH)
78-
.map(|_| T::from(rng.random_range(0..100)).unwrap())
79-
.collect::<BufferMut<T>>()
80-
.into_array()
81-
.to_primitive();
82-
let array = bitpack_to_best_bit_width(&values).unwrap();
83-
84-
let mask = (0..LENGTH)
85-
.map(|_| rng.random_bool(fraction_kept))
86-
.collect::<BooleanBuffer>();
87-
88-
bencher
89-
.with_inputs(|| Mask::from_buffer(mask.clone()))
90-
.bench_local_values(|mask| {
91-
export_canonical_pipeline_expr(
92-
array.dtype(),
93-
array.len(),
94-
array.to_operator().unwrap().unwrap().as_ref(),
95-
&mask,
96-
)
97-
.unwrap()
98-
});
99-
}
70+
// TODO(ngates): bring back benchmarks once operator API is stable.
71+
// #[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
72+
// pub fn decompress_bitpacking_pipeline_filter<T: Element + NativePType>(
73+
// bencher: Bencher,
74+
// fraction_kept: f64,
75+
// ) {
76+
// let mut rng = StdRng::seed_from_u64(0);
77+
// let values = (0..LENGTH)
78+
// .map(|_| T::from(rng.random_range(0..100)).unwrap())
79+
// .collect::<BufferMut<T>>()
80+
// .into_array()
81+
// .to_primitive();
82+
// let array = bitpack_to_best_bit_width(&values).unwrap();
83+
//
84+
// let mask = (0..LENGTH)
85+
// .map(|_| rng.random_bool(fraction_kept))
86+
// .collect::<BooleanBuffer>();
87+
//
88+
// bencher
89+
// .with_inputs(|| Mask::from_buffer(mask.clone()))
90+
// .bench_local_values(|mask| {
91+
// export_canonical_pipeline_expr(
92+
// array.dtype(),
93+
// array.len(),
94+
// array.to_operator().unwrap().unwrap().as_ref(),
95+
// &mask,
96+
// )
97+
// .unwrap()
98+
// });
99+
// }

encodings/fastlanes/benches/pipeline_bitpacking_compare_scalar.rs

Lines changed: 58 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@ use mimalloc::MiMalloc;
1010
use rand::prelude::StdRng;
1111
use rand::{Rng, SeedableRng};
1212
use vortex_array::compute::{filter, warm_up_vtables};
13-
use vortex_array::pipeline::{Element, export_canonical_pipeline_expr};
1413
use vortex_array::{Array, ArrayRef, IntoArray, ToCanonical};
1514
use vortex_buffer::BufferMut;
16-
use vortex_dtype::Nullability::NonNullable;
17-
use vortex_dtype::{DType, NativePType};
15+
use vortex_dtype::NativePType;
1816
use vortex_error::VortexResult;
19-
use vortex_expr::{Scope, VortexExprExt, lit, lt, root};
17+
use vortex_expr::{Scope, lit, lt, root};
2018
use vortex_fastlanes::{FoRArray, bitpack_to_best_bit_width};
2119
use vortex_mask::Mask;
2220
use vortex_scalar::Scalar;
@@ -74,58 +72,59 @@ pub fn eval<T: NativePType + Into<Scalar>>(bencher: Bencher, fraction_kept: f64)
7472
});
7573
}
7674

77-
#[divan::bench(types = [u8, u16, u32, u64], args = TRUE_COUNT)]
78-
pub fn pipeline<T: Element + NativePType + Into<Scalar>>(bencher: Bencher, fraction_kept: f64) {
79-
let mut rng = StdRng::seed_from_u64(0);
80-
let values = (0..100_000)
81-
.map(|_| T::from(rng.random_range(10..100)).unwrap())
82-
.collect::<BufferMut<T>>();
83-
let array = create_for_bitpacked_array(values).unwrap();
84-
85-
let mask = (0..100_000)
86-
.map(|_| rng.random_bool(fraction_kept))
87-
.collect::<BooleanBuffer>();
88-
89-
let expr = lt(root(), lit(T::from_i32(2).unwrap()));
90-
let operator = expr.to_operator_unoptimized(&array).unwrap().unwrap();
91-
92-
bencher
93-
.with_inputs(|| Mask::from_buffer(mask.clone()))
94-
.bench_local_values(|mask| {
95-
export_canonical_pipeline_expr(
96-
&DType::Bool(NonNullable),
97-
array.len(),
98-
operator.as_ref(),
99-
&mask,
100-
)
101-
.unwrap()
102-
});
103-
}
104-
105-
#[divan::bench(types = [u8, u16, u32, u64], args = TRUE_COUNT)]
106-
pub fn pipeline_opt<T: Element + NativePType + Into<Scalar>>(bencher: Bencher, fraction_kept: f64) {
107-
let mut rng = StdRng::seed_from_u64(0);
108-
let values = (0..100_000)
109-
.map(|_| T::from(rng.random_range(10..100)).unwrap())
110-
.collect::<BufferMut<T>>();
111-
let array = create_for_bitpacked_array(values).unwrap();
112-
113-
let mask = (0..100_000)
114-
.map(|_| rng.random_bool(fraction_kept))
115-
.collect::<BooleanBuffer>();
116-
117-
let expr = lt(root(), lit(T::from_i32(2).unwrap()));
118-
let operator = expr.to_operator(&array).unwrap().unwrap();
119-
120-
bencher
121-
.with_inputs(|| (Mask::from_buffer(mask.clone()), operator.clone()))
122-
.bench_local_values(|(mask, operator)| {
123-
export_canonical_pipeline_expr(
124-
&DType::Bool(NonNullable),
125-
array.len(),
126-
operator.as_ref(),
127-
&mask,
128-
)
129-
.unwrap()
130-
});
131-
}
75+
// TODO(ngates): bring back benchmarks once operator API is stable.
76+
// #[divan::bench(types = [u8, u16, u32, u64], args = TRUE_COUNT)]
77+
// pub fn operator<T: Element + NativePType + Into<Scalar>>(bencher: Bencher, fraction_kept: f64) {
78+
// let mut rng = StdRng::seed_from_u64(0);
79+
// let values = (0..100_000)
80+
// .map(|_| T::from(rng.random_range(10..100)).unwrap())
81+
// .collect::<BufferMut<T>>();
82+
// let array = create_for_bitpacked_array(values).unwrap();
83+
//
84+
// let mask = (0..100_000)
85+
// .map(|_| rng.random_bool(fraction_kept))
86+
// .collect::<BooleanBuffer>();
87+
//
88+
// let expr = lt(root(), lit(T::from_i32(2).unwrap()));
89+
// let operator = expr.to_operator_unoptimized(&array).unwrap().unwrap();
90+
//
91+
// bencher
92+
// .with_inputs(|| Mask::from_buffer(mask.clone()))
93+
// .bench_local_values(|mask| {
94+
// export_canonical_pipeline_expr(
95+
// &DType::Bool(NonNullable),
96+
// array.len(),
97+
// operator.as_ref(),
98+
// &mask,
99+
// )
100+
// .unwrap()
101+
// });
102+
// }
103+
//
104+
// #[divan::bench(types = [u8, u16, u32, u64], args = TRUE_COUNT)]
105+
// pub fn pipeline_opt<T: Element + NativePType + Into<Scalar>>(bencher: Bencher, fraction_kept: f64) {
106+
// let mut rng = StdRng::seed_from_u64(0);
107+
// let values = (0..100_000)
108+
// .map(|_| T::from(rng.random_range(10..100)).unwrap())
109+
// .collect::<BufferMut<T>>();
110+
// let array = create_for_bitpacked_array(values).unwrap();
111+
//
112+
// let mask = (0..100_000)
113+
// .map(|_| rng.random_bool(fraction_kept))
114+
// .collect::<BooleanBuffer>();
115+
//
116+
// let expr = lt(root(), lit(T::from_i32(2).unwrap()));
117+
// let operator = expr.to_operator(&array).unwrap().unwrap();
118+
//
119+
// bencher
120+
// .with_inputs(|| (Mask::from_buffer(mask.clone()), operator.clone()))
121+
// .bench_local_values(|(mask, operator)| {
122+
// export_canonical_pipeline_expr(
123+
// &DType::Bool(NonNullable),
124+
// array.len(),
125+
// operator.as_ref(),
126+
// &mask,
127+
// )
128+
// .unwrap()
129+
// });
130+
// }

0 commit comments

Comments
 (0)