Commit a659bc2

feat: faster TakeKernel for VarBinArray

Also removes potential overflow issues.

Signed-off-by: Andrew Duffy <[email protected]>

1 parent: 2fd4660
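
In short, per the diff below: the VarBin take kernel used to build a new VarBinArray by copying every selected value's bytes into freshly allocated offset and data buffers, with a separate two-pass slow path for null handling. It now emits one fixed-size BinaryView per index and returns a VarBinViewArray that shares the original bytes buffer, copying no string data at all.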

File tree: 1 file changed (+72, −154 lines)

  • vortex-array/src/arrays/varbin/compute


vortex-array/src/arrays/varbin/compute/take.rs

Lines changed: 72 additions & 154 deletions
```diff
@@ -1,178 +1,96 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
-use vortex_buffer::{BitBufferMut, BufferMut, ByteBufferMut};
-use vortex_dtype::{DType, IntegerPType, match_each_integer_ptype};
-use vortex_error::{VortexExpect, VortexResult, vortex_panic};
-use vortex_mask::Mask;
+use std::sync::Arc;
+
+use vortex_buffer::Buffer;
+use vortex_dtype::{IntegerPType, match_each_integer_ptype};
+use vortex_error::{VortexExpect, VortexResult};
+use vortex_vector::binaryview::BinaryView;
 
 use crate::arrays::varbin::VarBinArray;
-use crate::arrays::{PrimitiveArray, VarBinVTable};
+use crate::arrays::{VarBinVTable, VarBinViewArray};
 use crate::compute::{TakeKernel, TakeKernelAdapter};
-use crate::validity::Validity;
+use crate::vtable::ValidityHelper;
 use crate::{Array, ArrayRef, IntoArray, ToCanonical, register_kernel};
 
```
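The import changes preview the strategy: the kernel now builds a `Buffer<BinaryView>` rather than new offsets and bytes. A BinaryView is a fixed 16-byte structure in the style of Arrow's binary-view layout; the sketch below is illustrative only (the field names and exact split are assumptions, not the Vortex definition):

```rust
// Illustrative 16-byte view in the Arrow "binary view" style. Short strings
// are stored entirely inline; long strings reference a shared data buffer.
#[repr(C)]
#[derive(Clone, Copy)]
struct Inlined {
    size: u32,      // length in bytes, <= 12
    data: [u8; 12], // the string bytes themselves
}

#[repr(C)]
#[derive(Clone, Copy)]
struct Reference {
    size: u32,       // length in bytes, > 12
    prefix: [u8; 4], // first 4 bytes, enabling cheap comparisons
    buffer: u32,     // which data buffer the string lives in
    offset: u32,     // byte offset of the string within that buffer
}

#[repr(C)]
union BinaryViewSketch {
    inlined: Inlined,
    reference: Reference,
}

fn main() {
    // Every view is 16 bytes, regardless of how long the string is.
    assert_eq!(std::mem::size_of::<BinaryViewSketch>(), 16);
}
```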
```diff
 impl TakeKernel for VarBinVTable {
+    #[allow(clippy::cognitive_complexity)]
     fn take(&self, array: &VarBinArray, indices: &dyn Array) -> VortexResult<ArrayRef> {
         let offsets = array.offsets().to_primitive();
-        let data = array.bytes();
+        let data = array.bytes().clone();
+
+        let has_null_indices = !indices.all_valid();
+
+        // Get the validity result
+        let result_validity = array.validity().take(indices)?;
+        let result_dtype = array
+            .dtype()
+            .with_nullability(result_validity.nullability());
+
         let indices = indices.to_primitive();
+
         match_each_integer_ptype!(offsets.ptype(), |O| {
             match_each_integer_ptype!(indices.ptype(), |I| {
-                Ok(take(
-                    array
-                        .dtype()
-                        .clone()
-                        .union_nullability(indices.dtype().nullability()),
-                    offsets.as_slice::<O>(),
-                    data.as_slice(),
-                    indices.as_slice::<I>(),
-                    array.validity_mask(),
-                    indices.validity_mask(),
-                )?
-                .into_array())
+                let views = if has_null_indices {
+                    take_to_views::<O, I, _>(
+                        data.as_slice(),
+                        offsets.as_slice::<O>(),
+                        indices.as_slice::<I>(),
+                        |index| indices.is_valid(index),
+                    )
+                } else {
+                    take_to_views::<O, I, _>(
+                        data.as_slice(),
+                        offsets.as_slice::<O>(),
+                        indices.as_slice::<I>(),
+                        |_| true,
+                    )
+                };
+
+                // SAFETY: views are constructed against validated
+                // string array, validity will have same length as indices.
+                unsafe {
+                    Ok(VarBinViewArray::new_unchecked(
+                        views,
+                        Arc::new([data]),
+                        result_dtype,
+                        result_validity,
+                    )
+                    .into_array())
+                }
             })
         })
     }
 }
 
 register_kernel!(TakeKernelAdapter(VarBinVTable).lift());
 
```
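This is where the speedup comes from: the old kernel copied every selected byte into a new data buffer, while the new one writes a single 16-byte view per index and shares the original bytes buffer through `Arc::new([data])`. The cost becomes proportional to the number of indices rather than to the total size of the selected strings. A minimal sketch of the two strategies on plain slices (hypothetical helpers, not Vortex code):

```rust
// take_by_copy moves every selected byte; take_by_view records a fixed-size
// (offset, length) pair per index and leaves the bytes where they are.
fn take_by_copy(bytes: &[u8], offsets: &[u32], indices: &[usize]) -> Vec<u8> {
    let mut out = Vec::new();
    for &i in indices {
        out.extend_from_slice(&bytes[offsets[i] as usize..offsets[i + 1] as usize]);
    }
    out // O(total selected bytes)
}

fn take_by_view(offsets: &[u32], indices: &[usize]) -> Vec<(u32, u32)> {
    indices
        .iter()
        .map(|&i| (offsets[i], offsets[i + 1] - offsets[i]))
        .collect() // O(number of indices), independent of string sizes
}

fn main() {
    let bytes = b"foobarbaz";
    let offsets = [0u32, 3, 6, 9]; // "foo", "bar", "baz"
    let indices = [2usize, 0];
    assert_eq!(take_by_copy(bytes, &offsets, &indices), b"bazfoo".to_vec());
    assert_eq!(take_by_view(&offsets, &indices), vec![(6, 3), (0, 3)]);
}
```

Note also that `has_null_indices` is computed once, outside the per-element loop, selecting between two monomorphized copies of `take_to_views`; more on that below the new function.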
```diff
-fn take<I: IntegerPType, O: IntegerPType>(
-    dtype: DType,
-    offsets: &[O],
-    data: &[u8],
-    indices: &[I],
-    validity_mask: Mask,
-    indices_validity_mask: Mask,
-) -> VortexResult<VarBinArray> {
-    if !validity_mask.all_true() || !indices_validity_mask.all_true() {
-        return Ok(take_nullable(
-            dtype,
-            offsets,
-            data,
-            indices,
-            validity_mask,
-            indices_validity_mask,
-        ));
-    }
-
-    let mut new_offsets = BufferMut::with_capacity(indices.len() + 1);
-    new_offsets.push(O::zero());
-    let mut current_offset = O::zero();
-
-    for &idx in indices {
-        let idx = idx
-            .to_usize()
-            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", idx));
-        let start = offsets[idx];
-        let stop = offsets[idx + 1];
-        current_offset += stop - start;
-        new_offsets.push(current_offset);
-    }
-
-    let mut new_data = ByteBufferMut::with_capacity(
-        current_offset
-            .to_usize()
-            .vortex_expect("Failed to cast max offset to usize"),
-    );
-
-    for idx in indices {
-        let idx = idx
-            .to_usize()
-            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", idx));
-        let start = offsets[idx]
-            .to_usize()
-            .vortex_expect("Failed to cast max offset to usize");
-        let stop = offsets[idx + 1]
-            .to_usize()
-            .vortex_expect("Failed to cast max offset to usize");
-        new_data.extend_from_slice(&data[start..stop]);
-    }
-
-    let array_validity = Validity::from(dtype.nullability());
-
-    // Safety:
-    // All variants of VarBinArray are satisfied here.
-    unsafe {
-        Ok(VarBinArray::new_unchecked(
-            PrimitiveArray::new(new_offsets.freeze(), Validity::NonNullable).into_array(),
-            new_data.freeze(),
-            dtype,
-            array_validity,
-        ))
-    }
-}
-
-fn take_nullable<I: IntegerPType, O: IntegerPType>(
-    dtype: DType,
-    offsets: &[O],
-    data: &[u8],
-    indices: &[I],
-    data_validity: Mask,
-    indices_validity: Mask,
-) -> VarBinArray {
-    let mut new_offsets = BufferMut::with_capacity(indices.len() + 1);
-    new_offsets.push(O::zero());
-    let mut current_offset = O::zero();
-
-    let mut validity_buffer = BitBufferMut::with_capacity(indices.len());
-
-    // Convert indices once and store valid ones with their positions
-    let mut valid_indices = Vec::with_capacity(indices.len());
-
-    // First pass: calculate offsets and validity
-    for (idx, data_idx) in indices.iter().enumerate() {
-        if !indices_validity.value(idx) {
-            validity_buffer.append(false);
-            new_offsets.push(current_offset);
-            continue;
-        }
-        let data_idx_usize = data_idx
-            .to_usize()
-            .unwrap_or_else(|| vortex_panic!("Failed to convert index to usize: {}", data_idx));
-        if data_validity.value(data_idx_usize) {
-            validity_buffer.append(true);
-            let start = offsets[data_idx_usize];
-            let stop = offsets[data_idx_usize + 1];
-            current_offset += stop - start;
-            new_offsets.push(current_offset);
-            valid_indices.push(data_idx_usize);
-        } else {
-            validity_buffer.append(false);
-            new_offsets.push(current_offset);
-        }
-    }
-
-    let mut new_data = ByteBufferMut::with_capacity(
-        current_offset
-            .to_usize()
-            .vortex_expect("Failed to cast max offset to usize"),
-    );
-
-    // Second pass: copy data for valid indices only
-    for data_idx in valid_indices {
-        let start = offsets[data_idx]
-            .to_usize()
-            .vortex_expect("Failed to cast max offset to usize");
-        let stop = offsets[data_idx + 1]
-            .to_usize()
-            .vortex_expect("Failed to cast max offset to usize");
-        new_data.extend_from_slice(&data[start..stop]);
-    }
-
-    let array_validity = Validity::from(validity_buffer.freeze());
-
-    // Safety:
-    // All variants of VarBinArray are satisfied here.
-    unsafe {
-        VarBinArray::new_unchecked(
-            PrimitiveArray::new(new_offsets.freeze(), Validity::NonNullable).into_array(),
-            new_data.freeze(),
-            dtype,
-            array_validity,
-        )
-    }
```
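The deleted code is also the most plausible home of the "potential overflow issues" from the commit message: `current_offset += stop - start` accumulates the total number of selected bytes in the source array's offset type `O`, so repeatedly taking large elements can exceed, say, `i32::MAX` even when every individual offset fits. A minimal sketch of that failure mode (hypothetical code, not from the commit):

```rust
// Mirrors the removed `current_offset += stop - start` loop with i32 offsets.
fn sum_taken_lengths(offsets: &[i32], indices: &[usize]) -> i32 {
    let mut current_offset: i32 = 0;
    for &idx in indices {
        // The running total can exceed i32::MAX when indices repeat:
        // it wraps silently in release builds and panics in debug builds.
        current_offset += offsets[idx + 1] - offsets[idx];
    }
    current_offset
}

fn main() {
    let offsets = [0i32, 1_000_000_000]; // a single ~1 GB element
    // Two takes still fit in i32; a third would push the total past
    // i32::MAX (2_147_483_647) and overflow.
    assert_eq!(sum_taken_lengths(&offsets, &[0, 0]), 2_000_000_000);
}
```

The replacement never sums lengths at all: each view stores its own offset and size, so there is no accumulator left to overflow.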
```diff
+/// A take implementation which yields VarBinViewArray back.
+#[inline(always)]
+fn take_to_views<Offset: IntegerPType, Index: IntegerPType, F: Fn(usize) -> bool>(
+    bytes: &[u8],
+    offsets: &[Offset],
+    indices: &[Index],
+    index_is_valid: F,
+) -> Buffer<BinaryView> {
+    indices
+        .iter()
+        .copied()
+        .enumerate()
+        .map(|(indices_index, index)| {
+            if !index_is_valid(indices_index) {
+                BinaryView::empty_view()
+            } else {
+                let index = index.as_();
+                let offset = offsets[index].to_u32().vortex_expect("offset u32");
+                let end = offsets[index + 1].as_();
+                let string = &bytes[offset as usize..end];
+
+                BinaryView::make_view(string, 0u32, offset)
+            }
+        })
+        .collect()
 }
 
 #[cfg(test)]
```
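
Two details of `take_to_views` are worth calling out. First, the `0u32` passed to `make_view` is a buffer index: the resulting array carries exactly one data buffer (`Arc::new([data])` above), and every non-inlined view points into it. Second, the validity check is a generic closure `F: Fn(usize) -> bool`, so the two call sites in the kernel monomorphize two copies of the function: one consults `indices.is_valid`, while the all-valid fast path gets `|_| true`, a branch the optimizer can eliminate entirely. A standalone sketch of the same trick (hypothetical `gather`, not Vortex code):

```rust
// Calling a generic function with a literal `|_| true` closure produces a
// specialized copy in which the validity branch is trivially predictable
// (and typically compiled out).
#[inline(always)]
fn gather<F: Fn(usize) -> bool>(values: &[u64], indices: &[usize], is_valid: F) -> Vec<u64> {
    indices
        .iter()
        .enumerate()
        .map(|(pos, &i)| if is_valid(pos) { values[i] } else { 0 })
        .collect()
}

fn main() {
    let values = [10u64, 20, 30];
    // Fast path: a specialized copy where the branch is always true.
    let dense = gather(&values, &[2, 0, 1], |_| true);
    // Slow path: a second copy that consults a real validity predicate.
    let sparse = gather(&values, &[2, 0, 1], |pos| pos != 1);
    assert_eq!(dense, vec![30, 10, 20]);
    assert_eq!(sparse, vec![30, 0, 20]);
}
```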
