Skip to content

Commit 6e32d0f

Browse files
committed
take/slice/filter kernels
1 parent ed75a1a commit 6e32d0f

File tree

3 files changed

+279
-0
lines changed

3 files changed

+279
-0
lines changed
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::ops::Range;
5+
6+
use vortex_array::ArrayRef;
7+
use vortex_array::DynArray;
8+
use vortex_array::ExecutionCtx;
9+
use vortex_array::IntoArray;
10+
use vortex_array::arrays::dict::TakeExecute;
11+
use vortex_array::arrays::dict::TakeExecuteAdaptor;
12+
use vortex_array::arrays::filter::FilterExecuteAdaptor;
13+
use vortex_array::arrays::filter::FilterKernel;
14+
use vortex_array::arrays::slice::SliceExecuteAdaptor;
15+
use vortex_array::arrays::slice::SliceKernel;
16+
use vortex_array::kernel::ParentKernelSet;
17+
use vortex_error::VortexResult;
18+
use vortex_mask::Mask;
19+
20+
use crate::ParquetVariant;
21+
use crate::array::ParquetVariantArray;
22+
23+
pub(crate) static PARENT_KERNELS: ParentKernelSet<ParquetVariant> = ParentKernelSet::new(&[
24+
ParentKernelSet::lift(&FilterExecuteAdaptor(ParquetVariant)),
25+
ParentKernelSet::lift(&SliceExecuteAdaptor(ParquetVariant)),
26+
ParentKernelSet::lift(&TakeExecuteAdaptor(ParquetVariant)),
27+
]);
28+
29+
impl SliceKernel for ParquetVariant {
30+
fn slice(
31+
array: &ParquetVariantArray,
32+
range: Range<usize>,
33+
_ctx: &mut ExecutionCtx,
34+
) -> VortexResult<Option<ArrayRef>> {
35+
let validity = array.validity.slice(range.clone())?;
36+
let metadata = array.metadata.slice(range.clone())?;
37+
let value = array
38+
.value
39+
.as_ref()
40+
.map(|v| v.slice(range.clone()))
41+
.transpose()?;
42+
let typed_value = array
43+
.typed_value
44+
.as_ref()
45+
.map(|tv| tv.slice(range))
46+
.transpose()?;
47+
Ok(Some(
48+
ParquetVariantArray::try_new_with_validity(validity, metadata, value, typed_value)?
49+
.into_array(),
50+
))
51+
}
52+
}
53+
54+
impl FilterKernel for ParquetVariant {
55+
fn filter(
56+
array: &ParquetVariantArray,
57+
mask: &Mask,
58+
_ctx: &mut ExecutionCtx,
59+
) -> VortexResult<Option<ArrayRef>> {
60+
let validity = array.validity.filter(mask)?;
61+
let metadata = array.metadata.filter(mask.clone())?;
62+
let value = array
63+
.value
64+
.as_ref()
65+
.map(|v| v.filter(mask.clone()))
66+
.transpose()?;
67+
let typed_value = array
68+
.typed_value
69+
.as_ref()
70+
.map(|tv| tv.filter(mask.clone()))
71+
.transpose()?;
72+
Ok(Some(
73+
ParquetVariantArray::try_new_with_validity(validity, metadata, value, typed_value)?
74+
.into_array(),
75+
))
76+
}
77+
}
78+
79+
impl TakeExecute for ParquetVariant {
80+
fn take(
81+
array: &ParquetVariantArray,
82+
indices: &ArrayRef,
83+
_ctx: &mut ExecutionCtx,
84+
) -> VortexResult<Option<ArrayRef>> {
85+
let validity = array.validity.take(indices)?;
86+
let metadata = array.metadata.take(indices.to_array())?;
87+
let value = array
88+
.value
89+
.as_ref()
90+
.map(|v| v.take(indices.to_array()))
91+
.transpose()?;
92+
let typed_value = array
93+
.typed_value
94+
.as_ref()
95+
.map(|tv| tv.take(indices.to_array()))
96+
.transpose()?;
97+
Ok(Some(
98+
ParquetVariantArray::try_new_with_validity(validity, metadata, value, typed_value)?
99+
.into_array(),
100+
))
101+
}
102+
}
103+
104+
#[cfg(test)]
105+
mod tests {
106+
use std::sync::Arc;
107+
108+
use arrow_array::ArrayRef as ArrowArrayRef;
109+
use arrow_array::StructArray;
110+
use arrow_buffer::NullBuffer;
111+
use arrow_schema::DataType;
112+
use arrow_schema::Field;
113+
use parquet_variant::Variant as PqVariant;
114+
use parquet_variant_compute::VariantArray as ArrowVariantArray;
115+
use parquet_variant_compute::VariantArrayBuilder;
116+
use vortex_array::ArrayRef;
117+
use vortex_array::DynArray;
118+
use vortex_array::IntoArray;
119+
use vortex_array::arrays::PrimitiveArray;
120+
use vortex_error::VortexResult;
121+
use vortex_mask::Mask;
122+
123+
use crate::ParquetVariantArray;
124+
125+
fn make_unshredded_array() -> VortexResult<ArrayRef> {
126+
let mut builder = VariantArrayBuilder::new(4);
127+
builder.append_variant(PqVariant::from(42i32));
128+
builder.append_variant(PqVariant::from("hello"));
129+
builder.append_variant(PqVariant::from(true));
130+
builder.append_variant(PqVariant::from(99i64));
131+
ParquetVariantArray::from_arrow_variant(&builder.build())
132+
}
133+
134+
fn make_nullable_array() -> VortexResult<ArrayRef> {
135+
let mut builder = VariantArrayBuilder::new(4);
136+
builder.append_variant(PqVariant::from(42i32));
137+
builder.append_variant(PqVariant::from("hello"));
138+
builder.append_variant(PqVariant::from(true));
139+
builder.append_variant(PqVariant::from(99i64));
140+
let inner = builder.build().into_inner();
141+
142+
let null_struct = StructArray::try_new(
143+
inner.fields().clone(),
144+
inner.columns().to_vec(),
145+
Some(NullBuffer::from(vec![true, false, true, false])),
146+
)
147+
.unwrap();
148+
let arrow_variant = ArrowVariantArray::try_new(&null_struct).unwrap();
149+
ParquetVariantArray::from_arrow_variant(&arrow_variant)
150+
}
151+
152+
#[test]
153+
fn test_slice_basic() -> VortexResult<()> {
154+
let arr = make_unshredded_array()?;
155+
let sliced = arr.slice(1..3)?;
156+
157+
assert_eq!(sliced.len(), 2);
158+
assert_eq!(sliced.scalar_at(0)?, arr.scalar_at(1)?);
159+
assert_eq!(sliced.scalar_at(1)?, arr.scalar_at(2)?);
160+
161+
Ok(())
162+
}
163+
164+
#[test]
165+
fn test_slice_preserves_validity() -> VortexResult<()> {
166+
let arr = make_nullable_array()?;
167+
let sliced = arr.slice(0..3)?;
168+
169+
assert_eq!(sliced.len(), 3);
170+
assert!(!sliced.scalar_at(0)?.is_null());
171+
assert!(sliced.scalar_at(1)?.is_null());
172+
assert!(!sliced.scalar_at(2)?.is_null());
173+
174+
Ok(())
175+
}
176+
177+
#[test]
178+
fn test_filter_basic() -> VortexResult<()> {
179+
let arr = make_unshredded_array()?;
180+
let mask = Mask::from_iter([true, false, true, false]);
181+
let filtered = arr.filter(mask)?;
182+
183+
assert_eq!(filtered.len(), 2);
184+
assert_eq!(filtered.scalar_at(0)?, arr.scalar_at(0)?);
185+
assert_eq!(filtered.scalar_at(1)?, arr.scalar_at(2)?);
186+
187+
Ok(())
188+
}
189+
190+
#[test]
191+
fn test_filter_preserves_validity() -> VortexResult<()> {
192+
let arr = make_nullable_array()?;
193+
// Keep rows 0 (valid), 1 (null), 3 (null)
194+
let mask = Mask::from_iter([true, true, false, true]);
195+
let filtered = arr.filter(mask)?;
196+
197+
assert_eq!(filtered.len(), 3);
198+
assert!(!filtered.scalar_at(0)?.is_null());
199+
assert!(filtered.scalar_at(1)?.is_null());
200+
assert!(filtered.scalar_at(2)?.is_null());
201+
202+
Ok(())
203+
}
204+
205+
#[test]
206+
fn test_take_basic() -> VortexResult<()> {
207+
let arr = make_unshredded_array()?;
208+
let indices = PrimitiveArray::from_iter([2u64, 0, 3]);
209+
let taken = arr.take(indices.into_array())?;
210+
211+
assert_eq!(taken.len(), 3);
212+
assert_eq!(taken.scalar_at(0)?, arr.scalar_at(2)?);
213+
assert_eq!(taken.scalar_at(1)?, arr.scalar_at(0)?);
214+
assert_eq!(taken.scalar_at(2)?, arr.scalar_at(3)?);
215+
216+
Ok(())
217+
}
218+
219+
#[test]
220+
fn test_take_preserves_validity() -> VortexResult<()> {
221+
let arr = make_nullable_array()?;
222+
// Take: valid (0), null (1), null (3), valid (2)
223+
let indices = PrimitiveArray::from_iter([0u64, 1, 3, 2]);
224+
let taken = arr.take(indices.into_array())?;
225+
226+
assert_eq!(taken.len(), 4);
227+
assert!(!taken.scalar_at(0)?.is_null());
228+
assert!(taken.scalar_at(1)?.is_null());
229+
assert!(taken.scalar_at(2)?.is_null());
230+
assert!(!taken.scalar_at(3)?.is_null());
231+
232+
Ok(())
233+
}
234+
235+
fn binary_view_array(values: &[&[u8]]) -> ArrowArrayRef {
236+
let mut builder = arrow_array::builder::BinaryViewBuilder::new();
237+
for value in values {
238+
builder.append_value(*value);
239+
}
240+
Arc::new(builder.finish())
241+
}
242+
243+
#[test]
244+
fn test_slice_shredded_typed_value() -> VortexResult<()> {
245+
let metadata = binary_view_array(&[b"\x01\x00", b"\x01\x00", b"\x01\x00"]);
246+
let typed_value: ArrowArrayRef = Arc::new(arrow_array::Int32Array::from(vec![10, 20, 30]));
247+
248+
let struct_array = StructArray::try_new(
249+
vec![
250+
Arc::new(Field::new("metadata", DataType::BinaryView, false)),
251+
Arc::new(Field::new("typed_value", DataType::Int32, false)),
252+
]
253+
.into(),
254+
vec![metadata, typed_value],
255+
None,
256+
)
257+
.unwrap();
258+
let arrow_variant = ArrowVariantArray::try_new(&struct_array).unwrap();
259+
let arr = ParquetVariantArray::from_arrow_variant(&arrow_variant)?;
260+
261+
let sliced = arr.slice(1..3)?;
262+
assert_eq!(sliced.len(), 2);
263+
assert_eq!(sliced.scalar_at(0)?, arr.scalar_at(1)?);
264+
assert_eq!(sliced.scalar_at(1)?, arr.scalar_at(2)?);
265+
266+
Ok(())
267+
}
268+
}

encodings/parquet-variant/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
//! [Arrow canonical extension type]: https://arrow.apache.org/docs/format/CanonicalExtensions.html#parquet-variant
2626
2727
mod array;
28+
mod kernel;
2829
mod operations;
2930
mod validity;
3031
mod vtable;

encodings/parquet-variant/src/vtable.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use vortex_proto::dtype as pb;
3535
use vortex_session::VortexSession;
3636

3737
use crate::array::ParquetVariantArray;
38+
use crate::kernel::PARENT_KERNELS;
3839

3940
/// VTable for [`ParquetVariantArray`].
4041
#[derive(Debug, Clone)]
@@ -342,6 +343,15 @@ impl VTable for ParquetVariant {
342343
VariantArray::new(array.as_ref().clone().into_array()).into_array(),
343344
))
344345
}
346+
347+
fn execute_parent(
348+
array: &Self::Array,
349+
parent: &ArrayRef,
350+
child_idx: usize,
351+
ctx: &mut ExecutionCtx,
352+
) -> VortexResult<Option<ArrayRef>> {
353+
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
354+
}
345355
}
346356

347357
#[cfg(test)]

0 commit comments

Comments
 (0)