Skip to content

Commit 174e5b5

Browse files
authored
feat: add new uninit_range API for PrimitiveBuilder (#2443)
1 parent 93ba60a commit 174e5b5

File tree

2 files changed

+215
-102
lines changed

2 files changed

+215
-102
lines changed

encodings/fastlanes/src/bitpacking/compress.rs

Lines changed: 98 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
use std::ops::DerefMut as _;
1+
use std::mem::MaybeUninit;
22

33
use arrow_buffer::ArrowNativeType;
44
use fastlanes::BitPacking;
5+
use num_traits::AsPrimitive;
56
use vortex_array::arrays::PrimitiveArray;
6-
use vortex_array::builders::{ArrayBuilder as _, PrimitiveBuilder};
7+
use vortex_array::builders::{ArrayBuilder as _, PrimitiveBuilder, UninitRange};
78
use vortex_array::patches::Patches;
89
use vortex_array::validity::Validity;
910
use vortex_array::variants::PrimitiveArrayTrait;
10-
use vortex_array::IntoArray;
11+
use vortex_array::{IntoArray, IntoArrayVariant};
1112
use vortex_buffer::{Buffer, BufferMut, ByteBuffer};
1213
use vortex_dtype::{
1314
match_each_integer_ptype, match_each_integer_ptype_with_unsigned_type,
@@ -239,64 +240,116 @@ where
239240
F: Fn(&[UnsignedT]) -> &[T],
240241
G: Fn(&mut [T]) -> &mut [UnsignedT],
241242
{
242-
let my_offset_in_builder = builder.len();
243-
244-
builder
245-
.nulls
246-
.append_validity(array.validity(), array.len())?;
243+
// Append a dense null Mask.
244+
builder.append_mask(array.validity_mask()?);
247245

248246
let packed = array.packed_slice::<UnsignedT>();
249247
let bit_width = array.bit_width() as usize;
250248
let length = array.len();
251249
let offset = array.offset() as usize;
252250

251+
let mut uninit = builder.uninit_range(length);
252+
253253
unpack_values_into(
254254
packed,
255255
bit_width,
256-
length,
257256
offset,
258-
builder,
257+
// Ask the builder for the next `length` values in the buffer as a [MaybeUninit<T>]
258+
&mut uninit,
259259
transmute,
260260
transmute_mut,
261261
)?;
262262

263263
if let Some(patches) = array.patches() {
264-
builder.patch(patches, my_offset_in_builder)?;
264+
apply_patches(&mut uninit, patches)?;
265265
};
266266

267+
uninit.finish();
268+
269+
Ok(())
270+
}
271+
272+
fn apply_patches<T: NativePType>(dst: &mut UninitRange<T>, patches: Patches) -> VortexResult<()> {
273+
let (array_len, indices_offset, indices, values) = patches.into_parts();
274+
assert_eq!(array_len, dst.len());
275+
276+
let indices = indices.into_primitive()?;
277+
let values = values.into_primitive()?;
278+
let validity = values.validity();
279+
let values = values.as_slice::<T>();
280+
match_each_unsigned_integer_ptype!(indices.ptype(), |$P| {
281+
insert_values_and_validity_at_indices::<T, $P>(
282+
dst,
283+
indices,
284+
values,
285+
validity,
286+
indices_offset,
287+
)
288+
})
289+
}
290+
291+
fn insert_values_and_validity_at_indices<
292+
T: NativePType,
293+
IndexT: NativePType + AsPrimitive<usize>,
294+
>(
295+
dst: &mut UninitRange<T>,
296+
indices: PrimitiveArray,
297+
values: &[T],
298+
validity: Validity,
299+
indices_offset: usize,
300+
) -> VortexResult<()> {
301+
match validity {
302+
Validity::NonNullable => {
303+
for (compressed_index, decompressed_index) in
304+
indices.as_slice::<IndexT>().iter().enumerate()
305+
{
306+
dst[decompressed_index.as_() - indices_offset] =
307+
MaybeUninit::new(values[compressed_index]);
308+
}
309+
}
310+
_ => {
311+
let validity = validity.to_logical(indices.len())?;
312+
for (compressed_index, decompressed_index) in
313+
indices.as_slice::<IndexT>().iter().enumerate()
314+
{
315+
let out_index = decompressed_index.as_() - indices_offset;
316+
dst[decompressed_index.as_() - indices_offset] =
317+
MaybeUninit::new(values[compressed_index]);
318+
dst.set_bit(out_index, validity.value(out_index));
319+
}
320+
}
321+
}
322+
267323
Ok(())
268324
}
269325

270326
fn unpack_values_into<T: NativePType, UnsignedT: NativePType + BitPacking, F, G>(
271327
packed: &[UnsignedT],
272328
bit_width: usize,
273-
length: usize,
274329
offset: usize,
275330
// TODO(ngates): do we want to use fastlanes alignment for this buffer?
276-
builder: &mut PrimitiveBuilder<T>,
331+
uninit: &mut UninitRange<T>,
277332
transmute: F,
278333
transmute_mut: G,
279334
) -> VortexResult<()>
280335
where
281336
F: Fn(&[UnsignedT]) -> &[T],
282337
G: Fn(&mut [T]) -> &mut [UnsignedT],
283338
{
284-
let my_offset_in_builder = builder.len();
285-
let last_chunk_length = match (offset + length) % 1024 {
286-
0 => 1024,
287-
last_chunk_length => last_chunk_length,
339+
let last_chunk_length = if (offset + uninit.len()) % 1024 == 0 {
340+
1024
341+
} else {
342+
(offset + uninit.len()) % 1024
288343
};
289344

290345
if bit_width == 0 {
291-
builder.values.push_n(T::zero(), length);
346+
uninit.fill(MaybeUninit::new(T::zero()));
292347
return Ok(());
293348
}
294349

295-
builder.values.reserve(length);
296-
297350
// How many fastlanes vectors we will process.
298351
// Packed array might not start at 0 when the array is sliced. Offset is guaranteed to be < 1024.
299-
let num_chunks = (offset + length).div_ceil(1024);
352+
let num_chunks = (offset + uninit.len()).div_ceil(1024);
300353
let elems_per_chunk = 128 * bit_width / size_of::<T>();
301354
assert_eq!(
302355
packed.len(),
@@ -313,9 +366,12 @@ where
313366
// 1. chunk is elems_per_chunk.
314367
// 2. decoded is exactly 1024.
315368
unsafe { BitPacking::unchecked_unpack(bit_width, chunk, &mut decoded) };
316-
builder
317-
.values
318-
.extend_from_slice(transmute(&decoded[offset..][..length]));
369+
uninit.copy_from_init(
370+
0,
371+
uninit.len(),
372+
transmute(&decoded[offset..][..uninit.len()]),
373+
);
374+
319375
return Ok(());
320376
}
321377

@@ -324,46 +380,42 @@ where
324380
let full_chunks_range =
325381
(first_chunk_is_sliced as usize)..(num_chunks - last_chunk_is_sliced as usize);
326382

383+
// Index into the builder's uninit values buffer.
384+
const CHUNK_SIZE: usize = 1024;
385+
let mut out_idx = 0;
327386
if first_chunk_is_sliced {
328387
let chunk = &packed[..elems_per_chunk];
329-
let mut decoded = [UnsignedT::zero(); 1024];
388+
let mut decoded = [UnsignedT::zero(); CHUNK_SIZE];
330389
// SAFETY:
331390
// 1. chunk is elems_per_chunk.
332391
// 2. decoded is exactly 1024.
333392
unsafe { BitPacking::unchecked_unpack(bit_width, chunk, &mut decoded) };
334-
builder
335-
.values
336-
.extend_from_slice(transmute(&decoded[offset..]));
393+
uninit.copy_from_init(out_idx, CHUNK_SIZE - offset, transmute(&decoded[offset..]));
394+
out_idx += CHUNK_SIZE - offset;
337395
}
338396
for i in full_chunks_range {
339397
let chunk = &packed[i * elems_per_chunk..][..elems_per_chunk];
340398

341-
// SAFETY:
342-
//
343-
// 1. unchecked_unpack only writes into the output and when it is finished, all the outputs
344-
// have been written to.
345-
//
346-
// 2. The output buffer is exactly size 1024.
347399
unsafe {
348-
builder.values.set_len(builder.values.len() + 1024);
349-
BitPacking::unchecked_unpack(
350-
bit_width,
351-
chunk,
352-
&mut transmute_mut(builder.values.deref_mut())
353-
[my_offset_in_builder + i * 1024 - offset..][..1024],
354-
);
400+
// SAFETY: &[T] and &[MaybeUninit<T>] have the same layout
401+
let dst: &mut [T] = std::mem::transmute(&mut uninit[out_idx..][..1024]);
402+
let dst: &mut [UnsignedT] = transmute_mut(dst);
403+
BitPacking::unchecked_unpack(bit_width, chunk, dst);
355404
}
405+
out_idx += CHUNK_SIZE;
356406
}
357407
if last_chunk_is_sliced {
358408
let chunk = &packed[(num_chunks - 1) * elems_per_chunk..][..elems_per_chunk];
359-
let mut decoded = [UnsignedT::zero(); 1024];
409+
let mut decoded = [UnsignedT::zero(); CHUNK_SIZE];
360410
// SAFETY:
361411
// 1. chunk is elems_per_chunk.
362412
// 2. decoded is exactly 1024.
363413
unsafe { BitPacking::unchecked_unpack(bit_width, chunk, &mut decoded) };
364-
builder
365-
.values
366-
.extend_from_slice(transmute(&decoded[..last_chunk_length]));
414+
uninit.copy_from_init(
415+
out_idx,
416+
last_chunk_length,
417+
transmute(&decoded[..last_chunk_length]),
418+
);
367419
}
368420

369421
Ok(())
@@ -717,7 +769,7 @@ mod test {
717769
let mut rng = StdRng::seed_from_u64(0);
718770

719771
let chunks = (0..10)
720-
.map(|_| make_array(&mut rng, 100, 0.0, 0.0).unwrap())
772+
.map(|_| make_array(&mut rng, 100, 0.25, 0.25).unwrap())
721773
.collect::<Vec<_>>();
722774
let chunked = ChunkedArray::from_iter(chunks).into_array();
723775

0 commit comments

Comments
 (0)