Skip to content

Commit b736f08

Browse files
maxburkealamb
andauthored
[arrow] Minimize allocation in GenericViewArray::slice() (#9016)
Use the suggested Arc<[Buffer]> storage for ViewArray storage instead of an owned Vec<Buffer> so that the slice clone does not allocate. # Which issue does this PR close? - Closes #6408 . --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 86b6617 commit b736f08

File tree

2 files changed

+67
-35
lines changed

2 files changed

+67
-35
lines changed

arrow-array/src/array/byte_view_array.rs

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ use super::ByteArrayType;
165165
pub struct GenericByteViewArray<T: ByteViewType + ?Sized> {
166166
data_type: DataType,
167167
views: ScalarBuffer<u128>,
168-
buffers: Vec<Buffer>,
168+
buffers: Arc<[Buffer]>,
169169
phantom: PhantomData<T>,
170170
nulls: Option<NullBuffer>,
171171
}
@@ -188,7 +188,10 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
188188
/// # Panics
189189
///
190190
/// Panics if [`GenericByteViewArray::try_new`] returns an error
191-
pub fn new(views: ScalarBuffer<u128>, buffers: Vec<Buffer>, nulls: Option<NullBuffer>) -> Self {
191+
pub fn new<U>(views: ScalarBuffer<u128>, buffers: U, nulls: Option<NullBuffer>) -> Self
192+
where
193+
U: Into<Arc<[Buffer]>>,
194+
{
192195
Self::try_new(views, buffers, nulls).unwrap()
193196
}
194197

@@ -198,11 +201,16 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
198201
///
199202
/// * `views.len() != nulls.len()`
200203
/// * [ByteViewType::validate] fails
201-
pub fn try_new(
204+
pub fn try_new<U>(
202205
views: ScalarBuffer<u128>,
203-
buffers: Vec<Buffer>,
206+
buffers: U,
204207
nulls: Option<NullBuffer>,
205-
) -> Result<Self, ArrowError> {
208+
) -> Result<Self, ArrowError>
209+
where
210+
U: Into<Arc<[Buffer]>>,
211+
{
212+
let buffers: Arc<[Buffer]> = buffers.into();
213+
206214
T::validate(&views, &buffers)?;
207215

208216
if let Some(n) = nulls.as_ref() {
@@ -230,11 +238,14 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
230238
/// # Safety
231239
///
232240
/// Safe if [`Self::try_new`] would not error
233-
pub unsafe fn new_unchecked(
241+
pub unsafe fn new_unchecked<U>(
234242
views: ScalarBuffer<u128>,
235-
buffers: Vec<Buffer>,
243+
buffers: U,
236244
nulls: Option<NullBuffer>,
237-
) -> Self {
245+
) -> Self
246+
where
247+
U: Into<Arc<[Buffer]>>,
248+
{
238249
if cfg!(feature = "force_validate") {
239250
return Self::new(views, buffers, nulls);
240251
}
@@ -243,7 +254,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
243254
data_type: T::DATA_TYPE,
244255
phantom: Default::default(),
245256
views,
246-
buffers,
257+
buffers: buffers.into(),
247258
nulls,
248259
}
249260
}
@@ -253,7 +264,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
253264
Self {
254265
data_type: T::DATA_TYPE,
255266
views: vec![0; len].into(),
256-
buffers: vec![],
267+
buffers: vec![].into(),
257268
nulls: Some(NullBuffer::new_null(len)),
258269
phantom: Default::default(),
259270
}
@@ -279,7 +290,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
279290
}
280291

281292
/// Deconstruct this array into its constituent parts
282-
pub fn into_parts(self) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
293+
pub fn into_parts(self) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
283294
(self.views, self.buffers, self.nulls)
284295
}
285296

@@ -887,8 +898,21 @@ impl<T: ByteViewType + ?Sized> Array for GenericByteViewArray<T> {
887898

888899
fn shrink_to_fit(&mut self) {
889900
self.views.shrink_to_fit();
890-
self.buffers.iter_mut().for_each(|b| b.shrink_to_fit());
891-
self.buffers.shrink_to_fit();
901+
902+
// The goal of `shrink_to_fit` is to minimize the space used by any of
903+
// its allocations. The use of `Arc::get_mut` over `Arc::make_mut` is
904+
// because if the reference count is greater than 1, `Arc::make_mut`
905+
// will first clone its contents. So, any large allocations will first
906+
// be cloned before being shrunk, leaving the pre-cloned allocations
907+
// intact, before adding the extra (used) space of the new clones.
908+
if let Some(buffers) = Arc::get_mut(&mut self.buffers) {
909+
buffers.iter_mut().for_each(|b| b.shrink_to_fit());
910+
}
911+
912+
// With the assumption that this is a best-effort function, no attempt
913+
// is made to shrink `self.buffers`, which it can't because it's type
914+
// does not expose a `shrink_to_fit` method.
915+
892916
if let Some(nulls) = &mut self.nulls {
893917
nulls.shrink_to_fit();
894918
}
@@ -946,7 +970,7 @@ impl<T: ByteViewType + ?Sized> From<ArrayData> for GenericByteViewArray<T> {
946970
fn from(value: ArrayData) -> Self {
947971
let views = value.buffers()[0].clone();
948972
let views = ScalarBuffer::new(views, value.offset(), value.len());
949-
let buffers = value.buffers()[1..].to_vec();
973+
let buffers = value.buffers()[1..].to_vec().into();
950974
Self {
951975
data_type: T::DATA_TYPE,
952976
views,
@@ -1014,12 +1038,15 @@ where
10141038
}
10151039

10161040
impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
1017-
fn from(mut array: GenericByteViewArray<T>) -> Self {
1041+
fn from(array: GenericByteViewArray<T>) -> Self {
10181042
let len = array.len();
1019-
array.buffers.insert(0, array.views.into_inner());
1043+
1044+
let mut buffers = array.buffers.to_vec();
1045+
buffers.insert(0, array.views.into_inner());
1046+
10201047
let builder = ArrayDataBuilder::new(T::DATA_TYPE)
10211048
.len(len)
1022-
.buffers(array.buffers)
1049+
.buffers(buffers)
10231050
.nulls(array.nulls);
10241051

10251052
unsafe { builder.build_unchecked() }

arrow-select/src/zip.rs

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use std::fmt::{Debug, Formatter};
3535
use std::hash::Hash;
3636
use std::marker::PhantomData;
3737
use std::ops::Not;
38-
use std::sync::Arc;
38+
use std::sync::{Arc, OnceLock};
3939

4040
/// Zip two arrays by some boolean mask.
4141
///
@@ -667,12 +667,17 @@ fn maybe_prep_null_mask_filter(predicate: &BooleanArray) -> BooleanBuffer {
667667

668668
struct ByteViewScalarImpl<T: ByteViewType> {
669669
truthy_view: Option<u128>,
670-
truthy_buffers: Vec<Buffer>,
670+
truthy_buffers: Arc<[Buffer]>,
671671
falsy_view: Option<u128>,
672-
falsy_buffers: Vec<Buffer>,
672+
falsy_buffers: Arc<[Buffer]>,
673673
phantom: PhantomData<T>,
674674
}
675675

676+
static EMPTY_ARC: OnceLock<Arc<[Buffer]>> = OnceLock::new();
677+
fn empty_arc_buffers() -> Arc<[Buffer]> {
678+
Arc::clone(EMPTY_ARC.get_or_init(|| Arc::new([])))
679+
}
680+
676681
impl<T: ByteViewType> ByteViewScalarImpl<T> {
677682
fn new(truthy: &dyn Array, falsy: &dyn Array) -> Self {
678683
let (truthy_view, truthy_buffers) = Self::get_value_from_scalar(truthy);
@@ -686,9 +691,9 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
686691
}
687692
}
688693

689-
fn get_value_from_scalar(scalar: &dyn Array) -> (Option<u128>, Vec<Buffer>) {
694+
fn get_value_from_scalar(scalar: &dyn Array) -> (Option<u128>, Arc<[Buffer]>) {
690695
if scalar.is_null(0) {
691-
(None, vec![])
696+
(None, empty_arc_buffers())
692697
} else {
693698
let (views, buffers, _) = scalar.as_byte_view::<T>().clone().into_parts();
694699
(views.first().copied(), buffers)
@@ -698,8 +703,8 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
698703
fn get_views_for_single_non_nullable(
699704
predicate: BooleanBuffer,
700705
value: u128,
701-
buffers: Vec<Buffer>,
702-
) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
706+
buffers: Arc<[Buffer]>,
707+
) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
703708
let number_of_true = predicate.count_set_bits();
704709
let number_of_values = predicate.len();
705710

@@ -708,7 +713,7 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
708713
// All values are null
709714
return (
710715
vec![0; number_of_values].into(),
711-
vec![],
716+
empty_arc_buffers(),
712717
Some(NullBuffer::new_null(number_of_values)),
713718
);
714719
}
@@ -724,10 +729,10 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
724729
predicate: BooleanBuffer,
725730
result_len: usize,
726731
truthy_view: u128,
727-
truthy_buffers: Vec<Buffer>,
732+
truthy_buffers: Arc<[Buffer]>,
728733
falsy_view: u128,
729-
falsy_buffers: Vec<Buffer>,
730-
) -> (ScalarBuffer<u128>, Vec<Buffer>, Option<NullBuffer>) {
734+
falsy_buffers: Arc<[Buffer]>,
735+
) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
731736
let true_count = predicate.count_set_bits();
732737
match true_count {
733738
0 => {
@@ -751,7 +756,7 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
751756
let byte_view_falsy = ByteView::from(falsy_view);
752757
let new_index_falsy_buffers =
753758
buffers.len() as u32 + byte_view_falsy.buffer_index;
754-
buffers.extend(falsy_buffers);
759+
buffers.extend(falsy_buffers.iter().cloned());
755760
let byte_view_falsy =
756761
byte_view_falsy.with_buffer_index(new_index_falsy_buffers);
757762
byte_view_falsy.as_u128()
@@ -778,7 +783,7 @@ impl<T: ByteViewType> ByteViewScalarImpl<T> {
778783
}
779784

780785
let bytes = Buffer::from(mutable);
781-
(bytes.into(), buffers, None)
786+
(bytes.into(), buffers.into(), None)
782787
}
783788
}
784789
}
@@ -804,28 +809,28 @@ impl<T: ByteViewType> ZipImpl for ByteViewScalarImpl<T> {
804809
predicate,
805810
result_len,
806811
truthy,
807-
self.truthy_buffers.clone(),
812+
Arc::clone(&self.truthy_buffers),
808813
falsy,
809-
self.falsy_buffers.clone(),
814+
Arc::clone(&self.falsy_buffers),
810815
),
811816
(Some(truthy), None) => Self::get_views_for_single_non_nullable(
812817
predicate,
813818
truthy,
814-
self.truthy_buffers.clone(),
819+
Arc::clone(&self.truthy_buffers),
815820
),
816821
(None, Some(falsy)) => {
817822
let predicate = predicate.not();
818823
Self::get_views_for_single_non_nullable(
819824
predicate,
820825
falsy,
821-
self.falsy_buffers.clone(),
826+
Arc::clone(&self.falsy_buffers),
822827
)
823828
}
824829
(None, None) => {
825830
// All values are null
826831
(
827832
vec![0; result_len].into(),
828-
vec![],
833+
empty_arc_buffers(),
829834
Some(NullBuffer::new_null(result_len)),
830835
)
831836
}

0 commit comments

Comments
 (0)