Skip to content

Commit 6450acb

Browse files
authored
Efficient deserialization for Buffer<u8> (#103)
* Specializations for Buffer<u8> * Avoid copies in the the Buffer<u8> iterator * fmt * Proper usage of buffer.slice() * Add test for nullable code path * Add bench for Buffer<u8> optimization * fmt
1 parent b2fd8f8 commit 6450acb

File tree

4 files changed

+145
-14
lines changed

4 files changed

+145
-14
lines changed

arrow2_convert/benches/bench.rs

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,50 @@ use arrow2_convert::{
55
};
66
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
77

8+
// Arrow stores U8 arrays as `arrow2::array::BinaryArray`
89
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
910
#[arrow_field(transparent)]
10-
pub struct BufStruct(Buffer<u16>);
11+
pub struct BufU8Struct(Buffer<u8>);
1112

13+
// Arrow stores other arrows as `arrow2::array::ListArray`
1214
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
1315
#[arrow_field(transparent)]
14-
pub struct VecStruct(Vec<u16>);
16+
pub struct BufU32Struct(Buffer<u32>);
17+
18+
// Arrow stores U8 arrows as `arrow2::array::BinaryArray`
19+
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
20+
#[arrow_field(transparent)]
21+
pub struct VecU8Struct(Vec<u8>);
22+
23+
// Arrow stores other arrows as `arrow2::array::ListArray`
24+
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
25+
#[arrow_field(transparent)]
26+
pub struct VecU32Struct(Vec<u32>);
1527

1628
pub fn bench_buffer_serialize(c: &mut Criterion) {
1729
let mut group = c.benchmark_group("serialize");
1830
for size in [1, 10, 100, 1000, 10000].iter() {
1931
group.throughput(Throughput::Elements(*size as u64));
20-
group.bench_with_input(BenchmarkId::new("Buffer", size), size, |b, &size| {
21-
let data = [BufStruct((0..size as u16).into_iter().collect())];
32+
group.bench_with_input(BenchmarkId::new("BufferU8", size), size, |b, &size| {
33+
let data = [BufU8Struct((0..size as u8).into_iter().collect())];
2234
b.iter(|| {
2335
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
2436
});
2537
});
26-
group.bench_with_input(BenchmarkId::new("Vec", size), size, |b, &size| {
27-
let data = [VecStruct((0..size as u16).into_iter().collect())];
38+
group.bench_with_input(BenchmarkId::new("VecU8", size), size, |b, &size| {
39+
let data = [VecU8Struct((0..size as u8).into_iter().collect())];
40+
b.iter(|| {
41+
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
42+
});
43+
});
44+
group.bench_with_input(BenchmarkId::new("BufferU32", size), size, |b, &size| {
45+
let data = [BufU32Struct((0..size as u32).into_iter().collect())];
46+
b.iter(|| {
47+
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
48+
});
49+
});
50+
group.bench_with_input(BenchmarkId::new("VecU32", size), size, |b, &size| {
51+
let data = [VecU32Struct((0..size as u32).into_iter().collect())];
2852
b.iter(|| {
2953
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
3054
});
@@ -35,27 +59,53 @@ pub fn bench_buffer_deserialize(c: &mut Criterion) {
3559
let mut group = c.benchmark_group("deserialize");
3660
for size in [1, 10, 100, 1000, 10000].iter() {
3761
group.throughput(Throughput::Elements(*size as u64));
38-
group.bench_with_input(BenchmarkId::new("Buffer", size), size, |b, &size| {
39-
let data: Box<dyn Array> = [BufStruct((0..size as u16).into_iter().collect())]
62+
group.bench_with_input(BenchmarkId::new("BufferU8", size), size, |b, &size| {
63+
let data: Box<dyn Array> = [BufU8Struct((0..size as u8).into_iter().collect())]
64+
.try_into_arrow()
65+
.unwrap();
66+
b.iter_batched(
67+
|| data.clone(),
68+
|data| {
69+
let _: Vec<BufU8Struct> =
70+
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
71+
},
72+
criterion::BatchSize::SmallInput,
73+
)
74+
});
75+
group.bench_with_input(BenchmarkId::new("VecU8", size), size, |b, &size| {
76+
let data: Box<dyn Array> = [VecU8Struct((0..size as u8).into_iter().collect())]
77+
.try_into_arrow()
78+
.unwrap();
79+
b.iter_batched(
80+
|| data.clone(),
81+
|data| {
82+
let _: Vec<VecU8Struct> =
83+
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
84+
},
85+
criterion::BatchSize::SmallInput,
86+
);
87+
});
88+
group.bench_with_input(BenchmarkId::new("BufferU32", size), size, |b, &size| {
89+
let data: Box<dyn Array> = [BufU32Struct((0..size as u32).into_iter().collect())]
4090
.try_into_arrow()
4191
.unwrap();
4292
b.iter_batched(
4393
|| data.clone(),
4494
|data| {
45-
let _: Vec<BufStruct> =
95+
let _: Vec<BufU32Struct> =
4696
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
4797
},
4898
criterion::BatchSize::SmallInput,
4999
)
50100
});
51-
group.bench_with_input(BenchmarkId::new("Vec", size), size, |b, &size| {
52-
let data: Box<dyn Array> = [VecStruct((0..size as u16).into_iter().collect())]
101+
group.bench_with_input(BenchmarkId::new("VecU32", size), size, |b, &size| {
102+
let data: Box<dyn Array> = [VecU32Struct((0..size as u32).into_iter().collect())]
53103
.try_into_arrow()
54104
.unwrap();
55105
b.iter_batched(
56106
|| data.clone(),
57107
|data| {
58-
let _: Vec<VecStruct> =
108+
let _: Vec<VecU32Struct> =
59109
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
60110
},
61111
criterion::BatchSize::SmallInput,

arrow2_convert/src/deserialize.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,64 @@ impl ArrowDeserialize for NaiveDate {
173173
}
174174
}
175175

176+
/// Iterator for for [`BufferBinaryArray`]
177+
pub struct BufferBinaryArrayIter<'a> {
178+
index: usize,
179+
array: &'a BinaryArray<i32>,
180+
}
181+
182+
impl<'a> Iterator for BufferBinaryArrayIter<'a> {
183+
type Item = Option<Buffer<u8>>;
184+
185+
fn next(&mut self) -> Option<Self::Item> {
186+
if self.index >= self.array.len() {
187+
None
188+
} else {
189+
if let Some(validity) = self.array.validity() {
190+
if !validity.get_bit(self.index) {
191+
self.index += 1;
192+
return Some(None);
193+
}
194+
}
195+
let (start, end) = self.array.offsets().start_end(self.index);
196+
self.index += 1;
197+
Some(Some(self.array.values().clone().slice(start, end - start)))
198+
}
199+
}
200+
}
201+
202+
/// Internal `ArrowArray` helper to iterate over a `BinaryArray` while exposing Buffer slices
203+
pub struct BufferBinaryArray;
204+
205+
impl<'a> IntoIterator for &'a BufferBinaryArray {
206+
type Item = Option<Buffer<u8>>;
207+
208+
type IntoIter = BufferBinaryArrayIter<'a>;
209+
210+
fn into_iter(self) -> Self::IntoIter {
211+
unimplemented!("Use iter_from_array_ref");
212+
}
213+
}
214+
215+
impl ArrowArray for BufferBinaryArray {
216+
type BaseArrayType = BinaryArray<i32>;
217+
#[inline]
218+
fn iter_from_array_ref(a: &dyn Array) -> <&Self as IntoIterator>::IntoIter {
219+
let b = a.as_any().downcast_ref::<Self::BaseArrayType>().unwrap();
220+
221+
BufferBinaryArrayIter { index: 0, array: b }
222+
}
223+
}
224+
225+
impl ArrowDeserialize for Buffer<u8> {
226+
type ArrayType = BufferBinaryArray;
227+
228+
#[inline]
229+
fn arrow_deserialize(v: Option<Buffer<u8>>) -> Option<Self> {
230+
v
231+
}
232+
}
233+
176234
impl ArrowDeserialize for Vec<u8> {
177235
type ArrayType = BinaryArray<i32>;
178236

arrow2_convert/src/field.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,15 +281,17 @@ arrow_enable_vec_for_type!(bool);
281281
arrow_enable_vec_for_type!(NaiveDateTime);
282282
arrow_enable_vec_for_type!(NaiveDate);
283283
arrow_enable_vec_for_type!(Vec<u8>);
284+
arrow_enable_vec_for_type!(Buffer<u8>);
284285
arrow_enable_vec_for_type!(LargeBinary);
285286
impl<const SIZE: usize> ArrowEnableVecForType for FixedSizeBinary<SIZE> {}
286287
impl<const PRECISION: usize, const SCALE: usize> ArrowEnableVecForType for I128<PRECISION, SCALE> {}
287288

288289
// Blanket implementation for Vec<Option<T>> if vectors are enabled for T
289290
impl<T> ArrowEnableVecForType for Option<T> where T: ArrowField + ArrowEnableVecForType {}
290291

291-
// Blanket implementation for Vec<Vec<T>> if vectors are enabled for T
292+
// Blanket implementation for Vec<Vec<T>> and Vec<Buffer<T>> if vectors or buffers are enabled for T
292293
impl<T> ArrowEnableVecForType for Vec<T> where T: ArrowField + ArrowEnableVecForType {}
294+
impl<T> ArrowEnableVecForType for Buffer<T> where T: ArrowField + ArrowEnableVecForType {}
293295
impl<T> ArrowEnableVecForType for LargeVec<T> where T: ArrowField + ArrowEnableVecForType {}
294296
impl<T, const SIZE: usize> ArrowEnableVecForType for FixedSizeVec<T, SIZE> where
295297
T: ArrowField + ArrowEnableVecForType

arrow2_convert/tests/test_deserialize.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,32 @@ fn test_deserialize_large_types_schema_mismatch_error() {
7979
}
8080

8181
#[test]
82-
fn test_deserialize_buffer() {
82+
fn test_deserialize_buffer_u16() {
8383
let original_array = [Buffer::from_iter(0u16..5), Buffer::from_iter(7..9)];
8484
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
8585
let iter = arrow_array_deserialize_iterator::<Buffer<u16>>(b.as_ref()).unwrap();
8686
for (i, k) in iter.zip(original_array.iter()) {
8787
assert_eq!(&i, k);
8888
}
8989
}
90+
91+
#[test]
92+
fn test_deserialize_buffer_u8() {
93+
let original_array = [Buffer::from_iter(0u8..5), Buffer::from_iter(7..9)];
94+
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
95+
let iter = arrow_array_deserialize_iterator::<Buffer<u8>>(b.as_ref()).unwrap();
96+
for (i, k) in iter.zip(original_array.iter()) {
97+
assert_eq!(&i, k);
98+
}
99+
100+
let original_array = [
101+
Some(Buffer::from_iter(0u8..5)),
102+
None,
103+
Some(Buffer::from_iter(7..9)),
104+
];
105+
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
106+
let iter = arrow_array_deserialize_iterator::<Option<Buffer<u8>>>(b.as_ref()).unwrap();
107+
for (i, k) in iter.zip(original_array.iter()) {
108+
assert_eq!(&i, k);
109+
}
110+
}

0 commit comments

Comments
 (0)