Skip to content

Commit cc94c10

Browse files
committed
ALP Kernel
Signed-off-by: Nicholas Gates <[email protected]>
1 parent a157b09 commit cc94c10

File tree

8 files changed

+209
-36
lines changed

8 files changed

+209
-36
lines changed

vortex-array/src/arrow/executor/byte.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ use crate::ArrayRef;
1919
use crate::VectorExecutor;
2020
use crate::arrays::VarBinArray;
2121
use crate::arrays::VarBinVTable;
22-
use crate::arrow::null_buffer::to_null_buffer;
22+
use crate::arrow::executor::validity::to_arrow_null_buffer;
2323
use crate::builtins::ArrayBuiltins;
24+
use crate::vtable::ValidityHelper;
2425

2526
/// Convert a Vortex array into an Arrow GenericBinaryArray.
2627
pub(super) fn to_arrow_byte_array<T: ByteArrayType>(
@@ -60,7 +61,7 @@ where
6061

6162
let data = array.bytes().clone().into_arrow_buffer();
6263

63-
let null_buffer = to_null_buffer(array.validity_mask());
64+
let null_buffer = to_arrow_null_buffer(array.validity(), array.len(), session)?;
6465

6566
Ok(Arc::new(unsafe {
6667
GenericBinaryArray::<T::Offset>::new_unchecked(offsets, data, null_buffer)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use arrow_schema::FieldRef;
7+
use vortex_compute::arrow::IntoArrow;
8+
use vortex_error::VortexResult;
9+
use vortex_error::vortex_ensure;
10+
use vortex_error::vortex_err;
11+
use vortex_session::VortexSession;
12+
13+
use crate::ArrayRef;
14+
use crate::VectorExecutor;
15+
use crate::arrays::FixedSizeListArray;
16+
use crate::arrays::FixedSizeListVTable;
17+
use crate::arrow::ArrowArrayExecutor;
18+
use crate::arrow::executor::validity::to_arrow_null_buffer;
19+
use crate::vtable::ValidityHelper;
20+
21+
/// Convert a Vortex array into an Arrow fixed-size list array.
///
/// An array that is already a `FixedSizeListVTable` array is handed to `list_to_list`.
/// Any other array is executed to a vector, which must be a fixed-size list vector whose
/// list size equals `list_size`, and is then converted with `into_arrow`.
///
/// # Errors
///
/// Returns an error if execution fails, if the executed vector is not a fixed-size list,
/// if its list size differs from `list_size`, or if the Arrow conversion fails.
pub(super) fn to_arrow_fixed_list(
22+
array: ArrayRef,
23+
list_size: i32,
24+
elements_field: &FieldRef,
25+
session: &VortexSession,
26+
) -> VortexResult<arrow_array::ArrayRef> {
27+
// Check for Vortex FixedSizeListArray and convert directly.
28+
if let Some(array) = array.as_opt::<FixedSizeListVTable>() {
29+
return list_to_list(array, elements_field, list_size, session);
30+
}
31+
32+
// Otherwise, execute the array and require the result to be a fixed-size list vector.
// NOTE(review): `elements_field` is not consulted on this path — confirm the element
// field produced by `into_arrow` matches the one the caller requested.
33+
let vector = array
34+
.execute_vector(session)?
35+
.into_fixed_size_list_opt()
36+
.ok_or_else(|| vortex_err!("Failed to convert array to FixedSizeListArray"))?;
37+
vortex_ensure!(
38+
// Comparing against the `Result` means a list size that does not fit in `i32`
// also fails this check.
Ok(list_size) == i32::try_from(vector.list_size()),
39+
"Mismatched list size when converting FixedSizeListVector to Arrow array"
40+
);
41+
42+
Ok(Arc::new(vector.into_arrow()?))
43+
}
44+
45+
/// Build an Arrow `FixedSizeListArray` directly from a Vortex `FixedSizeListArray`.
///
/// The elements are executed to Arrow using the data type of `elements_field`, and the
/// array's validity is converted with `to_arrow_null_buffer`.
///
/// # Errors
///
/// Returns an error if the array's list size differs from `list_size`, if executing the
/// elements or the validity fails, or if `elements_field` is non-nullable while the
/// converted elements contain nulls.
fn list_to_list(
46+
array: &FixedSizeListArray,
47+
elements_field: &FieldRef,
48+
list_size: i32,
49+
session: &VortexSession,
50+
) -> VortexResult<arrow_array::ArrayRef> {
51+
vortex_ensure!(
52+
// Comparing against the `Result` means a list size that does not fit in `i32`
// also fails this check.
Ok(list_size) == i32::try_from(array.list_size()),
53+
"Mismatched list size when converting FixedSizeListArray to Arrow array"
54+
);
55+
56+
let elements = array
57+
.elements()
58+
.clone()
59+
.execute_arrow(elements_field.data_type(), session)?;
60+
// A non-nullable element field is only acceptable when no element is actually null.
vortex_ensure!(
61+
elements_field.is_nullable() || elements.null_count() == 0,
62+
"Cannot convert FixedSizeListArray to non-nullable Arrow array when elements are nullable"
63+
);
64+
65+
let null_buffer = to_arrow_null_buffer(array.validity(), array.len(), session)?;
66+
67+
// NOTE(review): arrow's `new` constructors presumably panic rather than return an error
// on inconsistent inputs — confirm the checks above cover that, or consider `try_new`.
Ok(Arc::new(arrow_array::FixedSizeListArray::new(
68+
elements_field.clone(),
69+
list_size,
70+
elements,
71+
null_buffer,
72+
)))
73+
}

vortex-array/src/arrow/executor/list.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ use crate::arrays::ListVTable;
2929
use crate::arrays::ListViewArray;
3030
use crate::arrays::ListViewVTable;
3131
use crate::arrow::ArrowArrayExecutor;
32-
use crate::arrow::null_buffer::to_null_buffer;
32+
use crate::arrow::executor::validity::to_arrow_null_buffer;
3333
use crate::builtins::ArrayBuiltins;
34+
use crate::vtable::ValidityHelper;
3435

3536
/// Convert a Vortex array into an Arrow GenericListArray.
3637
pub(super) fn to_arrow_list<O: OffsetSizeTrait + NativePType>(
@@ -91,7 +92,7 @@ fn list_to_list<O: OffsetSizeTrait + NativePType>(
9192
"Cannot convert to non-nullable Arrow array with null elements"
9293
);
9394

94-
let null_buffer = to_null_buffer(array.validity_mask());
95+
let null_buffer = to_arrow_null_buffer(array.validity(), array.len(), session)?;
9596

9697
// TODO(ngates): use new_unchecked when it is added to arrow-rs.
9798
Ok(Arc::new(GenericListArray::<O>::new(
@@ -143,13 +144,13 @@ fn list_view_zctl<O: OffsetSizeTrait + NativePType>(
143144
});
144145

145146
// Extract the elements array.
146-
let elements = elements.execute_arrow(&elements_field.data_type(), session)?;
147+
let elements = elements.execute_arrow(elements_field.data_type(), session)?;
147148
vortex_ensure!(
148149
elements_field.is_nullable() || elements.null_count() == 0,
149150
"Cannot convert to non-nullable Arrow array with null elements"
150151
);
151152

152-
let null_buffer = to_null_buffer(validity.to_mask(sizes.len()));
153+
let null_buffer = to_arrow_null_buffer(&validity, sizes.len(), session)?;
153154

154155
Ok(Arc::new(GenericListArray::<O>::new(
155156
elements_field.clone(),
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use arrow_array::GenericListViewArray;
7+
use arrow_array::OffsetSizeTrait;
8+
use arrow_schema::FieldRef;
9+
use vortex_compute::arrow::IntoArrow;
10+
use vortex_compute::cast::Cast;
11+
use vortex_dtype::DType;
12+
use vortex_dtype::IntegerPType;
13+
use vortex_dtype::Nullability::NonNullable;
14+
use vortex_dtype::PTypeDowncastExt;
15+
use vortex_error::VortexResult;
16+
use vortex_error::vortex_ensure;
17+
use vortex_error::vortex_err;
18+
use vortex_session::VortexSession;
19+
use vortex_vector::listview::ListViewVector;
20+
21+
use crate::ArrayRef;
22+
use crate::VectorExecutor;
23+
use crate::arrays::ListViewArray;
24+
use crate::arrays::ListViewVTable;
25+
use crate::arrow::ArrowArrayExecutor;
26+
use crate::arrow::executor::validity::to_arrow_null_buffer;
27+
use crate::builtins::ArrayBuiltins;
28+
29+
/// Convert a Vortex array into an Arrow list-view array whose offsets and sizes use `O`.
///
/// An array that is already a `ListViewVTable` array is handed to
/// `list_view_to_list_view`. Any other array is executed to a vector, which must be a
/// list vector. Its offsets and sizes are cast to `O::PTYPE` when either differs, and the
/// result is converted with `into_arrow`.
///
/// # Errors
///
/// Returns an error if execution fails, if the executed vector is not a list vector, or
/// if casting the offsets/sizes or the Arrow conversion fails.
pub(super) fn to_arrow_list_view<O: OffsetSizeTrait + IntegerPType>(
30+
array: ArrayRef,
31+
elements_field: &FieldRef,
32+
session: &VortexSession,
33+
) -> VortexResult<arrow_array::ArrayRef> {
34+
// Check for Vortex ListViewArray and convert directly.
// `try_into` hands the array back in `Err` when it is not a `ListViewVTable` array,
// so it can still be executed below.
35+
let array = match array.try_into::<ListViewVTable>() {
36+
Ok(array) => return list_view_to_list_view::<O>(array, elements_field, session),
37+
Err(array) => array,
38+
};
39+
40+
// Otherwise, we execute as a vector and convert.
// NOTE(review): `elements_field` is not consulted on this path — confirm the element
// field produced by `into_arrow` matches the one the caller requested.
41+
let mut vector = array
42+
.execute_vector(session)?
43+
.into_list_opt()
44+
.ok_or_else(|| vortex_err!("Failed to convert array to ListVector"))?;
45+
46+
// Ensure the offset type matches.
// Both offsets and sizes are re-cast if either one differs from `O::PTYPE`.
47+
if vector.offsets().ptype() != O::PTYPE || vector.sizes().ptype() != O::PTYPE {
48+
let (elements, offsets, sizes, validity) = vector.into_parts();
49+
let offsets = offsets
50+
.cast(&DType::Primitive(O::PTYPE, NonNullable))?
51+
.into_primitive();
52+
let sizes = sizes
53+
.cast(&DType::Primitive(O::PTYPE, NonNullable))?
54+
.into_primitive();
55+
// SAFETY (unverified): `elements` and `validity` are reused unchanged from the vector
// taken apart above; `offsets` and `sizes` were only cast to `O::PTYPE`.
// NOTE(review): confirm the cast preserves the invariants `new_unchecked` requires.
vector = unsafe { ListViewVector::new_unchecked(elements, offsets, sizes, validity) };
56+
}
57+
58+
Ok(Arc::new(vector.into_arrow()?))
59+
}
60+
61+
/// Build an Arrow `GenericListViewArray<O>` directly from a Vortex `ListViewArray`.
///
/// The array is split into elements, offsets, sizes and validity. The elements are
/// executed to Arrow using the data type of `elements_field`. Offsets and sizes are cast
/// to `O::PTYPE`, executed, and turned into Arrow scalar buffers. The validity is
/// converted with `to_arrow_null_buffer`.
///
/// # Errors
///
/// Returns an error if any execution or cast step fails, or if `elements_field` is
/// non-nullable while the converted elements contain nulls.
fn list_view_to_list_view<O: OffsetSizeTrait + IntegerPType>(
62+
array: ListViewArray,
63+
elements_field: &FieldRef,
64+
session: &VortexSession,
65+
) -> VortexResult<arrow_array::ArrayRef> {
66+
let (elements, offsets, sizes, validity) = array.into_parts();
67+
68+
let elements = elements.execute_arrow(elements_field.data_type(), session)?;
69+
// A non-nullable element field is only acceptable when no element is actually null.
vortex_ensure!(
70+
elements_field.is_nullable() || elements.null_count() == 0,
71+
"Elements field is non-nullable but elements array contains nulls"
72+
);
73+
74+
// Offsets and sizes go through the same pipeline: cast to `O::PTYPE`, execute to a
// primitive vector, downcast to `O`, then hand the buffer to Arrow.
let offsets = offsets
75+
.cast(DType::Primitive(O::PTYPE, NonNullable))?
76+
.execute_vector(session)?
77+
.into_primitive()
78+
.downcast::<O>()
79+
.into_buffer()
80+
.into_arrow_scalar_buffer();
81+
let sizes = sizes
82+
.cast(DType::Primitive(O::PTYPE, NonNullable))?
83+
.execute_vector(session)?
84+
.into_primitive()
85+
.downcast::<O>()
86+
.into_buffer()
87+
.into_arrow_scalar_buffer();
88+
89+
// The null buffer length is taken from the offsets buffer.
let null_buffer = to_arrow_null_buffer(&validity, offsets.len(), session)?;
90+
91+
// NOTE(review): arrow's `new` constructors presumably panic rather than return an error
// on inconsistent inputs — confirm the inputs are already validated, or consider `try_new`.
Ok(Arc::new(GenericListViewArray::<O>::new(
92+
elements_field.clone(),
93+
offsets,
94+
sizes,
95+
elements,
96+
null_buffer,
97+
)))
98+
}

vortex-array/src/arrow/executor/mod.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ mod byte;
66
mod byte_view;
77
mod decimal;
88
mod dictionary;
9+
mod fixed_list;
910
mod list;
11+
mod list_view;
1012
mod null;
1113
mod primitive;
1214
mod run_end;
@@ -20,7 +22,6 @@ use arrow_array::cast::AsArray;
2022
use arrow_array::types::*;
2123
use arrow_schema::DataType;
2224
use arrow_schema::Schema;
23-
use vortex_error::VortexError;
2425
use vortex_error::VortexResult;
2526
use vortex_error::vortex_bail;
2627
use vortex_session::VortexSession;
@@ -31,7 +32,9 @@ use crate::arrow::executor::byte::to_arrow_byte_array;
3132
use crate::arrow::executor::byte_view::to_arrow_byte_view;
3233
use crate::arrow::executor::decimal::to_arrow_decimal;
3334
use crate::arrow::executor::dictionary::to_arrow_dictionary;
35+
use crate::arrow::executor::fixed_list::to_arrow_fixed_list;
3436
use crate::arrow::executor::list::to_arrow_list;
37+
use crate::arrow::executor::list_view::to_arrow_list_view;
3538
use crate::arrow::executor::null::to_arrow_null;
3639
use crate::arrow::executor::primitive::to_arrow_primitive;
3740
use crate::arrow::executor::run_end::to_arrow_run_end;
@@ -56,7 +59,7 @@ pub trait ArrowArrayExecutor: Sized {
5659
session: &VortexSession,
5760
) -> VortexResult<RecordBatch> {
5861
let array = self.execute_arrow(&DataType::Struct(schema.fields.clone()), session)?;
59-
RecordBatch::try_from(array.as_struct()).map_err(VortexError::from)
62+
Ok(RecordBatch::from(array.as_struct()))
6063
}
6164
}
6265

@@ -95,18 +98,16 @@ impl ArrowArrayExecutor for ArrayRef {
9598
DataType::LargeList(elements_field) => {
9699
to_arrow_list::<i64>(self, elements_field, session)
97100
}
98-
DataType::ListView(_) => {
99-
todo!()
101+
DataType::FixedSizeList(elements_field, list_size) => {
102+
to_arrow_fixed_list(self, *list_size, elements_field, session)
100103
}
101-
DataType::FixedSizeList(..) => {
102-
todo!()
104+
DataType::ListView(elements_field) => {
105+
to_arrow_list_view::<i32>(self, elements_field, session)
103106
}
104-
105-
DataType::LargeListView(_) => {
106-
todo!()
107+
DataType::LargeListView(elements_field) => {
108+
to_arrow_list_view::<i64>(self, elements_field, session)
107109
}
108110
DataType::Struct(fields) => to_arrow_struct(self, fields, session),
109-
110111
DataType::Dictionary(codes_type, values_type) => {
111112
to_arrow_dictionary(self, codes_type, values_type, session)
112113
}
@@ -123,7 +124,7 @@ impl ArrowArrayExecutor for ArrayRef {
123124
to_arrow_decimal::<Decimal256Type, vortex_dtype::i256>(self, *p, *s, session)
124125
}
125126
DataType::RunEndEncoded(ends_type, values_type) => {
126-
to_arrow_run_end(self, ends_type.data_type(), &values_type, session)
127+
to_arrow_run_end(self, ends_type.data_type(), values_type, session)
127128
}
128129
DataType::FixedSizeBinary(_)
129130
| DataType::Map(..)

vortex-array/src/arrow/executor/struct_.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,16 @@ pub(super) fn to_arrow_struct(
4040
};
4141

4242
// We can also short-circuit if the array is a `pack` scalar function:
43-
if let Some(array) = array.as_opt::<ScalarFnVTable>() {
44-
if let Some(_pack_options) = array.scalar_fn().as_opt::<Pack>() {
45-
return create_from_fields(
46-
fields,
47-
array.children().to_vec(),
48-
None, // Pack is never null,
49-
len,
50-
session,
51-
);
52-
}
43+
if let Some(array) = array.as_opt::<ScalarFnVTable>()
44+
&& let Some(_pack_options) = array.scalar_fn().as_opt::<Pack>()
45+
{
46+
return create_from_fields(
47+
fields,
48+
array.children().to_vec(),
49+
None, // Pack is never null,
50+
len,
51+
session,
52+
);
5353
}
5454

5555
// Otherwise, we have some options:
@@ -95,11 +95,6 @@ fn create_from_fields(
9595
}
9696

9797
Ok(Arc::new(unsafe {
98-
StructArray::new_unchecked_with_length(
99-
fields.clone(),
100-
arrow_fields.into(),
101-
null_buffer,
102-
len,
103-
)
98+
StructArray::new_unchecked_with_length(fields.clone(), arrow_fields, null_buffer, len)
10499
}))
105100
}

vortex-datafusion/src/persistent/opener.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use arrow_schema::Schema;
1212
use arrow_schema::SchemaRef;
1313
use datafusion_common::DataFusionError;
1414
use datafusion_common::Result as DFResult;
15+
use datafusion_common::arrow::array::RecordBatch;
1516
use datafusion_datasource::FileRange;
1617
use datafusion_datasource::PartitionedFile;
1718
use datafusion_datasource::file_meta::FileMeta;
@@ -39,6 +40,7 @@ use vortex::error::VortexError;
3940
use vortex::expr::root;
4041
use vortex::expr::select;
4142
use vortex::layout::LayoutReader;
43+
use vortex::layout::layouts::USE_VORTEX_OPERATORS;
4244
use vortex::metrics::VortexMetrics;
4345
use vortex::scan::ScanBuilder;
4446
use vortex::session::VortexSession;
@@ -347,8 +349,11 @@ impl FileOpener for VortexOpener {
347349
.with_some_filter(filter)
348350
.with_ordered(has_output_ordering)
349351
.map(move |chunk| {
350-
chunk.execute_record_batch(&projected_schema, &chunk_session)
351-
// RecordBatch::try_from(chunk.as_ref())
352+
if *USE_VORTEX_OPERATORS {
353+
chunk.execute_record_batch(&projected_schema, &chunk_session)
354+
} else {
355+
RecordBatch::try_from(chunk.as_ref())
356+
}
352357
})
353358
.into_stream()
354359
.map_err(|e| {

vortex-vector/src/listview/vector.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::fmt::Debug;
77
use std::ops::BitAnd;
88
use std::ops::RangeBounds;
99
use std::sync::Arc;
10-
1110
use vortex_error::VortexExpect;
1211
use vortex_error::VortexResult;
1312
use vortex_error::vortex_ensure;

0 commit comments

Comments
 (0)