Skip to content

Commit 4f802b6

Browse files
gatesnasubiottopalaskasherlockbeardconnortsui20
authored
Lazy array expression evaluation (#5625)
This long-lived feature branch contain changes required for the "operators RFC", "Lazy Arrays", whatever we're calling it. The change is hidden behind the `VORTEX_OPERATORS=true` environment variable allowing us to run parallel benchmarks until we reach parity between legacy compute function dispatch and the new optimizer rules framework. Summary of changes: * Rename `execute` -> `execute_vector`, add similar `execute_mask` function. This pattern will be used when turning a Vortex Array into concrete physical data via whatever means, e.g. `execute_arrow`, `execute_duckdb`, and so on. * Introduces `trait Kernel` to represent a physical tree of CPU-based vector execution * `Array::batch_execute -> Array::bind_kernel` * Implements ScalarFnArray execution using a ScalarFnKernel, binding constant children to scalar datum inputs for the ScalarFn. * Reverts the change to pull out a `ScalarFnVTable`, in favor of allowing all `expr::VTable` to be used as scalar functions. This provides a smoother migration for end-users, at the expense of having to special-case the `root` expression during execution. * Expression now holds ScalarFn instance (vtable + options), as well as children. * Removes `ExpressionView`, easier to pass separate Expression + options into vtable. * `expr::VTable::execute` takes arguments by ownership. * Implements all `expr::VTable::execute` functions. * Adds `Array::filter` to vtable, providing an O(1) function for masking arrays. Fallback is to wrap up the child in a FilterArray. * Adds `Datum::ensure_vector` and fleshes out `Scalar::repeat` implementations. * Fixes expression display to not use debug formatting of options. * Removes untyped simplification, it was never called. * Removes ExprOptimizer in favor of a simple reduce_child `Expr::simplify` function. Complex reduction rules can be implemented over the array tree only to avoid duplication. * Introduces `Array::apply(&Expression)` to turn expressions into arrays. This function is the new API that will live in vortex-array and allow us to invert the expr -> array dependency to become array -> expr. --------- Signed-off-by: Nicholas Gates <[email protected]> Signed-off-by: Siddharth Kumar <[email protected]> Co-authored-by: Alfonso Subiotto Marqués <[email protected]> Co-authored-by: Baris Palaska <[email protected]> Co-authored-by: sherlockbeard <[email protected]> Co-authored-by: Connor Tsui <[email protected]>
1 parent ba0e97a commit 4f802b6

File tree

173 files changed

+5410
-5707
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

173 files changed

+5410
-5707
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/alp/src/alp/array.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use vortex_array::DeserializeMetadata;
1515
use vortex_array::Precision;
1616
use vortex_array::ProstMetadata;
1717
use vortex_array::SerializeMetadata;
18-
use vortex_array::execution::ExecutionCtx;
18+
use vortex_array::kernel::BindCtx;
19+
use vortex_array::kernel::KernelRef;
20+
use vortex_array::kernel::kernel;
1921
use vortex_array::patches::Patches;
2022
use vortex_array::patches::PatchesMetadata;
2123
use vortex_array::serde::ArrayChildren;
@@ -41,7 +43,6 @@ use vortex_error::VortexExpect;
4143
use vortex_error::VortexResult;
4244
use vortex_error::vortex_bail;
4345
use vortex_error::vortex_ensure;
44-
use vortex_vector::Vector;
4546

4647
use crate::ALPFloat;
4748
use crate::alp::Exponents;
@@ -140,17 +141,16 @@ impl VTable for ALPVTable {
140141
)
141142
}
142143

143-
fn batch_execute(array: &ALPArray, ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
144-
let encoded_vector = array.encoded().batch_execute(ctx)?;
145-
146-
let patches_vectors = if let Some(patches) = array.patches() {
144+
fn bind_kernel(array: &ALPArray, ctx: &mut BindCtx) -> VortexResult<KernelRef> {
145+
let encoded = array.encoded().bind_kernel(ctx)?;
146+
let patches_kernels = if let Some(patches) = array.patches() {
147147
Some((
148-
patches.indices().batch_execute(ctx)?,
149-
patches.values().batch_execute(ctx)?,
148+
patches.indices().bind_kernel(ctx)?,
149+
patches.values().bind_kernel(ctx)?,
150150
patches
151151
.chunk_offsets()
152152
.as_ref()
153-
.map(|co| co.batch_execute(ctx))
153+
.map(|co| co.bind_kernel(ctx))
154154
.transpose()?,
155155
))
156156
} else {
@@ -161,7 +161,24 @@ impl VTable for ALPVTable {
161161
let exponents = array.exponents();
162162

163163
match_each_alp_float_ptype!(array.dtype().as_ptype(), |T| {
164-
decompress_into_vector::<T>(encoded_vector, exponents, patches_vectors, patches_offset)
164+
Ok(kernel(move || {
165+
let encoded_vector = encoded.execute()?;
166+
let patches_vectors = match patches_kernels {
167+
Some((idx_kernel, val_kernel, co_kernel)) => Some((
168+
idx_kernel.execute()?,
169+
val_kernel.execute()?,
170+
co_kernel.map(|k| k.execute()).transpose()?,
171+
)),
172+
None => None,
173+
};
174+
175+
decompress_into_vector::<T>(
176+
encoded_vector,
177+
exponents,
178+
patches_vectors,
179+
patches_offset,
180+
)
181+
}))
165182
})
166183
}
167184
}
@@ -456,15 +473,18 @@ mod tests {
456473
use std::sync::LazyLock;
457474

458475
use rstest::rstest;
476+
use vortex_array::VectorExecutor;
459477
use vortex_array::arrays::PrimitiveArray;
478+
use vortex_array::session::ArraySession;
460479
use vortex_array::vtable::ValidityHelper;
461480
use vortex_dtype::PTypeDowncast;
462481
use vortex_session::VortexSession;
463482
use vortex_vector::VectorOps;
464483

465484
use super::*;
466485

467-
static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::empty);
486+
static SESSION: LazyLock<VortexSession> =
487+
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
468488

469489
#[rstest]
470490
#[case(0)]
@@ -480,7 +500,7 @@ mod tests {
480500
let values = PrimitiveArray::from_iter((0..size).map(|i| i as f32));
481501
let encoded = alp_encode(&values, None).unwrap();
482502

483-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
503+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
484504
// Compare against the traditional array-based decompress path
485505
let expected = decompress_into_array(encoded);
486506

@@ -504,7 +524,7 @@ mod tests {
504524
let values = PrimitiveArray::from_iter((0..size).map(|i| i as f64));
505525
let encoded = alp_encode(&values, None).unwrap();
506526

507-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
527+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
508528
// Compare against the traditional array-based decompress path
509529
let expected = decompress_into_array(encoded);
510530

@@ -534,7 +554,7 @@ mod tests {
534554
let encoded = alp_encode(&array, None).unwrap();
535555
assert!(encoded.patches().unwrap().array_len() > 0);
536556

537-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
557+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
538558
// Compare against the traditional array-based decompress path
539559
let expected = decompress_into_array(encoded);
540560

@@ -562,7 +582,7 @@ mod tests {
562582
let array = PrimitiveArray::from_option_iter(values);
563583
let encoded = alp_encode(&array, None).unwrap();
564584

565-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
585+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
566586
// Compare against the traditional array-based decompress path
567587
let expected = decompress_into_array(encoded);
568588

@@ -601,7 +621,7 @@ mod tests {
601621
let encoded = alp_encode(&array, None).unwrap();
602622
assert!(encoded.patches().unwrap().array_len() > 0);
603623

604-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
624+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
605625
// Compare against the traditional array-based decompress path
606626
let expected = decompress_into_array(encoded);
607627

@@ -643,7 +663,7 @@ mod tests {
643663
let slice_len = slice_end - slice_start;
644664
let sliced_encoded = encoded.slice(slice_start..slice_end);
645665

646-
let result_vector = sliced_encoded.execute(&SESSION).unwrap();
666+
let result_vector = sliced_encoded.execute_vector_optimized(&SESSION).unwrap();
647667
let result_primitive = result_vector.into_primitive().into_f64();
648668

649669
for idx in 0..slice_len {

encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ mod tests {
204204
use std::sync::LazyLock;
205205

206206
use vortex_array::IntoArray;
207+
use vortex_array::VectorExecutor;
207208
use vortex_array::assert_arrays_eq;
208209
use vortex_array::validity::Validity;
209210
use vortex_buffer::Buffer;
@@ -536,7 +537,7 @@ mod tests {
536537
let unpacked_array = unpack_array(&bitpacked);
537538

538539
// Method 3: Using the execute() method (this is what would be used in production).
539-
let executed = bitpacked.into_array().execute(&SESSION).unwrap();
540+
let executed = bitpacked.into_array().execute_vector(&SESSION).unwrap();
540541

541542
// All three should produce the same length.
542543
assert_eq!(vector_result.len(), array.len(), "vector length mismatch");
@@ -556,7 +557,10 @@ mod tests {
556557

557558
// Verify that the execute() method works correctly by comparing with unpack_array.
558559
// We convert unpack_array result to a vector using execute() to compare.
559-
let unpacked_executed = unpacked_array.into_array().execute(&SESSION).unwrap();
560+
let unpacked_executed = unpacked_array
561+
.into_array()
562+
.execute_vector(&SESSION)
563+
.unwrap();
560564
match (&executed, &unpacked_executed) {
561565
(Vector::Primitive(exec_pv), Vector::Primitive(unpack_pv)) => {
562566
assert_eq!(
@@ -593,7 +597,7 @@ mod tests {
593597
let sliced_bp = sliced.as_::<BitPackedVTable>();
594598
let vector_result = unpack_to_primitive_vector(sliced_bp);
595599
let unpacked_array = unpack_array(sliced_bp);
596-
let executed = sliced.execute(&SESSION).unwrap();
600+
let executed = sliced.execute_vector(&SESSION).unwrap();
597601

598602
assert_eq!(
599603
vector_result.len(),

encodings/fastlanes/src/bitpacking/vtable/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
use vortex_array::DeserializeMetadata;
55
use vortex_array::ProstMetadata;
66
use vortex_array::SerializeMetadata;
7-
use vortex_array::execution::ExecutionCtx;
7+
use vortex_array::kernel::BindCtx;
8+
use vortex_array::kernel::KernelRef;
9+
use vortex_array::kernel::kernel;
810
use vortex_array::patches::Patches;
911
use vortex_array::patches::PatchesMetadata;
1012
use vortex_array::serde::ArrayChildren;
@@ -23,7 +25,6 @@ use vortex_error::VortexError;
2325
use vortex_error::VortexResult;
2426
use vortex_error::vortex_bail;
2527
use vortex_error::vortex_err;
26-
use vortex_vector::Vector;
2728
use vortex_vector::VectorMutOps;
2829

2930
use crate::BitPackedArray;
@@ -172,8 +173,11 @@ impl VTable for BitPackedVTable {
172173
)
173174
}
174175

175-
fn batch_execute(array: &BitPackedArray, _ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
176-
Ok(unpack_to_primitive_vector(array).freeze().into())
176+
fn bind_kernel(array: &BitPackedArray, _ctx: &mut BindCtx) -> VortexResult<KernelRef> {
177+
let array = array.clone();
178+
Ok(kernel(move || {
179+
Ok(unpack_to_primitive_vector(&array).freeze().into())
180+
}))
177181
}
178182
}
179183

encodings/runend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ vortex-dtype = { workspace = true }
2424
vortex-error = { workspace = true }
2525
vortex-mask = { workspace = true }
2626
vortex-scalar = { workspace = true }
27+
vortex-session = { workspace = true }
2728

2829
[lints]
2930
workspace = true

encodings/runend/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod compress;
1111
mod compute;
1212
mod iter;
1313
mod ops;
14+
mod rules;
1415

1516
#[doc(hidden)]
1617
pub mod _benchmarking {
@@ -23,11 +24,17 @@ pub mod _benchmarking {
2324
use vortex_array::ArrayBufferVisitor;
2425
use vortex_array::ArrayChildVisitor;
2526
use vortex_array::Canonical;
27+
use vortex_array::session::ArraySession;
28+
use vortex_array::session::ArraySessionExt;
29+
use vortex_array::vtable::ArrayVTableExt;
2630
use vortex_array::vtable::EncodeVTable;
2731
use vortex_array::vtable::VisitorVTable;
2832
use vortex_error::VortexResult;
33+
use vortex_session::SessionExt;
34+
use vortex_session::VortexSession;
2935

3036
use crate::compress::runend_encode;
37+
use crate::rules::RunEndScalarFnRule;
3138

3239
impl EncodeVTable<RunEndVTable> for RunEndVTable {
3340
fn encode(
@@ -59,6 +66,15 @@ impl VisitorVTable<RunEndVTable> for RunEndVTable {
5966
}
6067
}
6168

69+
/// Initialize run-end encoding in the given session.
70+
pub fn initialize(session: &mut VortexSession) {
71+
session.arrays().register(RunEndVTable.as_vtable());
72+
session
73+
.get_mut::<ArraySession>()
74+
.optimizer_mut()
75+
.register_parent_rule(RunEndScalarFnRule);
76+
}
77+
6278
#[cfg(test)]
6379
mod tests {
6480
use vortex_array::ProstMetadata;

encodings/runend/src/rules.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_array::ArrayRef;
5+
use vortex_array::IntoArray;
6+
use vortex_array::arrays::AnyScalarFn;
7+
use vortex_array::arrays::ConstantArray;
8+
use vortex_array::arrays::ConstantVTable;
9+
use vortex_array::arrays::ScalarFnArray;
10+
use vortex_array::optimizer::rules::ArrayParentReduceRule;
11+
use vortex_array::optimizer::rules::Exact;
12+
use vortex_dtype::DType;
13+
use vortex_error::VortexResult;
14+
15+
use crate::RunEndArray;
16+
use crate::RunEndVTable;
17+
18+
/// A rule to push down scalar functions through run-end encoding into the values array.
19+
///
20+
/// This only works if all other children of the scalar function array are constants.
21+
#[derive(Debug)]
22+
pub(crate) struct RunEndScalarFnRule;
23+
24+
impl ArrayParentReduceRule<Exact<RunEndVTable>, AnyScalarFn> for RunEndScalarFnRule {
25+
fn child(&self) -> Exact<RunEndVTable> {
26+
Exact::from(&RunEndVTable)
27+
}
28+
29+
fn parent(&self) -> AnyScalarFn {
30+
AnyScalarFn
31+
}
32+
33+
fn reduce_parent(
34+
&self,
35+
run_end: &RunEndArray,
36+
parent: &ScalarFnArray,
37+
child_idx: usize,
38+
) -> VortexResult<Option<ArrayRef>> {
39+
for (idx, child) in parent.children().iter().enumerate() {
40+
if idx == child_idx {
41+
// Skip ourselves
42+
continue;
43+
}
44+
45+
if !child.is::<ConstantVTable>() {
46+
// We can only push down if all other children are constants
47+
return Ok(None);
48+
}
49+
}
50+
51+
// TODO(ngates): relax this constraint and implement run-end decoding for all vector types.
52+
if !matches!(parent.dtype(), DType::Bool(_) | DType::Primitive(..)) {
53+
return Ok(None);
54+
}
55+
56+
let values_len = run_end.values().len();
57+
let mut new_children = parent.children();
58+
for (idx, child) in new_children.iter_mut().enumerate() {
59+
if idx == child_idx {
60+
// Replace ourselves with run end values
61+
*child = run_end.values().clone();
62+
continue;
63+
}
64+
65+
// Replace other children with their constant scalar value with length adjusted
66+
// to the length of the run end values.
67+
let constant = child.as_::<ConstantVTable>();
68+
*child = ConstantArray::new(constant.scalar().clone(), values_len).into_array();
69+
}
70+
71+
let new_values =
72+
ScalarFnArray::try_new(parent.scalar_fn().clone(), new_children, values_len)?
73+
.into_array();
74+
75+
Ok(Some(
76+
RunEndArray::try_new_offset_length(
77+
run_end.ends().clone(),
78+
new_values,
79+
run_end.offset(),
80+
run_end.len(),
81+
)?
82+
.into_array(),
83+
))
84+
}
85+
}

0 commit comments

Comments
 (0)