Skip to content

Commit 5d56aea

Browse files
committed
implement execute_batch for FixedSizeList
Signed-off-by: Connor Tsui <[email protected]>
1 parent 71f0f00 commit 5d56aea

File tree

4 files changed

+133
-14
lines changed

4 files changed

+133
-14
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use vortex_compute::filter::Filter;
7+
use vortex_error::{VortexExpect, VortexResult};
8+
use vortex_mask::{Mask, MaskIter};
9+
use vortex_vector::Vector;
10+
use vortex_vector::fixed_size_list::FixedSizeListVector;
11+
12+
use crate::arrays::{FixedSizeListArray, FixedSizeListVTable};
13+
use crate::execution::ExecutionCtx;
14+
use crate::vtable::OperatorVTable;
15+
16+
// TODO(connor): Write some benchmarks to pick this value empirically.
17+
/// Density threshold for choosing between the indices and slices representations when expanding
18+
/// a selection mask (it is the argument passed to `threshold_iter` in `expand_selection`).
19+
///
20+
/// When the mask density is below this threshold, we use indices. Otherwise, we use slices.
21+
///
22+
/// Note that this value is somewhat arbitrarily chosen and has not been benchmarked yet.
22+
const MASK_EXPANSION_DENSITY_THRESHOLD: f64 = 0.05;
23+
24+
impl OperatorVTable<FixedSizeListVTable> for FixedSizeListVTable {
25+
/// Executes a `FixedSizeListArray` under `selection_mask` and returns the result as a
/// `FixedSizeListVector` (converted into a `Vector`).
///
/// The validity mask is filtered by `selection_mask`, and the child `elements` are executed
/// with the selection expanded by `list_size` (see `expand_selection`). The execution context
/// is currently unused.
///
/// # Errors
///
/// Propagates any error returned by executing the child `elements` or by
/// `FixedSizeListVector::try_new`.
fn execute_batch(
26+
array: &FixedSizeListArray,
27+
selection_mask: &Mask,
28+
_ctx: &mut dyn ExecutionCtx,
29+
) -> VortexResult<Vector> {
30+
let list_size = array.list_size();
31+
// The element `DType` is only needed to build an empty child vector when `list_size == 0`.
let elem_dtype = array
32+
.dtype()
33+
.as_fixed_size_list_element_opt()
34+
.vortex_expect("`FixedSizeListArray` `DType` was somehow not `FixedSizeList`")
35+
.clone();
36+
37+
// Filter the validity mask by the same selection that is applied to the lists.
let new_validity = array.validity_mask().filter(selection_mask);
38+
39+
// TODO(connor): Should we raise an error if a child kernel returns a non-empty `elements`?
40+
// Technically nothing bad will happen if we don't, because of the edge case handling below.
41+
42+
// If the size of each list is 0, then we know that the child elements must be empty. Even if
43+
// the child kernel incorrectly gives us some data, we can (correctly) just throw it away.
44+
let elements = if list_size == 0 {
45+
Vector::empty(&elem_dtype)
46+
} else {
47+
// Otherwise, execute the child elements by "expanding" the selection mask out by
48+
// `list_size` so that we correctly select all of the child elements we need.
49+
let expanded_selection = expand_selection(selection_mask, list_size as usize);
50+
51+
array
52+
.elements()
53+
.execute_with_selection(&expanded_selection)?
54+
};
55+
56+
Ok(FixedSizeListVector::try_new(Arc::new(elements), list_size, new_validity)?.into())
57+
}
58+
}
59+
60+
/// Given a mask for a fixed-size list array, creates a new mask for the underlying elements.
61+
///
62+
/// This function simply "expands" out the input `selection_mask` by duplicating each bit
63+
/// `list_size` times. For example, with `list_size == 2`, a selected list at index 1 becomes
/// the selected element range from 2 (inclusive) to 4 (exclusive).
64+
///
65+
/// The output `Mask` is guaranteed to have a length equal to `selection_mask.len() * list_size`.
66+
fn expand_selection(selection_mask: &Mask, list_size: usize) -> Mask {
67+
// NOTE(review): this multiplication is unchecked; confirm it cannot overflow `usize` for
// supported array lengths and list sizes.
let expanded_len = selection_mask.len() * list_size;
68+
69+
// Constant masks stay constant after expansion; only the length changes.
let values = match selection_mask {
70+
Mask::AllTrue(_) => return Mask::AllTrue(expanded_len),
71+
Mask::AllFalse(_) => return Mask::AllFalse(expanded_len),
72+
Mask::Values(values) => values,
73+
};
74+
75+
// Use `threshold_iter` to choose between the slices and indices representations based on
// the mask density.
76+
let expanded_slices = match values.threshold_iter(MASK_EXPANSION_DENSITY_THRESHOLD) {
77+
MaskIter::Slices(slices) => {
78+
// Expand a dense mask (represented as slices) by scaling both ends of each slice by
// `list_size`.
79+
slices
80+
.iter()
81+
.map(|&(start, end)| (start * list_size, end * list_size))
82+
.collect()
83+
}
84+
MaskIter::Indices(indices) => {
85+
// Expand a sparse mask (represented as indices) by turning each index into the slice
86+
// covering the `list_size` elements of that list.
87+
//
88+
// TODO(connor): Note that in the worst case, it is possible that we create only a few
89+
// slices with a small range (for example, when list_size <= 2). This could be further
90+
// optimized, but we choose simplicity for now.
91+
indices
92+
.iter()
93+
.map(|&idx| {
94+
let start = idx * list_size;
95+
let end = (idx + 1) * list_size;
96+
(start, end)
97+
})
98+
.collect()
99+
}
100+
};
101+
102+
// Both branches produce `(start, end)` pairs in element coordinates.
Mask::from_slices(expanded_len, expanded_slices)
103+
}

vortex-vector/src/listview/vector_mut.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,13 +176,21 @@ impl ListViewVectorMut {
176176

177177
/// Creates a new [`ListViewVectorMut`] with the specified capacity.
178178
pub fn with_capacity(element_dtype: &DType, capacity: usize) -> Self {
179-
unsafe {
180-
Self::new_unchecked(
181-
Box::new(VectorMut::with_capacity(element_dtype, 0)),
182-
PrimitiveVectorMut::with_capacity(PType::U64, capacity),
183-
PrimitiveVectorMut::with_capacity(PType::U32, capacity),
184-
MaskMut::with_capacity(capacity),
185-
)
179+
// We cannot know the total number of elements ahead of time, so we arbitrarily reserve
180+
// capacity for 2 times the number of lists.
181+
let elements = Box::new(VectorMut::with_capacity(element_dtype, capacity * 2));
182+
183+
let offsets = PrimitiveVectorMut::with_capacity(PType::U32, capacity);
184+
let sizes = PrimitiveVectorMut::with_capacity(PType::U32, capacity);
185+
186+
let validity = MaskMut::with_capacity(capacity);
187+
188+
Self {
189+
elements,
190+
offsets,
191+
sizes,
192+
validity,
193+
len: 0,
186194
}
187195
}
188196

vortex-vector/src/vector.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use std::fmt::Debug;
1010
use std::ops::RangeBounds;
1111

12+
use vortex_dtype::DType;
1213
use vortex_error::vortex_panic;
1314

1415
use crate::binaryview::{BinaryVector, StringVector};
@@ -19,7 +20,7 @@ use crate::listview::ListViewVector;
1920
use crate::null::NullVector;
2021
use crate::primitive::PrimitiveVector;
2122
use crate::struct_::StructVector;
22-
use crate::{Scalar, VectorMut, VectorOps, match_each_vector};
23+
use crate::{Scalar, VectorMut, VectorMutOps, VectorOps, match_each_vector};
2324

2425
/// An enum over all kinds of immutable vectors, which represent fully decompressed (canonical)
2526
/// array data.
@@ -97,6 +98,13 @@ impl VectorOps for Vector {
9798
}
9899

99100
impl Vector {
101+
/// Returns an empty `Vector` of the kind that corresponds to the given `DType`.
///
/// This is built by creating a `VectorMut` with a capacity of 0 and freezing it.
102+
pub fn empty(dtype: &DType) -> Self {
103+
// We _could_ manually implement this for `Vector` instead of doing this via `VectorMut` and
104+
// `freeze`, but it's probably not worth it.
105+
VectorMut::with_capacity(dtype, 0).freeze()
106+
}
107+
100108
/// Returns a reference to the inner [`NullVector`] if `self` is of that variant.
101109
pub fn as_null(&self) -> &NullVector {
102110
if let Vector::Null(v) = self {

vortex-vector/src/vector_mut.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,19 @@ impl VectorMut {
7070
DType::Primitive(ptype, _) => {
7171
PrimitiveVectorMut::with_capacity(*ptype, capacity).into()
7272
}
73+
DType::Decimal(decimal_dtype, _) => {
74+
DecimalVectorMut::with_capacity(decimal_dtype, capacity).into()
75+
}
76+
DType::Utf8(..) => StringVectorMut::with_capacity(capacity).into(),
77+
DType::Binary(..) => BinaryVectorMut::with_capacity(capacity).into(),
78+
DType::List(..) => ListViewVectorMut::with_capacity(dtype, capacity).into(),
7379
DType::FixedSizeList(elem_dtype, list_size, _) => {
7480
FixedSizeListVectorMut::with_capacity(elem_dtype, *list_size, capacity).into()
7581
}
7682
DType::Struct(struct_fields, _) => {
7783
StructVectorMut::with_capacity(struct_fields, capacity).into()
7884
}
79-
DType::Decimal(decimal_dtype, _) => {
80-
DecimalVectorMut::with_capacity(decimal_dtype, capacity).into()
81-
}
82-
DType::Utf8(..) => StringVectorMut::with_capacity(capacity).into(),
83-
DType::Binary(..) => BinaryVectorMut::with_capacity(capacity).into(),
8485
DType::Extension(ext) => VectorMut::with_capacity(ext.storage_dtype(), capacity),
85-
DType::List(..) => ListViewVectorMut::with_capacity(dtype, capacity).into(),
8686
}
8787
}
8888
}

0 commit comments

Comments
 (0)