Skip to content

Commit 2013eff

Browse files
feat[vortex-array] chunked array has owned chunk_offsets child (#5426)
We should ensure that visitors are given ref to externally owned array Signed-off-by: Joe Isaacs <[email protected]>
1 parent bec5098 commit 2013eff

File tree

6 files changed

+43
-26
lines changed

6 files changed

+43
-26
lines changed

vortex-array/src/arrays/chunked/array.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@ use vortex_buffer::{Buffer, BufferMut};
1212
use vortex_dtype::DType;
1313
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
1414

15+
use crate::arrays::PrimitiveArray;
1516
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
1617
use crate::search_sorted::{SearchSorted, SearchSortedSide};
1718
use crate::stats::ArrayStats;
1819
use crate::stream::{ArrayStream, ArrayStreamAdapter};
20+
use crate::validity::Validity;
1921
use crate::{Array, ArrayRef, IntoArray};
2022

2123
#[derive(Clone, Debug)]
2224
pub struct ChunkedArray {
2325
pub(super) dtype: DType,
2426
pub(super) len: usize,
25-
pub(super) chunk_offsets: Buffer<u64>,
27+
pub(super) chunk_offsets: PrimitiveArray,
2628
pub(super) chunks: Vec<ArrayRef>,
2729
pub(super) stats_set: ArrayStats,
2830
}
@@ -58,20 +60,22 @@ impl ChunkedArray {
5860

5961
let nchunks = chunks.len();
6062

61-
let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
63+
let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(nchunks + 1);
6264
// SAFETY: nchunks + 1
63-
unsafe { chunk_offsets.push_unchecked(0) }
65+
unsafe { chunk_offsets_buf.push_unchecked(0) }
6466
let mut curr_offset = 0;
6567
for c in &chunks {
6668
curr_offset += c.len() as u64;
6769
// SAFETY: nchunks + 1
68-
unsafe { chunk_offsets.push_unchecked(curr_offset) }
70+
unsafe { chunk_offsets_buf.push_unchecked(curr_offset) }
6971
}
7072

73+
let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.freeze(), Validity::NonNullable);
74+
7175
Self {
7276
dtype,
7377
len: curr_offset.try_into().vortex_unwrap(),
74-
chunk_offsets: chunk_offsets.freeze(),
78+
chunk_offsets,
7579
chunks,
7680
stats_set: Default::default(),
7781
}
@@ -102,8 +106,8 @@ impl ChunkedArray {
102106
}
103107

104108
#[inline]
105-
pub fn chunk_offsets(&self) -> &Buffer<u64> {
106-
&self.chunk_offsets
109+
pub fn chunk_offsets(&self) -> Buffer<u64> {
110+
self.chunk_offsets.buffer()
107111
}
108112

109113
pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {

vortex-array/src/arrays/chunked/compute/filter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ pub(crate) fn chunk_filters(
8181
let mut chunk_filters = vec![ChunkFilter::None; array.nchunks()];
8282

8383
for (slice_start, slice_end) in slices {
84-
let (start_chunk, start_idx) = find_chunk_idx(slice_start, chunk_offsets);
84+
let (start_chunk, start_idx) = find_chunk_idx(slice_start, &chunk_offsets);
8585
// NOTE: we adjust slice end back by one, in case it ends on a chunk boundary, we do not
8686
// want to index into the unused chunk.
87-
let (end_chunk, end_idx) = find_chunk_idx(slice_end - 1, chunk_offsets);
87+
let (end_chunk, end_idx) = find_chunk_idx(slice_end - 1, &chunk_offsets);
8888
// Adjust back to an exclusive range
8989
let end_idx = end_idx + 1;
9090

@@ -143,7 +143,7 @@ fn filter_indices(
143143
let chunk_offsets = array.chunk_offsets();
144144

145145
for set_index in indices {
146-
let (chunk_id, index) = find_chunk_idx(set_index, chunk_offsets);
146+
let (chunk_id, index) = find_chunk_idx(set_index, &chunk_offsets);
147147
if chunk_id != current_chunk_id {
148148
// Push the chunk we've accumulated.
149149
if !chunk_indices.is_empty() {

vortex-array/src/arrays/chunked/compute/mask.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ fn mask_indices(
4747
let chunk_offsets = array.chunk_offsets();
4848

4949
for &set_index in indices {
50-
let (chunk_id, index) = find_chunk_idx(set_index, chunk_offsets);
50+
let (chunk_id, index) = find_chunk_idx(set_index, &chunk_offsets);
5151
if chunk_id != current_chunk_id {
5252
let chunk = array.chunk(current_chunk_id);
5353
let masked_chunk = mask(chunk, &Mask::from_indices(chunk.len(), chunk_indices))?;

vortex-array/src/arrays/chunked/vtable/array.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
2727
fn array_hash<H: std::hash::Hasher>(array: &ChunkedArray, state: &mut H, precision: Precision) {
2828
array.dtype.hash(state);
2929
array.len.hash(state);
30-
array.chunk_offsets.array_hash(state, precision);
30+
array.chunk_offsets.as_ref().array_hash(state, precision);
3131
for chunk in &array.chunks {
3232
chunk.array_hash(state, precision);
3333
}
@@ -38,7 +38,8 @@ impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
3838
&& array.len == other.len
3939
&& array
4040
.chunk_offsets
41-
.array_eq(&other.chunk_offsets, precision)
41+
.as_ref()
42+
.array_eq(other.chunk_offsets.as_ref(), precision)
4243
&& array.chunks.len() == other.chunks.len()
4344
&& array
4445
.chunks

vortex-array/src/arrays/chunked/vtable/mod.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use vortex_dtype::{DType, Nullability, PType};
77
use vortex_error::{VortexResult, vortex_bail, vortex_err};
88
use vortex_vector::{Vector, VectorMut, VectorMutOps};
99

10-
use crate::arrays::ChunkedArray;
10+
use crate::arrays::{ChunkedArray, PrimitiveArray};
1111
use crate::execution::ExecutionCtx;
1212
use crate::serde::ArrayChildren;
13+
use crate::validity::Validity;
1314
use crate::vtable::{NotSupported, VTable};
1415
use crate::{ArrayOperator, EmptyMetadata, EncodingId, EncodingRef, ToCanonical, vtable};
1516

@@ -71,18 +72,19 @@ impl VTable for ChunkedVTable {
7172
let nchunks = children.len() - 1;
7273

7374
// The first child contains the row offsets of the chunks
74-
let chunk_offsets = children
75+
let chunk_offsets_array = children
7576
.get(
7677
0,
7778
&DType::Primitive(PType::U64, Nullability::NonNullable),
7879
// 1 extra offset for the end of the last chunk
7980
nchunks + 1,
8081
)?
81-
.to_primitive()
82-
.buffer::<u64>();
82+
.to_primitive();
83+
84+
let chunk_offsets_buf = chunk_offsets_array.buffer::<u64>();
8385

8486
// The remaining children contain the actual data of the chunks
85-
let chunks = chunk_offsets
87+
let chunks = chunk_offsets_buf
8688
.iter()
8789
.tuple_windows()
8890
.enumerate()
@@ -93,9 +95,22 @@ impl VTable for ChunkedVTable {
9395
})
9496
.try_collect()?;
9597

96-
// SAFETY: All chunks are deserialized with the same dtype that was serialized.
97-
// Each chunk was validated during deserialization to match the expected dtype.
98-
unsafe { Ok(ChunkedArray::new_unchecked(chunks, dtype.clone())) }
98+
let chunk_offsets = PrimitiveArray::new(chunk_offsets_buf.clone(), Validity::NonNullable);
99+
100+
let total_len = chunk_offsets_buf
101+
.last()
102+
.ok_or_else(|| vortex_err!("chunk_offsets must not be empty"))?;
103+
let len = usize::try_from(*total_len)
104+
.map_err(|_| vortex_err!("total length {} exceeds usize range", total_len))?;
105+
106+
// Construct directly using the struct fields to avoid recomputing chunk_offsets
107+
Ok(ChunkedArray {
108+
dtype: dtype.clone(),
109+
len,
110+
chunk_offsets,
111+
chunks,
112+
stats_set: Default::default(),
113+
})
99114
}
100115

101116
fn execute(array: &Self::Array, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {

vortex-array/src/arrays/chunked/vtable/visitor.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use crate::arrays::{ChunkedArray, ChunkedVTable, PrimitiveArray};
5-
use crate::validity::Validity;
4+
use crate::arrays::{ChunkedArray, ChunkedVTable};
65
use crate::vtable::VisitorVTable;
76
use crate::{ArrayBufferVisitor, ArrayChildVisitor};
87

98
impl VisitorVTable<ChunkedVTable> for ChunkedVTable {
109
fn visit_buffers(_array: &ChunkedArray, _visitor: &mut dyn ArrayBufferVisitor) {}
1110

1211
fn visit_children(array: &ChunkedArray, visitor: &mut dyn ArrayChildVisitor) {
13-
let chunk_offsets =
14-
PrimitiveArray::new(array.chunk_offsets().clone(), Validity::NonNullable);
15-
visitor.visit_child("chunk_offsets", chunk_offsets.as_ref());
12+
visitor.visit_child("chunk_offsets", array.chunk_offsets.as_ref());
1613

1714
for (idx, chunk) in array.chunks().iter().enumerate() {
1815
visitor.visit_child(format!("chunks[{idx}]").as_str(), chunk);

0 commit comments

Comments
 (0)