Skip to content

Commit bd25e4a

Browse files
authored
Move Array BatchKernel to sync API (#5090)
* It's a little aggressive to require an async runtime simply to decompress in-memory data. * Async expressions tend to require more powerful APIs anyway. For example, returning partial results since the output can be much larger than the input (not common for in-memory scalar compute). As such, I propose implementing async expressions as layout readers (with an improved API), see #5089 Signed-off-by: Nicholas Gates <[email protected]>
1 parent 1ff173a commit bd25e4a

File tree

7 files changed

+100
-107
lines changed

7 files changed

+100
-107
lines changed

vortex-array/src/array/operator.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,62 +3,58 @@
33

44
use std::sync::Arc;
55

6-
use async_trait::async_trait;
76
use vortex_error::VortexResult;
87
use vortex_vector::Vector;
98

10-
use crate::execution::{BatchKernel, BindCtx};
9+
use crate::execution::{BatchKernelRef, BindCtx};
1110
use crate::vtable::{OperatorVTable, VTable};
1211
use crate::{Array, ArrayAdapter, ArrayRef};
1312

1413
/// Array functions as provided by the `OperatorVTable`.
1514
///
1615
/// Note: the public functions such as "execute" should move onto the main `Array` trait when
1716
/// operators is stabilized. The other functions should remain on a `pub(crate)` trait.
18-
#[async_trait]
1917
pub trait ArrayOperator: 'static + Send + Sync {
2018
/// Execute the array producing a canonical vector.
21-
async fn execute(&self) -> VortexResult<Vector> {
22-
self.execute_with_selection(None).await
19+
fn execute(&self) -> VortexResult<Vector> {
20+
self.execute_with_selection(None)
2321
}
2422

2523
/// Execute the array with a selection mask, producing a canonical vector.
26-
async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector>;
24+
fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector>;
2725

2826
/// Bind the array to a batch kernel. This is an internal function
2927
fn bind(
3028
&self,
3129
selection: Option<&ArrayRef>,
3230
ctx: &mut dyn BindCtx,
33-
) -> VortexResult<BatchKernel>;
31+
) -> VortexResult<BatchKernelRef>;
3432
}
3533

36-
#[async_trait]
3734
impl ArrayOperator for Arc<dyn Array> {
38-
async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
39-
self.as_ref().execute_with_selection(selection).await
35+
fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
36+
self.as_ref().execute_with_selection(selection)
4037
}
4138

4239
fn bind(
4340
&self,
4441
selection: Option<&ArrayRef>,
4542
ctx: &mut dyn BindCtx,
46-
) -> VortexResult<BatchKernel> {
43+
) -> VortexResult<BatchKernelRef> {
4744
self.as_ref().bind(selection, ctx)
4845
}
4946
}
5047

51-
#[async_trait]
5248
impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
53-
async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
54-
self.bind(selection, &mut ())?.await
49+
fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
50+
self.bind(selection, &mut ())?.execute()
5551
}
5652

5753
fn bind(
5854
&self,
5955
selection: Option<&ArrayRef>,
6056
ctx: &mut dyn BindCtx,
61-
) -> VortexResult<BatchKernel> {
57+
) -> VortexResult<BatchKernelRef> {
6258
<V::OperatorVTable as OperatorVTable<V>>::bind(&self.0, selection, ctx)
6359
}
6460
}
@@ -69,7 +65,7 @@ impl BindCtx for () {
6965
&mut self,
7066
array: &ArrayRef,
7167
selection: Option<&ArrayRef>,
72-
) -> VortexResult<BatchKernel> {
68+
) -> VortexResult<BatchKernelRef> {
7369
array.bind(selection, self)
7470
}
7571
}
Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,33 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use futures::{FutureExt, try_join};
54
use vortex_compute::filter::Filter;
65
use vortex_error::VortexResult;
76
use vortex_vector::BoolVector;
87

98
use crate::ArrayRef;
109
use crate::arrays::{BoolArray, BoolVTable};
11-
use crate::execution::{BatchKernel, BindCtx};
10+
use crate::execution::{BatchKernelRef, BindCtx, kernel};
1211
use crate::vtable::{OperatorVTable, ValidityHelper};
1312

1413
impl OperatorVTable<BoolVTable> for BoolVTable {
1514
fn bind(
1615
array: &BoolArray,
1716
selection: Option<&ArrayRef>,
1817
ctx: &mut dyn BindCtx,
19-
) -> VortexResult<BatchKernel> {
18+
) -> VortexResult<BatchKernelRef> {
2019
let bits = array.buffer.clone();
2120
let mask = ctx.bind_selection(array.len(), selection)?;
2221
let validity = ctx.bind_validity(array.validity(), array.len(), selection)?;
2322

24-
Ok(async move {
25-
let (mask, validity) = try_join!(mask, validity)?;
23+
Ok(kernel(move || {
24+
let mask = mask.execute()?;
25+
let validity = validity.execute()?;
2626

2727
// Note that validity already has the mask applied so we only need to apply it to bits.
2828
let bits = bits.filter(&mask);
2929

3030
Ok(BoolVector::new(bits, validity).into())
31-
}
32-
.boxed())
31+
}))
3332
}
3433
}

vortex-array/src/compute/arrays/logical.rs

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::hash::{Hash, Hasher};
55
use std::sync::LazyLock;
66

77
use enum_map::{Enum, EnumMap, enum_map};
8-
use futures::{FutureExt, try_join};
98
use vortex_buffer::ByteBuffer;
109
use vortex_compute::logical::{
1110
LogicalAnd, LogicalAndKleene, LogicalAndNot, LogicalOr, LogicalOrKleene,
@@ -14,7 +13,7 @@ use vortex_dtype::DType;
1413
use vortex_error::VortexResult;
1514
use vortex_vector::BoolVector;
1615

17-
use crate::execution::{BatchKernel, BindCtx};
16+
use crate::execution::{BatchKernelRef, BindCtx, kernel};
1817
use crate::serde::ArrayChildren;
1918
use crate::stats::{ArrayStats, StatsSetRef};
2019
use crate::vtable::{
@@ -183,37 +182,35 @@ impl OperatorVTable<LogicalVTable> for LogicalVTable {
183182
array: &LogicalArray,
184183
selection: Option<&ArrayRef>,
185184
ctx: &mut dyn BindCtx,
186-
) -> VortexResult<BatchKernel> {
185+
) -> VortexResult<BatchKernelRef> {
187186
let lhs = ctx.bind(&array.lhs, selection)?;
188187
let rhs = ctx.bind(&array.rhs, selection)?;
189188

190189
Ok(match array.operator() {
191-
LogicalOperator::And => kernel(lhs, rhs, |l, r| l.and(&r)),
192-
LogicalOperator::AndKleene => kernel(lhs, rhs, |l, r| l.and_kleene(&r)),
193-
LogicalOperator::Or => kernel(lhs, rhs, |l, r| l.or(&r)),
194-
LogicalOperator::OrKleene => kernel(lhs, rhs, |l, r| l.or_kleene(&r)),
195-
LogicalOperator::AndNot => kernel(lhs, rhs, |l, r| l.and_not(&r)),
190+
LogicalOperator::And => logical_kernel(lhs, rhs, |l, r| l.and(&r)),
191+
LogicalOperator::AndKleene => logical_kernel(lhs, rhs, |l, r| l.and_kleene(&r)),
192+
LogicalOperator::Or => logical_kernel(lhs, rhs, |l, r| l.or(&r)),
193+
LogicalOperator::OrKleene => logical_kernel(lhs, rhs, |l, r| l.or_kleene(&r)),
194+
LogicalOperator::AndNot => logical_kernel(lhs, rhs, |l, r| l.and_not(&r)),
196195
})
197196
}
198197
}
199198

200199
/// Batch execution kernel for logical operations.
201-
fn kernel<O>(lhs: BatchKernel, rhs: BatchKernel, op: O) -> BatchKernel
200+
fn logical_kernel<O>(lhs: BatchKernelRef, rhs: BatchKernelRef, op: O) -> BatchKernelRef
202201
where
203202
O: Fn(BoolVector, BoolVector) -> BoolVector + Send + 'static,
204203
{
205-
async move {
206-
let (lhs, rhs) = try_join!(lhs, rhs)?;
207-
let (lhs, rhs) = (lhs.into_bool(), rhs.into_bool());
204+
kernel(move || {
205+
let lhs = lhs.execute()?.into_bool();
206+
let rhs = rhs.execute()?.into_bool();
208207
Ok(op(lhs, rhs).into())
209-
}
210-
.boxed()
208+
})
211209
}
212210

213211
#[cfg(test)]
214212
mod tests {
215213
use vortex_buffer::bitbuffer;
216-
use vortex_io::runtime::single::block_on;
217214

218215
use crate::compute::arrays::logical::{LogicalArray, LogicalOperator};
219216
use crate::{ArrayOperator, ArrayRef, IntoArray};
@@ -224,28 +221,23 @@ mod tests {
224221

225222
#[test]
226223
fn test_and() {
227-
block_on(|_| async {
228-
let lhs = bitbuffer![0 1 0].into_array();
229-
let rhs = bitbuffer![0 1 1].into_array();
230-
let result = and_(lhs, rhs).execute().await.unwrap().into_bool();
231-
assert_eq!(result.bits(), &bitbuffer![0 1 0]);
232-
})
224+
let lhs = bitbuffer![0 1 0].into_array();
225+
let rhs = bitbuffer![0 1 1].into_array();
226+
let result = and_(lhs, rhs).execute().unwrap().into_bool();
227+
assert_eq!(result.bits(), &bitbuffer![0 1 0]);
233228
}
234229

235230
#[test]
236231
fn test_and_selected() {
237-
block_on(|_| async {
238-
let lhs = bitbuffer![0 1 0].into_array();
239-
let rhs = bitbuffer![0 1 1].into_array();
240-
241-
let selection = bitbuffer![0 1 1].into_array();
242-
243-
let result = and_(lhs, rhs)
244-
.execute_with_selection(Some(&selection))
245-
.await
246-
.unwrap()
247-
.into_bool();
248-
assert_eq!(result.bits(), &bitbuffer![1 0]);
249-
})
232+
let lhs = bitbuffer![0 1 0].into_array();
233+
let rhs = bitbuffer![0 1 1].into_array();
234+
235+
let selection = bitbuffer![0 1 1].into_array();
236+
237+
let result = and_(lhs, rhs)
238+
.execute_with_selection(Some(&selection))
239+
.unwrap()
240+
.into_bool();
241+
assert_eq!(result.bits(), &bitbuffer![1 0]);
250242
}
251243
}
Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,31 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use futures::future::BoxFuture;
54
use vortex_error::VortexResult;
65
use vortex_vector::Vector;
76

87
use crate::ArrayRef;
98

109
/// Type-alias for heap-allocated batch execution kernels.
11-
pub type BatchKernel = BoxFuture<'static, VortexResult<Vector>>;
10+
pub type BatchKernelRef = Box<dyn BatchKernel>;
11+
12+
/// Trait for batch execution kernels that produce a vector result.
13+
pub trait BatchKernel: 'static + Send {
14+
fn execute(self: Box<Self>) -> VortexResult<Vector>;
15+
}
16+
17+
/// Adapter to create a batch kernel from a closure.
18+
pub struct BatchKernelAdapter<F>(F);
19+
impl<F: FnOnce() -> VortexResult<Vector> + Send + 'static> BatchKernel for BatchKernelAdapter<F> {
20+
fn execute(self: Box<Self>) -> VortexResult<Vector> {
21+
self.0()
22+
}
23+
}
24+
25+
/// Create a batch execution kernel from the given closure.
26+
pub fn kernel<F: FnOnce() -> VortexResult<Vector> + Send + 'static>(f: F) -> BatchKernelRef {
27+
Box::new(BatchKernelAdapter(f))
28+
}
1229

1330
/// Context for binding batch execution kernels.
1431
///
@@ -17,6 +34,9 @@ pub type BatchKernel = BoxFuture<'static, VortexResult<Vector>>;
1734
pub trait BindCtx {
1835
/// Bind the given array and optional selection to produce a batch kernel, possibly reusing
1936
/// previously bound results from this context.
20-
fn bind(&mut self, array: &ArrayRef, selection: Option<&ArrayRef>)
21-
-> VortexResult<BatchKernel>;
37+
fn bind(
38+
&mut self,
39+
array: &ArrayRef,
40+
selection: Option<&ArrayRef>,
41+
) -> VortexResult<BatchKernelRef>;
2242
}

vortex-array/src/execution/mask.rs

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use std::pin::Pin;
5-
use std::task::{Context, Poll};
6-
7-
use futures::FutureExt;
8-
use futures::future::BoxFuture;
94
use vortex_dtype::DType;
105
use vortex_dtype::Nullability::NonNullable;
116
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
@@ -17,23 +12,19 @@ use crate::execution::BindCtx;
1712
pub enum MaskExecution {
1813
AllTrue(usize),
1914
AllFalse(usize),
20-
Future(BoxFuture<'static, VortexResult<Mask>>),
15+
Lazy(Box<dyn FnOnce() -> VortexResult<Mask> + Send + 'static>),
2116
}
2217

23-
impl Future for MaskExecution {
24-
type Output = VortexResult<Mask>;
18+
impl MaskExecution {
19+
pub fn lazy<F: FnOnce() -> VortexResult<Mask> + Send + 'static>(f: F) -> MaskExecution {
20+
MaskExecution::Lazy(Box::new(f))
21+
}
2522

26-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27-
match self.get_mut() {
28-
MaskExecution::AllTrue(len) => {
29-
let mask = Mask::new_true(*len);
30-
Poll::Ready(Ok(mask))
31-
}
32-
MaskExecution::AllFalse(len) => {
33-
let mask = Mask::new_false(*len);
34-
Poll::Ready(Ok(mask))
35-
}
36-
MaskExecution::Future(fut) => fut.poll_unpin(cx),
23+
pub fn execute(self) -> VortexResult<Mask> {
24+
match self {
25+
MaskExecution::AllTrue(len) => Ok(Mask::new_true(len)),
26+
MaskExecution::AllFalse(len) => Ok(Mask::new_false(len)),
27+
MaskExecution::Lazy(f) => f(),
3728
}
3829
}
3930
}
@@ -87,12 +78,9 @@ impl dyn BindCtx + '_ {
8778

8879
// If none of the above patterns match, we fall back to canonicalizing.
8980
let execution = self.bind(mask, None)?;
90-
Ok(MaskExecution::Future(
91-
async move {
92-
let mask = execution.await?.into_bool();
93-
Ok(Mask::from(mask.bits().clone()))
94-
}
95-
.boxed(),
96-
))
81+
Ok(MaskExecution::lazy(move || {
82+
let mask = execution.execute()?.into_bool();
83+
Ok(Mask::from(mask.bits().clone()))
84+
}))
9785
}
9886
}

vortex-array/src/execution/validity.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use futures::future::FutureExt;
54
use vortex_compute::filter::Filter;
65
use vortex_error::VortexResult;
76
use vortex_mask::Mask;
@@ -27,23 +26,22 @@ impl dyn BindCtx + '_ {
2726
Some(selection) => {
2827
let selection = self.bind_mask(selection)?;
2928
match validity {
30-
Validity::NonNullable | Validity::AllValid => Ok(MaskExecution::Future(
31-
async move { Ok(Mask::AllTrue(selection.await?.true_count())) }.boxed(),
32-
)),
33-
Validity::AllInvalid => Ok(MaskExecution::Future(
34-
async move { Ok(Mask::AllFalse(selection.await?.true_count())) }.boxed(),
35-
)),
29+
Validity::NonNullable | Validity::AllValid => {
30+
Ok(MaskExecution::lazy(move || {
31+
Ok(Mask::AllTrue(selection.execute()?.true_count()))
32+
}))
33+
}
34+
Validity::AllInvalid => Ok(MaskExecution::lazy(move || {
35+
Ok(Mask::AllFalse(selection.execute()?.true_count()))
36+
})),
3637
Validity::Array(validity) => {
3738
let validity = self.bind_mask(validity)?;
38-
Ok(MaskExecution::Future(
39-
async move {
40-
let validity = validity.await?;
41-
let selection = selection.await?;
42-
// We perform a take on the validity mask using the selection mask.
43-
Ok(validity.filter(&selection))
44-
}
45-
.boxed(),
46-
))
39+
Ok(MaskExecution::lazy(move || {
40+
let validity = validity.execute()?;
41+
let selection = selection.execute()?;
42+
// We perform a take on the validity mask using the selection mask.
43+
Ok(validity.filter(&selection))
44+
}))
4745
}
4846
}
4947
}

0 commit comments

Comments
 (0)