Skip to content

Commit cb065dd

Browse files
authored
Feature: Implement take on more vectors + stubs (#5653)
Tracking Issue: #5652 #5540 has to be merged first. Implements `take` on the remaining vectors, except for `DecimalVector`, `BinaryViewVector`, and `ListViewVector` For `DecimalVector` we cannot trivially reuse the existing `take` compute on slices and buffers because it has a bound of `NativePType`. We can relax this bound to `Copy` instead, but that will require some more work. Note that the current `DecimalArray` `take` implementation is fully scalar right now so we can definitely improve that in the new world. For the view vectors I believe we want to think through how we "rebuild" those in the array plan (we probably want to place rebuild nodes at good places after just doing a `take` on the existing views). So this PR implements: - `FixedSizeListVector` - `NullVector` - `StructVector` Edit: Now that #5666 has merged, this also implements `DecimalVector`. --------- Signed-off-by: Connor Tsui <[email protected]>
1 parent 366b694 commit cb065dd

File tree

15 files changed

+887
-60
lines changed

15 files changed

+887
-60
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ jiff = "0.2.0"
134134
kanal = "0.1.1"
135135
lending-iterator = "0.1.7"
136136
libfuzzer-sys = "0.4"
137-
log = { version = "0.4" }
137+
log = { version = "0.4.21" }
138138
loom = { version = "0.7", features = ["checkpoint"] }
139139
memmap2 = "0.9.5"
140140
mimalloc = "0.1.42"

vortex-array/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,5 +159,9 @@ harness = false
159159
name = "take_primitive"
160160
harness = false
161161

162+
[[bench]]
163+
name = "take_struct"
164+
harness = false
165+
162166
[package.metadata.cargo-machete]
163167
ignored = ["getrandom_v03"]
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
#![allow(clippy::unwrap_used)]
5+
6+
use divan::Bencher;
7+
use rand::Rng;
8+
use rand::SeedableRng;
9+
use rand::distr::Uniform;
10+
use rand::rngs::StdRng;
11+
use vortex_array::IntoArray;
12+
use vortex_array::arrays::StructArray;
13+
use vortex_array::compute::take;
14+
use vortex_array::validity::Validity;
15+
use vortex_buffer::Buffer;
16+
use vortex_dtype::FieldNames;
17+
18+
fn main() {
19+
divan::main();
20+
}
21+
22+
const ARRAY_SIZE: usize = 100_000;
23+
const TAKE_SIZE: usize = 1000;
24+
25+
#[divan::bench]
26+
fn take_struct_simple(bencher: Bencher) {
27+
let mut rng = StdRng::seed_from_u64(0);
28+
let range = Uniform::new(0i64, 100_000_000).unwrap();
29+
30+
// Create single field for the struct
31+
let field = (0..ARRAY_SIZE)
32+
.map(|_| rng.sample(range))
33+
.collect::<Buffer<i64>>()
34+
.into_array();
35+
36+
let struct_array = StructArray::try_new(
37+
FieldNames::from(["value"]),
38+
vec![field],
39+
ARRAY_SIZE,
40+
Validity::NonNullable,
41+
)
42+
.unwrap();
43+
44+
let indices: Buffer<u64> = (0..TAKE_SIZE)
45+
.map(|_| rng.random_range(0..ARRAY_SIZE) as u64)
46+
.collect();
47+
let indices_array = indices.into_array();
48+
49+
bencher
50+
.with_inputs(|| (&struct_array, &indices_array))
51+
.bench_refs(|(array, indices)| take(array.as_ref(), indices.as_ref()).unwrap());
52+
}
53+
54+
#[divan::bench(args = [8])]
55+
fn take_struct_wide(bencher: Bencher, width: usize) {
56+
let mut rng = StdRng::seed_from_u64(0);
57+
let range = Uniform::new(0i64, 100_000_000).unwrap();
58+
59+
let fields: Vec<_> = (0..width)
60+
.map(|_| {
61+
(0..ARRAY_SIZE)
62+
.map(|_| rng.sample(range))
63+
.collect::<Buffer<i64>>()
64+
.into_array()
65+
})
66+
.collect();
67+
68+
let field_names = FieldNames::from([
69+
"field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8",
70+
]);
71+
72+
let struct_array =
73+
StructArray::try_new(field_names, fields, ARRAY_SIZE, Validity::NonNullable).unwrap();
74+
75+
let indices: Buffer<u64> = (0..TAKE_SIZE)
76+
.map(|_| rng.random_range(0..ARRAY_SIZE) as u64)
77+
.collect();
78+
let indices_array = indices.into_array();
79+
80+
bencher
81+
.with_inputs(|| (&struct_array, &indices_array))
82+
.bench_refs(|(array, indices)| take(array.as_ref(), indices.as_ref()).unwrap());
83+
}
84+
85+
#[divan::bench]
86+
fn take_struct_sequential_indices(bencher: Bencher) {
87+
let mut rng = StdRng::seed_from_u64(0);
88+
let range = Uniform::new(0i64, 100_000_000).unwrap();
89+
90+
// Create single field for the struct
91+
let field = (0..ARRAY_SIZE)
92+
.map(|_| rng.sample(range))
93+
.collect::<Buffer<i64>>()
94+
.into_array();
95+
96+
let struct_array = StructArray::try_new(
97+
FieldNames::from(["value"]),
98+
vec![field],
99+
ARRAY_SIZE,
100+
Validity::NonNullable,
101+
)
102+
.unwrap();
103+
104+
// Sequential indices for better cache performance
105+
let indices: Buffer<u64> = (0..TAKE_SIZE as u64).collect();
106+
let indices_array = indices.into_array();
107+
108+
bencher
109+
.with_inputs(|| (&struct_array, &indices_array))
110+
.bench_refs(|(array, indices)| take(array.as_ref(), indices.as_ref()).unwrap());
111+
}

vortex-compute/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ arrow-array = { workspace = true, optional = true }
3030
arrow-buffer = { workspace = true, optional = true }
3131
arrow-schema = { workspace = true, optional = true }
3232
half = { workspace = true }
33+
itertools = { workspace = true }
34+
log = { workspace = true }
3335
multiversion = { workspace = true }
3436
num-traits = { workspace = true }
3537
paste = { workspace = true }

vortex-compute/src/take/bit_buffer.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,29 @@ fn take_bool<I: UnsignedPType>(bools: &BitBuffer, indices: &[I]) -> BitBuffer {
5858
get_bit(buffer, offset + bool_idx)
5959
})
6060
}
61+
62+
#[cfg(test)]
63+
mod tests {
64+
use crate::take::Take;
65+
66+
#[test]
67+
fn test_take_bit_buffer_take_small_and_large() {
68+
use vortex_buffer::BitBuffer;
69+
70+
// Small buffer (uses take_byte_bool path).
71+
let small: BitBuffer = [true, false, true, true, false, true, false, false]
72+
.into_iter()
73+
.collect();
74+
let result = (&small).take(&[7u32, 0, 2, 5, 1][..]);
75+
76+
let values: Vec<bool> = (0..result.len()).map(|i| result.value(i)).collect();
77+
assert_eq!(values, vec![false, true, true, true, false]);
78+
79+
// Large buffer (uses take_bool path, len > 4096).
80+
let large: BitBuffer = (0..5000).map(|i| i % 3 == 0).collect();
81+
let result = (&large).take(&[4999u32, 0, 1, 2, 3, 4998][..]);
82+
83+
let values: Vec<bool> = (0..result.len()).map(|i| result.value(i)).collect();
84+
assert_eq!(values, vec![false, true, false, false, true, true]);
85+
}
86+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
// use std::ops::Deref;
5+
6+
// use num_traits::AsPrimitive;
7+
// use vortex_buffer::Buffer;
8+
use vortex_dtype::UnsignedPType;
9+
use vortex_vector::VectorOps;
10+
// use vortex_vector::binaryview::BinaryView;
11+
use vortex_vector::binaryview::BinaryViewType;
12+
use vortex_vector::binaryview::BinaryViewVector;
13+
use vortex_vector::primitive::PVector;
14+
15+
use crate::take::Take;
16+
17+
impl<T: BinaryViewType, I: UnsignedPType> Take<PVector<I>> for &BinaryViewVector<T> {
18+
type Output = BinaryViewVector<T>;
19+
20+
fn take(self, indices: &PVector<I>) -> BinaryViewVector<T> {
21+
if indices.validity().all_true() {
22+
self.take(indices.elements().as_slice())
23+
} else {
24+
take_nullable(self, indices)
25+
}
26+
}
27+
}
28+
29+
impl<T: BinaryViewType, I: UnsignedPType> Take<[I]> for &BinaryViewVector<T> {
30+
type Output = BinaryViewVector<T>;
31+
32+
fn take(self, _indices: &[I]) -> BinaryViewVector<T> {
33+
todo!("TODO(connor): Implement `take` for `BinaryViewVector` and figure out rebuilding");
34+
35+
/*
36+
37+
let taken_views = take_views(self.views(), indices);
38+
let taken_validity = self.validity().take(indices);
39+
40+
debug_assert_eq!(taken_views.len(), taken_validity.len());
41+
42+
// SAFETY: We called take on views and validity with the same indices, so the new components
43+
// must have the same length. The views still point into the same buffers which we clone via
44+
// Arc, so all view references remain valid.
45+
unsafe {
46+
BinaryViewVector::new_unchecked(taken_views, self.buffers().clone(), taken_validity)
47+
}
48+
49+
*/
50+
}
51+
}
52+
53+
fn take_nullable<T: BinaryViewType, I: UnsignedPType>(
54+
_bvector: &BinaryViewVector<T>,
55+
_indices: &PVector<I>,
56+
) -> BinaryViewVector<T> {
57+
todo!("TODO(connor): Implement `take` for `BinaryViewVector` and figure out rebuilding");
58+
59+
/*
60+
61+
// We ignore nullability when taking the views since we can let the `Mask` implementation
62+
// determine which elements are null.
63+
let taken_views = take_views(bvector.views(), indices.elements().as_slice());
64+
let taken_validity = bvector.validity().take(indices);
65+
66+
debug_assert_eq!(taken_views.len(), taken_validity.len());
67+
68+
// SAFETY: We used the same indices to take from both components, so they should still have the
69+
// same length. The views still point into the same buffers which we clone via Arc, so all view
70+
// references remain valid.
71+
unsafe {
72+
BinaryViewVector::new_unchecked(taken_views, bvector.buffers().clone(), taken_validity)
73+
}
74+
75+
*/
76+
}
77+
78+
/*
79+
80+
/// Takes views at the given indices.
81+
fn take_views<I: AsPrimitive<usize>>(
82+
views: &Buffer<BinaryView>,
83+
indices: &[I],
84+
) -> Buffer<BinaryView> {
85+
let views_ref = views.deref();
86+
Buffer::<BinaryView>::from_trusted_len_iter(indices.iter().map(|i| views_ref[(*i).as_()]))
87+
}
88+
89+
*/
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_dtype::UnsignedPType;
5+
use vortex_vector::VectorOps;
6+
use vortex_vector::decimal::DecimalVector;
7+
use vortex_vector::match_each_dvector;
8+
use vortex_vector::primitive::PVector;
9+
10+
use crate::take::Take;
11+
12+
impl<I: UnsignedPType> Take<PVector<I>> for &DecimalVector {
13+
type Output = DecimalVector;
14+
15+
fn take(self, indices: &PVector<I>) -> DecimalVector {
16+
// If all the indices are valid, we can delegate to the slice indices implementation.
17+
if indices.validity().all_true() {
18+
return self.take(indices.elements().as_slice());
19+
}
20+
21+
match_each_dvector!(self, |v| { v.take(indices).into() })
22+
}
23+
}
24+
25+
impl<I: UnsignedPType> Take<[I]> for &DecimalVector {
26+
type Output = DecimalVector;
27+
28+
fn take(self, indices: &[I]) -> DecimalVector {
29+
match_each_dvector!(self, |v| { v.take(indices).into() })
30+
}
31+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_dtype::NativeDecimalType;
5+
use vortex_dtype::UnsignedPType;
6+
use vortex_vector::VectorOps;
7+
use vortex_vector::decimal::DVector;
8+
use vortex_vector::primitive::PVector;
9+
10+
use crate::take::Take;
11+
12+
impl<D: NativeDecimalType, I: UnsignedPType> Take<PVector<I>> for &DVector<D> {
13+
type Output = DVector<D>;
14+
15+
fn take(self, indices: &PVector<I>) -> DVector<D> {
16+
if indices.validity().all_true() {
17+
self.take(indices.elements().as_slice())
18+
} else {
19+
take_nullable(self, indices)
20+
}
21+
}
22+
}
23+
24+
impl<D: NativeDecimalType, I: UnsignedPType> Take<[I]> for &DVector<D> {
25+
type Output = DVector<D>;
26+
27+
fn take(self, indices: &[I]) -> DVector<D> {
28+
let taken_elements = self.elements().take(indices);
29+
let taken_validity = self.validity().take(indices);
30+
31+
debug_assert_eq!(taken_elements.len(), taken_validity.len());
32+
33+
// SAFETY: We called take on both components of the vector with the same indices, so the new
34+
// components must have the same length. The elements are unchanged, so they must still be
35+
// within the precision/scale bounds.
36+
unsafe { DVector::new_unchecked(self.precision_scale(), taken_elements, taken_validity) }
37+
}
38+
}
39+
40+
fn take_nullable<D: NativeDecimalType, I: UnsignedPType>(
41+
dvector: &DVector<D>,
42+
indices: &PVector<I>,
43+
) -> DVector<D> {
44+
// We ignore nullability when taking the elements since we can let the `Mask` implementation
45+
// determine which elements are null.
46+
let taken_elements = dvector.elements().take(indices.elements().as_slice());
47+
let taken_validity = dvector.validity().take(indices);
48+
49+
debug_assert_eq!(taken_elements.len(), taken_validity.len());
50+
51+
// SAFETY: We used the same indices to take from both components, so they should still have the
52+
// same length. The elements are unchanged, so they must still be within the precision/scale
53+
// bounds.
54+
unsafe { DVector::new_unchecked(dvector.precision_scale(), taken_elements, taken_validity) }
55+
}

0 commit comments

Comments
 (0)