Skip to content

Commit ba3b2e5

Browse files
committed
feat[array]: duckdb execute
Signed-off-by: Joe Isaacs <[email protected]>
1 parent b7fb836 commit ba3b2e5

File tree

23 files changed

+1050
-74
lines changed

23 files changed

+1050
-74
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-compute/src/take/vector/fixed_size_list.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl<I: UnsignedPType> Take<[I]> for &FixedSizeListVector {
4949
}
5050

5151
let element_indices = expand_indices(indices, self.list_size());
52-
let taken_elements = self.elements().take(element_indices.as_slice());
52+
let taken_elements = self.elements().as_ref().take(element_indices.as_slice());
5353

5454
debug_assert_eq!(taken_elements.len(), indices.len() * list_size);
5555
debug_assert_eq!(taken_validity.len(), indices.len());
@@ -92,7 +92,10 @@ fn take_nullable<I: UnsignedPType>(
9292
}
9393

9494
let expanded_nullable_indices = expand_nullable_indices(indices, list_size);
95-
let taken_elements = fsl.elements().take(expanded_nullable_indices.as_slice());
95+
let taken_elements = fsl
96+
.elements()
97+
.as_ref()
98+
.take(expanded_nullable_indices.as_slice());
9699

97100
debug_assert_eq!(taken_elements.len(), indices.len() * list_size);
98101
debug_assert_eq!(taken_validity.len(), indices.len());

vortex-compute/src/take/vector/mod.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,22 @@ mod struct_;
2727
#[cfg(test)]
2828
mod tests;
2929

30+
impl<I: UnsignedPType> Take<PVector<I>> for Vector {
31+
type Output = Vector;
32+
33+
fn take(self, indices: &PVector<I>) -> Vector {
34+
(&self).take(indices)
35+
}
36+
}
37+
38+
impl<I: UnsignedPType> Take<[I]> for Vector {
39+
type Output = Vector;
40+
41+
fn take(self, indices: &[I]) -> Vector {
42+
(&self).take(indices)
43+
}
44+
}
45+
3046
impl<I: UnsignedPType> Take<PVector<I>> for &Vector {
3147
type Output = Vector;
3248

@@ -43,16 +59,24 @@ impl<I: UnsignedPType> Take<[I]> for &Vector {
4359
}
4460
}
4561

46-
impl<T> Take<PrimitiveVector> for &T
62+
impl Take<PrimitiveVector> for &Vector
4763
where
48-
for<'a> &'a T: Take<PVector<u8>, Output = T>,
49-
for<'a> &'a T: Take<PVector<u16>, Output = T>,
50-
for<'a> &'a T: Take<PVector<u32>, Output = T>,
51-
for<'a> &'a T: Take<PVector<u64>, Output = T>,
64+
for<'a> &'a Vector: Take<PVector<u8>, Output = Vector>,
65+
for<'a> &'a Vector: Take<PVector<u16>, Output = Vector>,
66+
for<'a> &'a Vector: Take<PVector<u32>, Output = Vector>,
67+
for<'a> &'a Vector: Take<PVector<u64>, Output = Vector>,
5268
{
53-
type Output = T;
69+
type Output = Vector;
5470

55-
fn take(self, indices: &PrimitiveVector) -> T {
71+
fn take(self, indices: &PrimitiveVector) -> Vector {
5672
match_each_unsigned_pvector!(indices, |iv| { self.take(iv) })
5773
}
5874
}
75+
76+
impl Take<PrimitiveVector> for Vector {
77+
type Output = Vector;
78+
79+
fn take(self, indices: &PrimitiveVector) -> Vector {
80+
(&self).take(indices)
81+
}
82+
}

vortex-duckdb/src/duckdb/logical_type.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ macro_rules! match_each_primitive_type {
402402
let $type = <$crate::duckdb::Double as $crate::duckdb::PrimitiveType>::NATIVE;
403403
$body
404404
}
405-
_ => vortex_panic!(
405+
_ => vortex::error::vortex_panic!(
406406
"Unexpected type for match_each_primitive_type: {:?}",
407407
$self.as_type_id()
408408
),

vortex-duckdb/src/exporter/bool.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use itertools::Itertools;
5+
use vortex::array::ArrayRef;
6+
use vortex::array::VectorExecutor;
57
use vortex::array::arrays::BoolArray;
8+
use vortex::buffer::BitBuffer;
69
use vortex::error::VortexResult;
710
use vortex::mask::Mask;
11+
use vortex::session::VortexSession;
812

13+
use crate::LogicalType;
914
use crate::duckdb::Vector;
1015
use crate::exporter::ColumnExporter;
1116
use crate::exporter::all_invalid;
@@ -52,10 +57,55 @@ impl ColumnExporter for BoolExporter {
5257
}
5358
}
5459

60+
struct BoolVectorExporter {
61+
buffer: BitBuffer,
62+
mask: Mask,
63+
}
64+
65+
pub(crate) fn new_vector_exporter(
66+
array: ArrayRef,
67+
session: &VortexSession,
68+
) -> VortexResult<Box<dyn ColumnExporter>> {
69+
let vector = array.execute_vector(session)?.into_bool();
70+
let (buffer, mask) = vector.into_parts();
71+
if mask.all_false() {
72+
return Ok(all_invalid::new_exporter(
73+
buffer.len(),
74+
&LogicalType::bool(),
75+
));
76+
}
77+
Ok(Box::new(BoolVectorExporter { buffer, mask }))
78+
}
79+
80+
impl ColumnExporter for BoolVectorExporter {
81+
fn export(&self, offset: usize, len: usize, vector: &mut Vector) -> VortexResult<()> {
82+
// Set validity if necessary.
83+
if unsafe { vector.set_validity(&self.mask, offset, len) } {
84+
// All values are null, so no point copying the data.
85+
return Ok(());
86+
}
87+
88+
// DuckDB uses byte bools, not bit bools.
89+
// Maybe we can sometimes convert into these directly from a compressed array?
90+
unsafe { vector.as_slice_mut(len) }.copy_from_slice(
91+
&self
92+
.buffer
93+
.slice(offset..(offset + len))
94+
.iter()
95+
.collect_vec(),
96+
);
97+
98+
Ok(())
99+
}
100+
}
101+
55102
#[cfg(test)]
56103
mod tests {
57104
use std::iter;
58105

106+
use vortex::VortexSessionDefault;
107+
use vortex::array::IntoArray;
108+
59109
use super::*;
60110
use crate::cpp;
61111
use crate::duckdb::DataChunk;
@@ -97,6 +147,29 @@ mod tests {
97147
format!(
98148
r#"Chunk - [1 Columns]
99149
- FLAT BOOLEAN: 65 = [ {}]
150+
"#,
151+
iter::repeat_n("true", 65).join(", ")
152+
)
153+
);
154+
}
155+
156+
#[test]
157+
fn test_bool_vector_long() {
158+
let arr = BoolArray::from_iter([true; 128]);
159+
160+
let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_BOOLEAN)]);
161+
162+
new_vector_exporter(arr.into_array(), &VortexSession::default())
163+
.unwrap()
164+
.export(1, 66, &mut chunk.get_vector(0))
165+
.unwrap();
166+
chunk.set_len(65);
167+
168+
assert_eq!(
169+
format!("{}", String::try_from(&chunk).unwrap()),
170+
format!(
171+
r#"Chunk - [1 Columns]
172+
- FLAT BOOLEAN: 65 = [ {}]
100173
"#,
101174
iter::repeat_n("true", 65).join(", ")
102175
)

vortex-duckdb/src/exporter/cache.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use parking_lot::Mutex;
77
use vortex::array::ArrayRef;
88
use vortex::array::Canonical;
9+
use vortex::vector::Vector as VxVector;
910
use vortex_utils::aliases::dash_map::DashMap;
1011

1112
use crate::duckdb::Vector;
@@ -19,6 +20,8 @@ use crate::duckdb::Vector;
1920
pub struct ConversionCache {
2021
pub values_cache: DashMap<usize, (ArrayRef, Arc<Mutex<Vector>>)>,
2122
pub canonical_cache: DashMap<usize, (ArrayRef, Canonical)>,
23+
// Used for `USE_VORTEX_OPERATORS`; will replace `canonical_cache`.
24+
pub vector_cache: DashMap<usize, (ArrayRef, VxVector)>,
2225
// A value which must be unique for a given DuckDB operator.
2326
instance_id: u64,
2427
}

vortex-duckdb/src/exporter/decimal.rs

Lines changed: 109 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
use std::marker::PhantomData;
55

66
use num_traits::ToPrimitive;
7+
use vortex::array::ArrayRef;
8+
use vortex::array::VectorExecutor;
79
use vortex::array::arrays::DecimalArray;
810
use vortex::buffer::Buffer;
911
use vortex::dtype::BigCast;
@@ -15,6 +17,7 @@ use vortex::error::VortexResult;
1517
use vortex::error::vortex_bail;
1618
use vortex::mask::Mask;
1719
use vortex::scalar::DecimalType;
20+
use vortex::session::VortexSession;
1821

1922
use crate::duckdb::Vector;
2023
use crate::duckdb::VectorBuffer;
@@ -59,18 +62,6 @@ pub(crate) fn new_exporter(array: &DecimalArray) -> VortexResult<Box<dyn ColumnE
5962
})
6063
}
6164

62-
/// Maps a decimal precision into the small type that can represent it.
63-
/// see <https://duckdb.org/docs/stable/sql/data_types/numeric.html#fixed-point-decimals>
64-
pub fn precision_to_duckdb_storage_size(decimal_dtype: &DecimalDType) -> VortexResult<DecimalType> {
65-
Ok(match decimal_dtype.precision() {
66-
1..=4 => DecimalType::I16,
67-
5..=9 => DecimalType::I32,
68-
10..=18 => DecimalType::I64,
69-
19..=38 => DecimalType::I128,
70-
decimal_dtype => vortex_bail!("cannot represent decimal in ducdkb {decimal_dtype}"),
71-
})
72-
}
73-
7465
impl<D: NativeDecimalType, N: NativeDecimalType> ColumnExporter for DecimalExporter<D, N>
7566
where
7667
D: ToPrimitive,
@@ -115,6 +106,112 @@ impl<D: NativeDecimalType> ColumnExporter for DecimalZeroCopyExporter<D> {
115106
}
116107
}
117108

109+
struct DecimalVectorExporter<D: NativeDecimalType, N: NativeDecimalType> {
110+
values: Buffer<D>,
111+
mask: Mask,
112+
/// The DecimalType of the DuckDB column.
113+
dest_value_type: PhantomData<N>,
114+
}
115+
116+
struct DecimalVectorZeroCopyExporter<D: NativeDecimalType> {
117+
len: usize,
118+
begin: *const D,
119+
shared_buffer: VectorBuffer,
120+
mask: Mask,
121+
}
122+
123+
pub(crate) fn new_vector_exporter(
124+
array: ArrayRef,
125+
session: &VortexSession,
126+
) -> VortexResult<Box<dyn ColumnExporter>> {
127+
let vector = array.execute_vector(session)?.into_decimal();
128+
let decimal_type = vector.decimal_type();
129+
130+
let dest_values_type =
131+
precision_to_duckdb_storage_size(&DecimalDType::new(vector.precision(), vector.scale()))?;
132+
133+
if decimal_type == dest_values_type {
134+
match_each_decimal_value_type!(decimal_type, |D| {
135+
let vector = D::downcast(vector);
136+
let (_, buffer, mask) = vector.into_parts();
137+
return Ok(Box::new(DecimalVectorZeroCopyExporter {
138+
len: buffer.len(),
139+
begin: buffer.as_ptr(),
140+
shared_buffer: VectorBuffer::new(buffer),
141+
mask,
142+
}));
143+
})
144+
}
145+
146+
match_each_decimal_value_type!(decimal_type, |D| {
147+
match_each_decimal_value_type!(dest_values_type, |N| {
148+
let vector = D::downcast(vector);
149+
let (_, buffer, mask) = vector.into_parts();
150+
Ok(Box::new(DecimalVectorExporter {
151+
values: buffer,
152+
mask,
153+
dest_value_type: PhantomData::<N>,
154+
}))
155+
})
156+
})
157+
}
158+
159+
impl<D: NativeDecimalType, N: NativeDecimalType> ColumnExporter for DecimalVectorExporter<D, N>
160+
where
161+
D: ToPrimitive,
162+
N: BigCast,
163+
{
164+
fn export(&self, offset: usize, len: usize, vector: &mut Vector) -> VortexResult<()> {
165+
// Set validity if necessary.
166+
if unsafe { vector.set_validity(&self.mask, offset, len) } {
167+
// All values are null, so no point copying the data.
168+
return Ok(());
169+
}
170+
171+
// Copy the values from the Vortex array to the DuckDB vector.
172+
for (src, dst) in self.values[offset..offset + len]
173+
.iter()
174+
.zip(unsafe { vector.as_slice_mut(len) })
175+
{
176+
*dst = <N as BigCast>::from(*src).vortex_expect(
177+
"We know all decimals with this scale/precision fit into the target bit width",
178+
);
179+
}
180+
181+
Ok(())
182+
}
183+
}
184+
185+
impl<D: NativeDecimalType> ColumnExporter for DecimalVectorZeroCopyExporter<D> {
186+
fn export(&self, offset: usize, len: usize, vector: &mut Vector) -> VortexResult<()> {
187+
if unsafe { vector.set_validity(&self.mask, offset, len) } {
188+
// All values are null, so no point copying the data.
189+
return Ok(());
190+
}
191+
192+
assert!(self.len >= offset + len);
193+
194+
let pos = unsafe { self.begin.add(offset) };
195+
unsafe { vector.set_vector_buffer(&self.shared_buffer) };
196+
// We pass a `*mut T` only because the C API requires it; the data is in fact const.
197+
unsafe { vector.set_data_ptr(pos as *mut D) };
198+
199+
Ok(())
200+
}
201+
}
202+
203+
/// Maps a decimal precision into the smallest type that can represent it.
204+
/// see <https://duckdb.org/docs/stable/sql/data_types/numeric.html#fixed-point-decimals>
205+
pub fn precision_to_duckdb_storage_size(decimal_dtype: &DecimalDType) -> VortexResult<DecimalType> {
206+
Ok(match decimal_dtype.precision() {
207+
1..=4 => DecimalType::I16,
208+
5..=9 => DecimalType::I32,
209+
10..=18 => DecimalType::I64,
210+
19..=38 => DecimalType::I128,
211+
decimal_dtype => vortex_bail!("cannot represent decimal in duckdb {decimal_dtype}"),
212+
})
213+
}
214+
118215
#[cfg(test)]
119216
mod tests {
120217
use vortex::array::arrays::DecimalArray;

0 commit comments

Comments
 (0)