Skip to content

Commit 3931309

Browse files
authored
exponential buffer growth for varbinview builder (#4701)
Signed-off-by: Onur Satici <[email protected]>
1 parent 943369a commit 3931309

File tree

2 files changed

+123
-14
lines changed

2 files changed

+123
-14
lines changed

vortex-array/src/arrays/varbinview/compute/zip.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,18 @@ mod tests {
4242
use crate::IntoArray;
4343
use crate::arrays::VarBinViewVTable;
4444
use crate::arrow::IntoArrowArray;
45-
use crate::builders::{ArrayBuilder as _, VarBinViewBuilder};
45+
use crate::builders::{ArrayBuilder as _, BufferGrowthStrategy, VarBinViewBuilder};
4646
use crate::compute::zip;
4747

4848
#[test]
4949
fn test_varbinview_zip() {
5050
let if_true = {
51-
let mut builder =
52-
VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
51+
let mut builder = VarBinViewBuilder::new(
52+
DType::Utf8(Nullability::NonNullable),
53+
10,
54+
Default::default(),
55+
BufferGrowthStrategy::fixed(64 * 1024),
56+
);
5357
for _ in 0..100 {
5458
builder.append_value("Hello");
5559
builder.append_value("Hello this is a long string that won't be inlined.");
@@ -58,8 +62,12 @@ mod tests {
5862
};
5963

6064
let if_false = {
61-
let mut builder =
62-
VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10);
65+
let mut builder = VarBinViewBuilder::new(
66+
DType::Utf8(Nullability::NonNullable),
67+
10,
68+
Default::default(),
69+
BufferGrowthStrategy::fixed(64 * 1024),
70+
);
6371
for _ in 0..100 {
6472
builder.append_value("Hello2");
6573
builder.append_value("Hello2 this is a long string that won't be inlined.");

vortex-array/src/builders/varbinview.rs

Lines changed: 110 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::any::Any;
5-
use std::cmp::max;
65
use std::sync::Arc;
76

87
use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
@@ -24,25 +23,29 @@ pub struct VarBinViewBuilder {
2423
nulls: LazyNullBufferBuilder,
2524
completed: CompletedBuffers,
2625
in_progress: ByteBufferMut,
26+
growth_strategy: BufferGrowthStrategy,
2727
}
2828

2929
impl VarBinViewBuilder {
30-
// TODO(joe): add a block growth strategy, from arrow
31-
const BLOCK_SIZE: u32 = 8 * 8 * 1024;
32-
3330
/// Convenience constructor: forwards `dtype` and `capacity` to `Self::new`,
/// filling every remaining argument with `Default::default()`.
///
/// NOTE(review): what `capacity` sizes is decided inside `new`, which is not
/// fully visible here — confirm before relying on it.
pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
34-
Self::new(dtype, capacity, Default::default())
31+
Self::new(dtype, capacity, Default::default(), Default::default())
3532
}
3633

3734
pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
3835
Self::new(
3936
dtype,
4037
capacity,
4138
CompletedBuffers::Deduplicated(Default::default()),
39+
Default::default(),
4240
)
4341
}
4442

45-
fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
43+
pub fn new(
44+
dtype: DType,
45+
capacity: usize,
46+
completed: CompletedBuffers,
47+
growth_strategy: BufferGrowthStrategy,
48+
) -> Self {
4649
assert!(
4750
matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
4851
"VarBinViewBuilder DType must be Utf8 or Binary."
@@ -53,6 +56,7 @@ impl VarBinViewBuilder {
5356
completed,
5457
in_progress: ByteBufferMut::empty(),
5558
dtype,
59+
growth_strategy,
5660
}
5761
}
5862

@@ -67,7 +71,8 @@ impl VarBinViewBuilder {
6771
let required_cap = self.in_progress.len() + value.len();
6872
if self.in_progress.capacity() < required_cap {
6973
self.flush_in_progress();
70-
let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
74+
let next_buffer_size = self.growth_strategy.next_size() as usize;
75+
let to_reserve = next_buffer_size.max(value.len());
7176
self.in_progress.reserve(to_reserve);
7277
};
7378

@@ -278,7 +283,7 @@ impl ArrayBuilder for VarBinViewBuilder {
278283
}
279284
}
280285

281-
enum CompletedBuffers {
286+
/// The two ways a `VarBinViewBuilder` can hold the buffers in its
/// `completed` field.
///
/// Public because it appears in the signature of the public
/// `VarBinViewBuilder::new`.
pub enum CompletedBuffers {
282287
/// Buffers kept in a plain `Vec`.
Default(Vec<ByteBuffer>),
283288
/// Buffers kept in a `DeduplicatedBuffers` store.
Deduplicated(DeduplicatedBuffers),
284289
}
@@ -339,7 +344,7 @@ enum NewIndices {
339344
}
340345

341346
/// Buffer store used by `CompletedBuffers::Deduplicated`.
///
/// Derives `Default`, so it starts with an empty buffer list and an empty map.
#[derive(Default)]
342-
struct DeduplicatedBuffers {
347+
pub struct DeduplicatedBuffers {
343348
// The stored buffers.
buffers: Vec<ByteBuffer>,
344349
// Lookup from a buffer's `BufferId` to a `u32` — presumably its position in
// `buffers`; TODO confirm against the impl, which is outside this view.
buffer_to_idx: HashMap<BufferId, u32>,
345350
}
@@ -397,6 +402,53 @@ impl BufferId {
397402
}
398403
}
399404

405+
#[derive(Debug, Clone)]
406+
pub enum BufferGrowthStrategy {
407+
/// Use a fixed buffer size for all allocations.
408+
Fixed { size: u32 },
409+
/// Use exponential growth starting from initial_size, doubling until max_size.
410+
Exponential { current_size: u32, max_size: u32 },
411+
}
412+
413+
impl Default for BufferGrowthStrategy {
414+
fn default() -> Self {
415+
Self::Exponential {
416+
current_size: 4 * 1024, // 4KB starting size
417+
max_size: 2 * 1024 * 1024, // 2MB max size
418+
}
419+
}
420+
}
421+
422+
impl BufferGrowthStrategy {
423+
pub fn fixed(size: u32) -> Self {
424+
Self::Fixed { size }
425+
}
426+
427+
pub fn exponential(initial_size: u32, max_size: u32) -> Self {
428+
Self::Exponential {
429+
current_size: initial_size,
430+
max_size,
431+
}
432+
}
433+
434+
/// Returns the next buffer size to allocate and updates internal state.
435+
pub fn next_size(&mut self) -> u32 {
436+
match self {
437+
Self::Fixed { size } => *size,
438+
Self::Exponential {
439+
current_size,
440+
max_size,
441+
} => {
442+
let result = *current_size;
443+
if *current_size < *max_size {
444+
*current_size = current_size.saturating_mul(2).min(*max_size);
445+
}
446+
result
447+
}
448+
}
449+
}
450+
}
451+
400452
#[cfg(test)]
401453
mod tests {
402454
use std::str::from_utf8;
@@ -583,4 +635,53 @@ mod tests {
583635
let wrong_scalar = Scalar::from(42i32);
584636
assert!(builder.append_scalar(&wrong_scalar).is_err());
585637
}
638+
639+
#[test]
fn test_buffer_growth_strategies() {
    use super::BufferGrowthStrategy;

    // A fixed strategy hands back the same size however often it is asked.
    let mut fixed = BufferGrowthStrategy::fixed(1024);
    for _ in 0..3 {
        assert_eq!(fixed.next_size(), 1024);
    }

    // An exponential strategy doubles on each call, then stays at max_size.
    let mut growing = BufferGrowthStrategy::exponential(1024, 8192);
    for expected in [1024u32, 2048, 4096, 8192, 8192] {
        assert_eq!(growing.next_size(), expected);
    }
}
661+
662+
#[test]
fn test_large_value_allocation() {
    use super::{BufferGrowthStrategy, VarBinViewBuilder};

    // The payload is twice the strategy's 4096-byte max_size.
    let payload = vec![0u8; 8192];

    let growth = BufferGrowthStrategy::exponential(1024, 4096);
    let mut builder = VarBinViewBuilder::new(
        DType::Binary(Nullability::Nullable),
        10,
        Default::default(),
        growth,
    );

    // Appending must succeed even though the value exceeds max_size.
    builder.append_value(&payload);

    let array = builder.finish_into_varbinview();
    assert_eq!(array.len(), 1);

    // The value read back must match what was written, byte for byte.
    let stored = array.scalar_at(0).as_binary().value().unwrap();
    assert_eq!(stored.len(), 8192);
    assert_eq!(stored.as_slice(), &payload);
}
586687
}

0 commit comments

Comments
 (0)