Skip to content

Commit 2fc9ad1

Browse files
committed
Merge remote-tracking branch 'origin/develop' into ji/assert-array-eq-more-places
2 parents 154ada0 + 60ce4b5 commit 2fc9ad1

File tree

22 files changed

+909
-299
lines changed

22 files changed

+909
-299
lines changed

vortex-array/src/array/operator.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,62 +3,58 @@
33

44
use std::sync::Arc;
55

6-
use async_trait::async_trait;
76
use vortex_error::VortexResult;
87
use vortex_vector::Vector;
98

10-
use crate::execution::{BatchKernel, BindCtx};
9+
use crate::execution::{BatchKernelRef, BindCtx};
1110
use crate::vtable::{OperatorVTable, VTable};
1211
use crate::{Array, ArrayAdapter, ArrayRef};
1312

1413
/// Array functions as provided by the `OperatorVTable`.
1514
///
1615
/// Note: the public functions such as "execute" should move onto the main `Array` trait when
1716
/// operators is stabilized. The other functions should remain on a `pub(crate)` trait.
18-
#[async_trait]
1917
pub trait ArrayOperator: 'static + Send + Sync {
2018
/// Execute the array producing a canonical vector.
21-
async fn execute(&self) -> VortexResult<Vector> {
22-
self.execute_with_selection(None).await
19+
fn execute(&self) -> VortexResult<Vector> {
20+
self.execute_with_selection(None)
2321
}
2422

2523
/// Execute the array with a selection mask, producing a canonical vector.
26-
async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector>;
24+
fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector>;
2725

2826
/// Bind the array to a batch kernel. This is an internal function
2927
fn bind(
3028
&self,
3129
selection: Option<&ArrayRef>,
3230
ctx: &mut dyn BindCtx,
33-
) -> VortexResult<BatchKernel>;
31+
) -> VortexResult<BatchKernelRef>;
3432
}
3533

36-
#[async_trait]
3734
impl ArrayOperator for Arc<dyn Array> {
38-
async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
39-
self.as_ref().execute_with_selection(selection).await
35+
fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
36+
self.as_ref().execute_with_selection(selection)
4037
}
4138

4239
fn bind(
4340
&self,
4441
selection: Option<&ArrayRef>,
4542
ctx: &mut dyn BindCtx,
46-
) -> VortexResult<BatchKernel> {
43+
) -> VortexResult<BatchKernelRef> {
4744
self.as_ref().bind(selection, ctx)
4845
}
4946
}
5047

51-
#[async_trait]
5248
impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
53-
async fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
54-
self.bind(selection, &mut ())?.await
49+
fn execute_with_selection(&self, selection: Option<&ArrayRef>) -> VortexResult<Vector> {
50+
self.bind(selection, &mut ())?.execute()
5551
}
5652

5753
fn bind(
5854
&self,
5955
selection: Option<&ArrayRef>,
6056
ctx: &mut dyn BindCtx,
61-
) -> VortexResult<BatchKernel> {
57+
) -> VortexResult<BatchKernelRef> {
6258
<V::OperatorVTable as OperatorVTable<V>>::bind(&self.0, selection, ctx)
6359
}
6460
}
@@ -69,7 +65,7 @@ impl BindCtx for () {
6965
&mut self,
7066
array: &ArrayRef,
7167
selection: Option<&ArrayRef>,
72-
) -> VortexResult<BatchKernel> {
68+
) -> VortexResult<BatchKernelRef> {
7369
array.bind(selection, self)
7470
}
7571
}
Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,33 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use futures::{FutureExt, try_join};
54
use vortex_compute::filter::Filter;
65
use vortex_error::VortexResult;
76
use vortex_vector::BoolVector;
87

98
use crate::ArrayRef;
109
use crate::arrays::{BoolArray, BoolVTable};
11-
use crate::execution::{BatchKernel, BindCtx};
10+
use crate::execution::{BatchKernelRef, BindCtx, kernel};
1211
use crate::vtable::{OperatorVTable, ValidityHelper};
1312

1413
impl OperatorVTable<BoolVTable> for BoolVTable {
1514
fn bind(
1615
array: &BoolArray,
1716
selection: Option<&ArrayRef>,
1817
ctx: &mut dyn BindCtx,
19-
) -> VortexResult<BatchKernel> {
18+
) -> VortexResult<BatchKernelRef> {
2019
let bits = array.buffer.clone();
2120
let mask = ctx.bind_selection(array.len(), selection)?;
2221
let validity = ctx.bind_validity(array.validity(), array.len(), selection)?;
2322

24-
Ok(async move {
25-
let (mask, validity) = try_join!(mask, validity)?;
23+
Ok(kernel(move || {
24+
let mask = mask.execute()?;
25+
let validity = validity.execute()?;
2626

2727
// Note that validity already has the mask applied so we only need to apply it to bits.
2828
let bits = bits.filter(&mask);
2929

3030
Ok(BoolVector::new(bits, validity).into())
31-
}
32-
.boxed())
31+
}))
3332
}
3433
}

vortex-array/src/arrays/primitive/vtable/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{EncodingId, EncodingRef, vtable};
88
mod array;
99
mod canonical;
1010
mod operations;
11-
mod pipeline;
11+
mod operator;
1212
mod serde;
1313
mod validity;
1414
mod visitor;
@@ -26,8 +26,8 @@ impl VTable for PrimitiveVTable {
2626
type VisitorVTable = Self;
2727
type ComputeVTable = NotSupported;
2828
type EncodeVTable = NotSupported;
29-
type OperatorVTable = Self;
3029
type SerdeVTable = Self;
30+
type OperatorVTable = Self;
3131

3232
fn id(_encoding: &Self::Encoding) -> EncodingId {
3333
EncodingId::new_ref("vortex.primitive")
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_compute::filter::Filter;
5+
use vortex_dtype::match_each_native_ptype;
6+
use vortex_error::VortexResult;
7+
use vortex_vector::PVector;
8+
9+
use crate::ArrayRef;
10+
use crate::arrays::{PrimitiveArray, PrimitiveVTable};
11+
use crate::execution::{BatchKernelRef, BindCtx, kernel};
12+
use crate::vtable::{OperatorVTable, ValidityHelper};
13+
14+
impl OperatorVTable<PrimitiveVTable> for PrimitiveVTable {
15+
fn bind(
16+
array: &PrimitiveArray,
17+
selection: Option<&ArrayRef>,
18+
ctx: &mut dyn BindCtx,
19+
) -> VortexResult<BatchKernelRef> {
20+
let mask = ctx.bind_selection(array.len(), selection)?;
21+
let validity = ctx.bind_validity(array.validity(), array.len(), selection)?;
22+
23+
match_each_native_ptype!(array.ptype(), |T| {
24+
let elements = array.buffer::<T>();
25+
Ok(kernel(move || {
26+
let mask = mask.execute()?;
27+
let validity = validity.execute()?;
28+
29+
// Note that validity already has the mask applied so we only need to apply it to
30+
// the elements.
31+
let elements = elements.filter(&mask);
32+
33+
Ok(PVector::new(elements, validity).into())
34+
}))
35+
})
36+
}
37+
}

vortex-array/src/arrays/primitive/vtable/pipeline.rs

Lines changed: 0 additions & 85 deletions
This file was deleted.

vortex-array/src/compute/arrays/logical.rs

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::hash::{Hash, Hasher};
55
use std::sync::LazyLock;
66

77
use enum_map::{Enum, EnumMap, enum_map};
8-
use futures::{FutureExt, try_join};
98
use vortex_buffer::ByteBuffer;
109
use vortex_compute::logical::{
1110
LogicalAnd, LogicalAndKleene, LogicalAndNot, LogicalOr, LogicalOrKleene,
@@ -14,7 +13,7 @@ use vortex_dtype::DType;
1413
use vortex_error::VortexResult;
1514
use vortex_vector::BoolVector;
1615

17-
use crate::execution::{BatchKernel, BindCtx};
16+
use crate::execution::{BatchKernelRef, BindCtx, kernel};
1817
use crate::serde::ArrayChildren;
1918
use crate::stats::{ArrayStats, StatsSetRef};
2019
use crate::vtable::{
@@ -183,37 +182,35 @@ impl OperatorVTable<LogicalVTable> for LogicalVTable {
183182
array: &LogicalArray,
184183
selection: Option<&ArrayRef>,
185184
ctx: &mut dyn BindCtx,
186-
) -> VortexResult<BatchKernel> {
185+
) -> VortexResult<BatchKernelRef> {
187186
let lhs = ctx.bind(&array.lhs, selection)?;
188187
let rhs = ctx.bind(&array.rhs, selection)?;
189188

190189
Ok(match array.operator() {
191-
LogicalOperator::And => kernel(lhs, rhs, |l, r| l.and(&r)),
192-
LogicalOperator::AndKleene => kernel(lhs, rhs, |l, r| l.and_kleene(&r)),
193-
LogicalOperator::Or => kernel(lhs, rhs, |l, r| l.or(&r)),
194-
LogicalOperator::OrKleene => kernel(lhs, rhs, |l, r| l.or_kleene(&r)),
195-
LogicalOperator::AndNot => kernel(lhs, rhs, |l, r| l.and_not(&r)),
190+
LogicalOperator::And => logical_kernel(lhs, rhs, |l, r| l.and(&r)),
191+
LogicalOperator::AndKleene => logical_kernel(lhs, rhs, |l, r| l.and_kleene(&r)),
192+
LogicalOperator::Or => logical_kernel(lhs, rhs, |l, r| l.or(&r)),
193+
LogicalOperator::OrKleene => logical_kernel(lhs, rhs, |l, r| l.or_kleene(&r)),
194+
LogicalOperator::AndNot => logical_kernel(lhs, rhs, |l, r| l.and_not(&r)),
196195
})
197196
}
198197
}
199198

200199
/// Batch execution kernel for logical operations.
201-
fn kernel<O>(lhs: BatchKernel, rhs: BatchKernel, op: O) -> BatchKernel
200+
fn logical_kernel<O>(lhs: BatchKernelRef, rhs: BatchKernelRef, op: O) -> BatchKernelRef
202201
where
203202
O: Fn(BoolVector, BoolVector) -> BoolVector + Send + 'static,
204203
{
205-
async move {
206-
let (lhs, rhs) = try_join!(lhs, rhs)?;
207-
let (lhs, rhs) = (lhs.into_bool(), rhs.into_bool());
204+
kernel(move || {
205+
let lhs = lhs.execute()?.into_bool();
206+
let rhs = rhs.execute()?.into_bool();
208207
Ok(op(lhs, rhs).into())
209-
}
210-
.boxed()
208+
})
211209
}
212210

213211
#[cfg(test)]
214212
mod tests {
215213
use vortex_buffer::bitbuffer;
216-
use vortex_io::runtime::single::block_on;
217214

218215
use crate::compute::arrays::logical::{LogicalArray, LogicalOperator};
219216
use crate::{ArrayOperator, ArrayRef, IntoArray};
@@ -224,28 +221,23 @@ mod tests {
224221

225222
#[test]
226223
fn test_and() {
227-
block_on(|_| async {
228-
let lhs = bitbuffer![0 1 0].into_array();
229-
let rhs = bitbuffer![0 1 1].into_array();
230-
let result = and_(lhs, rhs).execute().await.unwrap().into_bool();
231-
assert_eq!(result.bits(), &bitbuffer![0 1 0]);
232-
})
224+
let lhs = bitbuffer![0 1 0].into_array();
225+
let rhs = bitbuffer![0 1 1].into_array();
226+
let result = and_(lhs, rhs).execute().unwrap().into_bool();
227+
assert_eq!(result.bits(), &bitbuffer![0 1 0]);
233228
}
234229

235230
#[test]
236231
fn test_and_selected() {
237-
block_on(|_| async {
238-
let lhs = bitbuffer![0 1 0].into_array();
239-
let rhs = bitbuffer![0 1 1].into_array();
240-
241-
let selection = bitbuffer![0 1 1].into_array();
242-
243-
let result = and_(lhs, rhs)
244-
.execute_with_selection(Some(&selection))
245-
.await
246-
.unwrap()
247-
.into_bool();
248-
assert_eq!(result.bits(), &bitbuffer![1 0]);
249-
})
232+
let lhs = bitbuffer![0 1 0].into_array();
233+
let rhs = bitbuffer![0 1 1].into_array();
234+
235+
let selection = bitbuffer![0 1 1].into_array();
236+
237+
let result = and_(lhs, rhs)
238+
.execute_with_selection(Some(&selection))
239+
.unwrap()
240+
.into_bool();
241+
assert_eq!(result.bits(), &bitbuffer![1 0]);
250242
}
251243
}

0 commit comments

Comments
 (0)