Skip to content

Commit fc6deac

Browse files
authored
feat: Use buffers for opaque data in VarBin and VarBinView (#1935)
1 parent 1c79fe0 commit fc6deac

File tree

19 files changed

+123
-254
lines changed

19 files changed

+123
-254
lines changed

bench-vortex/benches/bytes_at.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ use vortex::{Context, IntoArrayData, IntoArrayVariant};
1616
fn array_data_fixture() -> VarBinArray {
1717
VarBinArray::try_new(
1818
buffer![0i32, 5i32, 10i32, 15i32, 20i32].into_array(),
19-
ByteBuffer::copy_from(b"helloworldhelloworld".as_bytes()).into_array(),
19+
ByteBuffer::copy_from(b"helloworldhelloworld".as_bytes()),
2020
DType::Utf8(Nullability::NonNullable),
2121
Validity::NonNullable,
2222
)

docs/quickstart.rst

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ Vortex array:
3535
>>> parquet = pq.read_table("_static/example.parquet")
3636
>>> vtx = vortex.array(parquet)
3737
>>> vtx.nbytes
38-
141070
38+
141069
3939

4040
Compress
4141
^^^^^^^^
@@ -46,7 +46,7 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the
4646

4747
>>> cvtx = vortex.compress(vtx)
4848
>>> cvtx.nbytes
49-
16605
49+
16604
5050
>>> cvtx.nbytes / vtx.nbytes
5151
0.11...
5252

encodings/dict/src/compress.rs

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -153,13 +153,8 @@ fn dict_encode_varbin_bytes<'a, I: Iterator<Item = Option<&'a [u8]>>>(
153153
let values_validity = dict_values_validity(dtype.is_nullable(), offsets.len() - 1);
154154
(
155155
PrimitiveArray::new(codes, Validity::NonNullable),
156-
VarBinArray::try_new(
157-
offsets.into_array(),
158-
bytes.into_array(),
159-
dtype,
160-
values_validity,
161-
)
162-
.vortex_expect("Failed to create VarBinArray dictionary during encoding"),
156+
VarBinArray::try_new(offsets.into_array(), bytes.freeze(), dtype, values_validity)
157+
.vortex_expect("Failed to create VarBinArray dictionary during encoding"),
163158
)
164159
}
165160

encodings/fsst/src/canonical.rs

Lines changed: 4 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,7 @@
11
use arrow_array::builder::make_view;
22
use vortex_array::array::{BinaryView, VarBinArray, VarBinViewArray};
33
use vortex_array::variants::PrimitiveArrayTrait;
4-
use vortex_array::{
5-
ArrayDType, ArrayLen, Canonical, IntoArrayData, IntoArrayVariant, IntoCanonical,
6-
};
4+
use vortex_array::{ArrayDType, ArrayLen, Canonical, IntoCanonical};
75
use vortex_buffer::{BufferMut, ByteBuffer};
86
use vortex_dtype::match_each_integer_ptype;
97
use vortex_error::VortexResult;
@@ -23,12 +21,10 @@ impl IntoCanonical for FSSTArray {
2321
// call. We then turn our uncompressed_lengths into an offsets buffer
2422
// necessary for a VarBinViewArray and construct the canonical array.
2523

26-
let compressed_bytes = VarBinArray::try_from(self.codes())?
27-
.sliced_bytes()?
28-
.into_primitive()?;
24+
let bytes = VarBinArray::try_from(self.codes())?.sliced_bytes();
2925

3026
// Bulk-decompress the entire array.
31-
let uncompressed_bytes = decompressor.decompress(compressed_bytes.as_slice::<u8>());
27+
let uncompressed_bytes = decompressor.decompress(bytes.as_slice());
3228

3329
let uncompressed_lens_array = self
3430
.uncompressed_lengths()
@@ -54,7 +50,7 @@ impl IntoCanonical for FSSTArray {
5450
});
5551

5652
let views = views.freeze();
57-
let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes).into_array();
53+
let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes);
5854

5955
VarBinViewArray::try_new(
6056
views,

vortex-array/src/array/chunked/canonical.rs

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -244,16 +244,12 @@ fn pack_views(
244244
// merged buffers list.
245245
let buffers_offset = u32::try_from(buffers.len())?;
246246
let canonical_chunk = chunk.clone().into_varbinview()?;
247+
buffers.extend(canonical_chunk.buffers());
247248

248-
for buffer in canonical_chunk.buffers() {
249-
let canonical_buffer = buffer.into_canonical()?.into_primitive()?.into_array();
250-
buffers.push(canonical_buffer);
251-
}
252-
253-
for view in canonical_chunk.binary_views()? {
249+
for view in canonical_chunk.views().iter() {
254250
if view.is_inlined() {
255251
// Inlined views can be copied directly into the output
256-
views.push(view);
252+
views.push(*view);
257253
} else {
258254
// Referencing views must have their buffer_index adjusted with new offsets
259255
let view_ref = view.as_view();

vortex-array/src/array/constant/canonical.rs

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -81,10 +81,7 @@ fn canonical_byte_view(
8181
let view = BinaryView::from(make_view(scalar_bytes, 0, 0));
8282
let mut buffers = Vec::new();
8383
if scalar_bytes.len() >= BinaryView::MAX_INLINED_SIZE {
84-
buffers.push(
85-
PrimitiveArray::new(Buffer::copy_from(scalar_bytes), Validity::NonNullable)
86-
.into_array(),
87-
);
84+
buffers.push(Buffer::copy_from(scalar_bytes));
8885
}
8986

9087
// Clone our constant view `len` times.

vortex-array/src/array/varbin/accessor.rs

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -12,14 +12,15 @@ impl ArrayAccessor<[u8]> for VarBinArray {
1212
where
1313
F: for<'a> FnOnce(&mut (dyn Iterator<Item = Option<&'a [u8]>>)) -> R,
1414
{
15-
// TODO(ngates): what happens if bytes is much larger than sliced_bytes?
16-
let primitive = self.bytes().into_primitive()?;
1715
let offsets = self.offsets().into_primitive()?;
1816
let validity = self.logical_validity().to_null_buffer()?;
1917

18+
// TODO(ngates): what happens if bytes is much larger than sliced_bytes?
19+
let bytes = self.bytes();
20+
let bytes = bytes.as_slice();
21+
2022
match_each_integer_ptype!(offsets.ptype(), |$T| {
2123
let offsets = offsets.as_slice::<$T>();
22-
let bytes = primitive.as_slice::<u8>();
2324

2425
match validity {
2526
None => {

vortex-array/src/array/varbin/array.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ impl ValidityVTable<VarBinArray> for VarBinEncoding {
1919
impl VisitorVTable<VarBinArray> for VarBinEncoding {
2020
fn accept(&self, array: &VarBinArray, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
2121
visitor.visit_child("offsets", &array.offsets())?;
22-
visitor.visit_child("bytes", &array.bytes())?;
22+
visitor.visit_buffer(&array.bytes())?;
2323
visitor.visit_validity(&array.validity())
2424
}
2525
}

vortex-array/src/array/varbin/arrow.rs

Lines changed: 5 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -31,29 +31,22 @@ pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult<ArrayR
3131
.to_null_buffer()
3232
.map_err(|err| err.with_context("Failed to get null buffer from logical validity"))?;
3333

34-
let data = varbin_array
35-
.bytes()
36-
.into_primitive()
37-
.map_err(|err| err.with_context("Failed to canonicalize bytes"))?;
38-
if data.dtype() != &DType::BYTES {
39-
vortex_bail!("Expected bytes to be of type U8, got {}", data.ptype());
40-
}
41-
let data = data.byte_buffer();
34+
let data = varbin_array.bytes();
4235

4336
// Switch on Arrow DType.
4437
Ok(match varbin_array.dtype() {
4538
DType::Binary(_) => match offsets.ptype() {
4639
PType::I32 => Arc::new(unsafe {
4740
BinaryArray::new_unchecked(
4841
offsets.buffer::<i32>().into_arrow_offset_buffer(),
49-
data.clone().into_arrow_buffer(),
42+
data.into_arrow_buffer(),
5043
nulls,
5144
)
5245
}),
5346
PType::I64 => Arc::new(unsafe {
5447
LargeBinaryArray::new_unchecked(
5548
offsets.buffer::<i64>().into_arrow_offset_buffer(),
56-
data.clone().into_arrow_buffer(),
49+
data.into_arrow_buffer(),
5750
nulls,
5851
)
5952
}),
@@ -63,14 +56,14 @@ pub(crate) fn varbin_to_arrow(varbin_array: &VarBinArray) -> VortexResult<ArrayR
6356
PType::I32 => Arc::new(unsafe {
6457
StringArray::new_unchecked(
6558
offsets.buffer::<i32>().into_arrow_offset_buffer(),
66-
data.clone().into_arrow_buffer(),
59+
data.into_arrow_buffer(),
6760
nulls,
6861
)
6962
}),
7063
PType::I64 => Arc::new(unsafe {
7164
LargeStringArray::new_unchecked(
7265
offsets.buffer::<i64>().into_arrow_offset_buffer(),
73-
data.clone().into_arrow_buffer(),
66+
data.into_arrow_buffer(),
7467
nulls,
7568
)
7669
}),

vortex-array/src/array/varbin/builder.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,6 @@ impl<O: NativePType + PrimInt> VarBinBuilder<O> {
8080

8181
pub fn finish(mut self, dtype: DType) -> VarBinArray {
8282
let offsets = PrimitiveArray::new(self.offsets.freeze(), Validity::NonNullable);
83-
let data = PrimitiveArray::new(self.data.freeze(), Validity::NonNullable);
8483
let nulls = self.validity.finish();
8584

8685
let validity = if dtype.is_nullable() {
@@ -90,7 +89,7 @@ impl<O: NativePType + PrimInt> VarBinBuilder<O> {
9089
Validity::NonNullable
9190
};
9291

93-
VarBinArray::try_new(offsets.into_array(), data.into_array(), dtype, validity)
92+
VarBinArray::try_new(offsets.into_array(), self.data.freeze(), dtype, validity)
9493
.vortex_expect("Unexpected error while building VarBinArray")
9594
}
9695
}

0 commit comments

Comments
 (0)