Skip to content

Commit f185fdb

Browse files
committed
Arrow executor
Signed-off-by: Nicholas Gates <[email protected]>
1 parent be51fc0 commit f185fdb

File tree

21 files changed

+286
-127
lines changed

21 files changed

+286
-127
lines changed

encodings/alp/src/alp/array.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ mod tests {
473473
use std::sync::LazyLock;
474474

475475
use rstest::rstest;
476+
use vortex_array::VectorExecutor;
476477
use vortex_array::arrays::PrimitiveArray;
477478
use vortex_array::vtable::ValidityHelper;
478479
use vortex_dtype::PTypeDowncast;
@@ -497,7 +498,7 @@ mod tests {
497498
let values = PrimitiveArray::from_iter((0..size).map(|i| i as f32));
498499
let encoded = alp_encode(&values, None).unwrap();
499500

500-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
501+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
501502
// Compare against the traditional array-based decompress path
502503
let expected = decompress_into_array(encoded);
503504

@@ -521,7 +522,7 @@ mod tests {
521522
let values = PrimitiveArray::from_iter((0..size).map(|i| i as f64));
522523
let encoded = alp_encode(&values, None).unwrap();
523524

524-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
525+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
525526
// Compare against the traditional array-based decompress path
526527
let expected = decompress_into_array(encoded);
527528

@@ -551,7 +552,7 @@ mod tests {
551552
let encoded = alp_encode(&array, None).unwrap();
552553
assert!(encoded.patches().unwrap().array_len() > 0);
553554

554-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
555+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
555556
// Compare against the traditional array-based decompress path
556557
let expected = decompress_into_array(encoded);
557558

@@ -579,7 +580,7 @@ mod tests {
579580
let array = PrimitiveArray::from_option_iter(values);
580581
let encoded = alp_encode(&array, None).unwrap();
581582

582-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
583+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
583584
// Compare against the traditional array-based decompress path
584585
let expected = decompress_into_array(encoded);
585586

@@ -618,7 +619,7 @@ mod tests {
618619
let encoded = alp_encode(&array, None).unwrap();
619620
assert!(encoded.patches().unwrap().array_len() > 0);
620621

621-
let result_vector = encoded.to_array().execute(&SESSION).unwrap();
622+
let result_vector = encoded.to_array().execute_vector(&SESSION).unwrap();
622623
// Compare against the traditional array-based decompress path
623624
let expected = decompress_into_array(encoded);
624625

@@ -660,7 +661,7 @@ mod tests {
660661
let slice_len = slice_end - slice_start;
661662
let sliced_encoded = encoded.slice(slice_start..slice_end);
662663

663-
let result_vector = sliced_encoded.execute(&SESSION).unwrap();
664+
let result_vector = sliced_encoded.execute_vector_optimized(&SESSION).unwrap();
664665
let result_primitive = result_vector.into_primitive().into_f64();
665666

666667
for idx in 0..slice_len {

encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ mod tests {
204204
use std::sync::LazyLock;
205205

206206
use vortex_array::IntoArray;
207+
use vortex_array::VectorExecutor;
207208
use vortex_array::assert_arrays_eq;
208209
use vortex_array::validity::Validity;
209210
use vortex_buffer::Buffer;
@@ -536,7 +537,7 @@ mod tests {
536537
let unpacked_array = unpack_array(&bitpacked);
537538

538539
// Method 3: Using the execute() method (this is what would be used in production).
539-
let executed = bitpacked.into_array().execute(&SESSION).unwrap();
540+
let executed = bitpacked.into_array().execute_vector(&SESSION).unwrap();
540541

541542
// All three should produce the same length.
542543
assert_eq!(vector_result.len(), array.len(), "vector length mismatch");
@@ -556,7 +557,10 @@ mod tests {
556557

557558
// Verify that the execute() method works correctly by comparing with unpack_array.
558559
// We convert unpack_array result to a vector using execute() to compare.
559-
let unpacked_executed = unpacked_array.into_array().execute(&SESSION).unwrap();
560+
let unpacked_executed = unpacked_array
561+
.into_array()
562+
.execute_vector(&SESSION)
563+
.unwrap();
560564
match (&executed, &unpacked_executed) {
561565
(Vector::Primitive(exec_pv), Vector::Primitive(unpack_pv)) => {
562566
assert_eq!(
@@ -593,7 +597,7 @@ mod tests {
593597
let sliced_bp = sliced.as_::<BitPackedVTable>();
594598
let vector_result = unpack_to_primitive_vector(sliced_bp);
595599
let unpacked_array = unpack_array(sliced_bp);
596-
let executed = sliced.execute(&SESSION).unwrap();
600+
let executed = sliced.execute_vector(&SESSION).unwrap();
597601

598602
assert_eq!(
599603
vector_result.len(),

vortex-array/src/array/mod.rs

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@ use vortex_error::vortex_ensure;
2222
use vortex_error::vortex_panic;
2323
use vortex_mask::Mask;
2424
use vortex_scalar::Scalar;
25-
use vortex_session::VortexSession;
26-
use vortex_vector::Vector;
27-
use vortex_vector::VectorOps;
2825

2926
use crate::ArrayEq;
3027
use crate::ArrayHash;
@@ -372,35 +369,6 @@ impl dyn Array + '_ {
372369
}
373370
nbytes
374371
}
375-
376-
/// Execute the array and return the resulting vector.
377-
///
378-
/// This entry-point function will choose an appropriate CPU-based execution strategy.
379-
pub fn execute(&self, session: &VortexSession) -> VortexResult<Vector> {
380-
let mut ctx = BindCtx::new(session.clone());
381-
382-
// NOTE(ngates): in the future we can choose a different mode of execution, or run
383-
// optimization here, etc.
384-
let kernel = self.bind_kernel(&mut ctx)?;
385-
let result = kernel.execute()?;
386-
387-
vortex_ensure!(
388-
result.len() == self.len(),
389-
"Result length mismatch for {}",
390-
self.encoding_id()
391-
);
392-
393-
#[cfg(debug_assertions)]
394-
{
395-
vortex_ensure!(
396-
vortex_vector::vector_matches_dtype(&result, self.dtype()),
397-
"Executed vector dtype mismatch for {}",
398-
self.encoding_id(),
399-
);
400-
}
401-
402-
Ok(result)
403-
}
404372
}
405373

406374
/// Trait for converting a type into a Vortex [`ArrayRef`].

vortex-array/src/arrays/scalar_fn/vtable/canonical.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33

44
use itertools::Itertools;
55
use vortex_error::VortexExpect;
6-
use vortex_vector::Datum;
76

87
use crate::Array;
98
use crate::Canonical;
109
use crate::arrays::LEGACY_SESSION;
1110
use crate::arrays::scalar_fn::array::ScalarFnArray;
1211
use crate::arrays::scalar_fn::vtable::ScalarFnVTable;
12+
use crate::executor::VectorExecutor;
1313
use crate::expr::ExecutionArgs;
1414
use crate::vectors::VectorIntoArray;
1515
use crate::vtable::CanonicalVTable;
@@ -20,8 +20,7 @@ impl CanonicalVTable<ScalarFnVTable> for ScalarFnVTable {
2020
let child_datums: Vec<_> = array
2121
.children()
2222
.iter()
23-
// TODO(ngates): we could make all execution operate over datums
24-
.map(|child| child.execute(&LEGACY_SESSION).map(Datum::Vector))
23+
.map(|child| child.execute_datum_optimized(&LEGACY_SESSION))
2524
.try_collect()
2625
// FIXME(ngates): canonicalizing really ought to be fallible
2726
.vortex_expect(

vortex-array/src/arrays/scalar_fn/vtable/validity.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33

44
use vortex_error::VortexExpect;
55
use vortex_mask::Mask;
6+
use vortex_vector::Datum;
7+
use vortex_vector::ScalarOps;
8+
use vortex_vector::VectorOps;
69

710
use crate::Array;
811
use crate::arrays::LEGACY_SESSION;
912
use crate::arrays::scalar_fn::array::ScalarFnArray;
1013
use crate::arrays::scalar_fn::vtable::ScalarFnVTable;
14+
use crate::executor::VectorExecutor;
1115
use crate::vtable::ValidityVTable;
1216

1317
impl ValidityVTable<ScalarFnVTable> for ScalarFnVTable {
@@ -46,9 +50,13 @@ impl ValidityVTable<ScalarFnVTable> for ScalarFnVTable {
4650
}
4751

4852
fn validity_mask(array: &ScalarFnArray) -> Mask {
49-
let vector = array
50-
.execute(&LEGACY_SESSION)
53+
let datum = array
54+
.to_array()
55+
.execute_datum(&LEGACY_SESSION)
5156
.vortex_expect("Validity mask computation should be fallible");
52-
Mask::from_buffer(vector.into_bool().into_bits())
57+
match datum {
58+
Datum::Scalar(s) => Mask::new(array.len, s.is_valid()),
59+
Datum::Vector(v) => v.validity().clone(),
60+
}
5361
}
5462
}

vortex-array/src/executor.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::VortexResult;
5+
use vortex_error::vortex_ensure;
6+
use vortex_session::VortexSession;
7+
use vortex_vector::Datum;
8+
use vortex_vector::Vector;
9+
use vortex_vector::VectorOps;
10+
11+
use crate::Array;
12+
use crate::ArrayRef;
13+
use crate::arrays::ConstantVTable;
14+
use crate::kernel::BindCtx;
15+
use crate::session::ArraySessionExt;
16+
17+
/// Executor for exporting a Vortex [`Vector`] or [`Datum`] from an [`ArrayRef`].
18+
pub trait VectorExecutor {
19+
/// Execute the array and return the resulting datum after running the optimizer.
20+
fn execute_datum_optimized(&self, session: &VortexSession) -> VortexResult<Datum>;
21+
/// Execute the array and return the resulting datum.
22+
fn execute_datum(&self, session: &VortexSession) -> VortexResult<Datum>;
23+
/// Execute the array and return the resulting vector after running the optimizer.
24+
fn execute_vector_optimized(&self, session: &VortexSession) -> VortexResult<Vector>;
25+
/// Execute the array and return the resulting vector.
26+
fn execute_vector(&self, session: &VortexSession) -> VortexResult<Vector>;
27+
}
28+
29+
impl VectorExecutor for ArrayRef {
30+
fn execute_datum_optimized(&self, session: &VortexSession) -> VortexResult<Datum> {
31+
session
32+
.arrays()
33+
.optimizer()
34+
.optimize_array(self)?
35+
.execute_datum(session)
36+
}
37+
38+
fn execute_datum(&self, session: &VortexSession) -> VortexResult<Datum> {
39+
// Attempt to short-circuit constant arrays.
40+
if let Some(constant) = self.as_opt::<ConstantVTable>() {
41+
return Ok(Datum::Scalar(constant.scalar().to_vector_scalar()));
42+
}
43+
44+
let mut ctx = BindCtx::new(session.clone());
45+
46+
// NOTE(ngates): in the future we can choose a different mode of execution, or run
47+
// optimization here, etc.
48+
let kernel = self.bind_kernel(&mut ctx)?;
49+
let result = kernel.execute()?;
50+
51+
vortex_ensure!(
52+
result.len() == self.len(),
53+
"Result length mismatch for {}",
54+
self.encoding_id()
55+
);
56+
57+
#[cfg(debug_assertions)]
58+
{
59+
vortex_ensure!(
60+
vortex_vector::vector_matches_dtype(&result, self.dtype()),
61+
"Executed vector dtype mismatch for {}",
62+
self.encoding_id(),
63+
);
64+
}
65+
66+
Ok(Datum::Vector(result))
67+
}
68+
69+
fn execute_vector_optimized(&self, session: &VortexSession) -> VortexResult<Vector> {
70+
session
71+
.arrays()
72+
.optimizer()
73+
.optimize_array(self)?
74+
.execute_vector(session)
75+
}
76+
77+
fn execute_vector(&self, session: &VortexSession) -> VortexResult<Vector> {
78+
let len = self.len();
79+
Ok(self.execute_datum(session)?.ensure_vector(len))
80+
}
81+
}

vortex-array/src/expr/expression.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use vortex_dtype::DType;
1414
use vortex_error::VortexExpect;
1515
use vortex_error::VortexResult;
1616
use vortex_error::vortex_ensure;
17-
use vortex_error::vortex_panic;
1817

1918
use crate::ArrayRef;
2019
use crate::expr::Root;
@@ -124,12 +123,11 @@ impl Expression {
124123
}
125124

126125
/// Evaluates the expression in the given scope, returning an array.
127-
pub fn evaluate(&self, _scope: &ArrayRef) -> VortexResult<ArrayRef> {
128-
vortex_panic!("DEPRECATED");
129-
// if self.is::<Root>() {
130-
// return Ok(scope.clone());
131-
// }
132-
// self.scalar_fn.evaluate(self, scope)
126+
pub fn evaluate(&self, scope: &ArrayRef) -> VortexResult<ArrayRef> {
127+
if self.is::<Root>() {
128+
return Ok(scope.clone());
129+
}
130+
self.scalar_fn.evaluate(self, scope)
133131
}
134132

135133
/// An expression over zone-statistics which implies all records in the zone evaluate to false.

vortex-array/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
pub use array::*;
1616
pub use canonical::*;
1717
pub use context::*;
18+
pub use executor::*;
1819
pub use hash::*;
1920
pub use mask_future::*;
2021
pub use metadata::*;
@@ -30,12 +31,13 @@ mod canonical;
3031
pub mod compute;
3132
mod context;
3233
pub mod display;
34+
mod executor;
3335
pub mod expr;
3436
mod expression;
3537
mod hash;
3638
pub mod iter;
3739
pub mod kernel;
38-
mod mask;
40+
pub mod mask;
3941
mod mask_future;
4042
mod metadata;
4143
pub mod optimizer;

0 commit comments

Comments
 (0)