Skip to content

Commit 9cffc3c

Browse files
committed
Pass down batch_size from ArrowReaderBuilder through ArrayReaderBuilder.Ensure internal buffers to be pre-allocated.
Api change - making batch size required for ArrayReader and buffers.
1 parent e2b2b8f commit 9cffc3c

File tree

16 files changed

+189
-97
lines changed

16 files changed

+189
-97
lines changed

parquet/benches/arrow_reader.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use criterion::{BenchmarkGroup, Criterion, criterion_group, criterion_main};
2323
use half::f16;
2424
use num_bigint::BigInt;
2525
use num_traits::FromPrimitive;
26+
use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
2627
use parquet::arrow::array_reader::{
2728
ListArrayReader, make_byte_array_reader, make_byte_view_array_reader,
2829
make_fixed_len_byte_array_reader,
@@ -627,7 +628,8 @@ fn create_f16_by_bytes_reader(
627628
let physical_type = column_desc.physical_type();
628629
match physical_type {
629630
Type::FIXED_LEN_BYTE_ARRAY => {
630-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
631+
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None, DEFAULT_BATCH_SIZE)
632+
.unwrap()
631633
}
632634
_ => unimplemented!(),
633635
}
@@ -640,10 +642,11 @@ fn create_decimal_by_bytes_reader(
640642
let physical_type = column_desc.physical_type();
641643
match physical_type {
642644
Type::BYTE_ARRAY => {
643-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
645+
make_byte_array_reader(Box::new(page_iterator), column_desc, None, DEFAULT_BATCH_SIZE).unwrap()
644646
}
645647
Type::FIXED_LEN_BYTE_ARRAY => {
646-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
648+
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None, DEFAULT_BATCH_SIZE)
649+
.unwrap()
647650
}
648651
_ => unimplemented!(),
649652
}
@@ -653,28 +656,28 @@ fn create_fixed_len_byte_array_reader(
653656
page_iterator: impl PageIterator + 'static,
654657
column_desc: ColumnDescPtr,
655658
) -> Box<dyn ArrayReader> {
656-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
659+
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None, DEFAULT_BATCH_SIZE).unwrap()
657660
}
658661

659662
fn create_byte_array_reader(
660663
page_iterator: impl PageIterator + 'static,
661664
column_desc: ColumnDescPtr,
662665
) -> Box<dyn ArrayReader> {
663-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
666+
make_byte_array_reader(Box::new(page_iterator), column_desc, None, DEFAULT_BATCH_SIZE).unwrap()
664667
}
665668

666669
fn create_byte_view_array_reader(
667670
page_iterator: impl PageIterator + 'static,
668671
column_desc: ColumnDescPtr,
669672
) -> Box<dyn ArrayReader> {
670-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
673+
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None, DEFAULT_BATCH_SIZE).unwrap()
671674
}
672675

673676
fn create_string_view_byte_array_reader(
674677
page_iterator: impl PageIterator + 'static,
675678
column_desc: ColumnDescPtr,
676679
) -> Box<dyn ArrayReader> {
677-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
680+
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None, DEFAULT_BATCH_SIZE).unwrap()
678681
}
679682

680683
fn create_string_byte_array_dictionary_reader(
@@ -684,7 +687,7 @@ fn create_string_byte_array_dictionary_reader(
684687
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
685688
let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
686689

687-
make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type))
690+
make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type), None)
688691
.unwrap()
689692
}
690693

parquet/src/arrow/array_reader/builder.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,26 @@ pub struct ArrayReaderBuilder<'a> {
9595
parquet_metadata: Option<&'a ParquetMetaData>,
9696
/// metrics
9797
metrics: &'a ArrowReaderMetrics,
98+
/// Batch size for pre-allocating internal buffers
99+
batch_size: usize,
98100
}
99101

100102
impl<'a> ArrayReaderBuilder<'a> {
101-
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
103+
/// Create a new `ArrayReaderBuilder`
104+
///
105+
/// `batch_size` is used to pre-allocate internal buffers with the expected capacity,
106+
/// avoiding reallocations when reading the first batch of data.
107+
pub fn new(
108+
row_groups: &'a dyn RowGroups,
109+
metrics: &'a ArrowReaderMetrics,
110+
batch_size: usize,
111+
) -> Self {
102112
Self {
103113
row_groups,
104114
cache_options: None,
105115
parquet_metadata: None,
106116
metrics,
117+
batch_size,
107118
}
108119
}
109120

@@ -373,55 +384,62 @@ impl<'a> ArrayReaderBuilder<'a> {
373384
page_iterator,
374385
column_desc,
375386
arrow_type,
387+
self.batch_size,
376388
)?) as _,
377389
PhysicalType::INT32 => {
378390
if let Some(DataType::Null) = arrow_type {
379391
Box::new(NullArrayReader::<Int32Type>::new(
380392
page_iterator,
381393
column_desc,
394+
self.batch_size,
382395
)?) as _
383396
} else {
384397
Box::new(PrimitiveArrayReader::<Int32Type>::new(
385398
page_iterator,
386399
column_desc,
387400
arrow_type,
401+
self.batch_size,
388402
)?) as _
389403
}
390404
}
391405
PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
392406
page_iterator,
393407
column_desc,
394408
arrow_type,
409+
self.batch_size,
395410
)?) as _,
396411
PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
397412
page_iterator,
398413
column_desc,
399414
arrow_type,
415+
self.batch_size,
400416
)?) as _,
401417
PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
402418
page_iterator,
403419
column_desc,
404420
arrow_type,
421+
self.batch_size,
405422
)?) as _,
406423
PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
407424
page_iterator,
408425
column_desc,
409426
arrow_type,
427+
self.batch_size,
410428
)?) as _,
411429
PhysicalType::BYTE_ARRAY => match arrow_type {
412430
Some(DataType::Dictionary(_, _)) => {
413-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
431+
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
414432
}
415433
Some(DataType::Utf8View | DataType::BinaryView) => {
416-
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
434+
make_byte_view_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
417435
}
418-
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
436+
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?,
419437
},
420438
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
421439
Some(DataType::Dictionary(_, _)) => {
422-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
440+
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
423441
}
424-
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
442+
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?,
425443
},
426444
};
427445
Ok(Some(reader))
@@ -491,7 +509,7 @@ mod tests {
491509
.unwrap();
492510

493511
let metrics = ArrowReaderMetrics::disabled();
494-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
512+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
495513
.build_array_reader(fields.as_ref(), &mask)
496514
.unwrap();
497515

@@ -524,7 +542,7 @@ mod tests {
524542
.unwrap();
525543

526544
let metrics = ArrowReaderMetrics::disabled();
527-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
545+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
528546
.with_parquet_metadata(file_reader.metadata())
529547
.build_array_reader(fields.as_ref(), &mask)
530548
.unwrap();

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,14 @@ use std::any::Any;
3838
use std::sync::Arc;
3939

4040
/// Returns an [`ArrayReader`] that decodes the provided byte array column
41+
///
42+
/// `batch_size` is used to pre-allocate internal buffers,
43+
/// avoiding reallocations when reading the first batch of data.
4144
pub fn make_byte_array_reader(
4245
pages: Box<dyn PageIterator>,
4346
column_desc: ColumnDescPtr,
4447
arrow_type: Option<ArrowType>,
48+
batch_size: usize,
4549
) -> Result<Box<dyn ArrayReader>> {
4650
// Check if Arrow type is specified, else create it from Parquet type
4751
let data_type = match arrow_type {
@@ -56,13 +60,13 @@ pub fn make_byte_array_reader(
5660
| ArrowType::Utf8
5761
| ArrowType::Decimal128(_, _)
5862
| ArrowType::Decimal256(_, _) => {
59-
let reader = GenericRecordReader::new(column_desc);
63+
let reader = GenericRecordReader::new(column_desc, batch_size);
6064
Ok(Box::new(ByteArrayReader::<i32>::new(
6165
pages, data_type, reader,
6266
)))
6367
}
6468
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
65-
let reader = GenericRecordReader::new(column_desc);
69+
let reader = GenericRecordReader::new(column_desc, batch_size);
6670
Ok(Box::new(ByteArrayReader::<i64>::new(
6771
pages, data_type, reader,
6872
)))
@@ -202,7 +206,7 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
202206
));
203207
}
204208

205-
let mut buffer = OffsetBuffer::default();
209+
let mut buffer = OffsetBuffer::with_capacity(0);
206210
let mut decoder = ByteArrayDecoderPlain::new(
207211
buf,
208212
num_values as usize,
@@ -620,7 +624,7 @@ mod tests {
620624
.unwrap();
621625

622626
for (encoding, page) in pages {
623-
let mut output = OffsetBuffer::<i32>::default();
627+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
624628
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
625629

626630
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -675,7 +679,7 @@ mod tests {
675679
.unwrap();
676680

677681
for (encoding, page) in pages {
678-
let mut output = OffsetBuffer::<i32>::default();
682+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
679683
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
680684

681685
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -719,7 +723,7 @@ mod tests {
719723

720724
// test nulls read
721725
for (encoding, page) in pages.clone() {
722-
let mut output = OffsetBuffer::<i32>::default();
726+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
723727
decoder.set_data(encoding, page, 4, None).unwrap();
724728
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
725729
}

parquet/src/arrow/array_reader/byte_array_dictionary.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ use crate::util::bit_util::FromBytes;
4040
/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
4141
macro_rules! make_reader {
4242
(
43-
($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, $v:expr) {
43+
($pages:expr, $column_desc:expr, $data_type:expr, $batch_size:expr) => match ($k:expr, $v:expr) {
4444
$(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
4545
}
4646
) => {
4747
match (($k, $v)) {
4848
$(
4949
($key_arrow, $value_arrow) => {
50-
let reader = GenericRecordReader::new($column_desc);
50+
let reader = GenericRecordReader::new($column_desc, $batch_size);
5151
Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
5252
$pages, $data_type, reader,
5353
)))
@@ -73,10 +73,13 @@ macro_rules! make_reader {
7373
/// It is therefore recommended that if `pages` contains data from multiple column chunks,
7474
/// that the read batch size used is a divisor of the row group size
7575
///
76+
/// `batch_size` is used to pre-allocate internal buffers,
77+
/// avoiding reallocations when reading the first batch of data.
7678
pub fn make_byte_array_dictionary_reader(
7779
pages: Box<dyn PageIterator>,
7880
column_desc: ColumnDescPtr,
7981
arrow_type: Option<ArrowType>,
82+
batch_size: usize,
8083
) -> Result<Box<dyn ArrayReader>> {
8184
// Check if Arrow type is specified, else create it from Parquet type
8285
let data_type = match arrow_type {
@@ -89,7 +92,7 @@ pub fn make_byte_array_dictionary_reader(
8992
match &data_type {
9093
ArrowType::Dictionary(key_type, value_type) => {
9194
make_reader! {
92-
(pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) {
95+
(pages, column_desc, data_type, batch_size) => match (key_type.as_ref(), value_type.as_ref()) {
9396
(ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u8, i32),
9497
(ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
9598
(ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i8, i32),
@@ -273,7 +276,7 @@ where
273276
}
274277

275278
let len = num_values as usize;
276-
let mut buffer = OffsetBuffer::<V>::default();
279+
let mut buffer = OffsetBuffer::<V>::with_capacity(0);
277280
let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
278281
decoder.read(&mut buffer, usize::MAX)?;
279282

@@ -426,7 +429,7 @@ mod tests {
426429
.set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
427430
.unwrap();
428431

429-
let mut output = DictionaryBuffer::<i32, i32>::default();
432+
let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
430433
assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);
431434

432435
let mut valid = vec![false, false, true, true, false, true];
@@ -492,7 +495,7 @@ mod tests {
492495
.set_data(Encoding::RLE_DICTIONARY, encoded, 7, Some(data.len()))
493496
.unwrap();
494497

495-
let mut output = DictionaryBuffer::<i32, i32>::default();
498+
let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
496499

497500
// read two skip one
498501
assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
@@ -543,7 +546,7 @@ mod tests {
543546
.unwrap();
544547

545548
// Read all pages into single buffer
546-
let mut output = DictionaryBuffer::<i32, i32>::default();
549+
let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
547550

548551
for (encoding, page) in pages {
549552
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
@@ -586,7 +589,7 @@ mod tests {
586589
.unwrap();
587590

588591
// Read all pages into single buffer
589-
let mut output = DictionaryBuffer::<i32, i32>::default();
592+
let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
590593

591594
for (encoding, page) in pages {
592595
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
@@ -650,7 +653,7 @@ mod tests {
650653
.unwrap();
651654

652655
for (encoding, page) in pages.clone() {
653-
let mut output = DictionaryBuffer::<i32, i32>::default();
656+
let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
654657
decoder.set_data(encoding, page, 8, None).unwrap();
655658
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
656659

@@ -665,7 +668,7 @@ mod tests {
665668
}
666669

667670
for (encoding, page) in pages {
668-
let mut output = DictionaryBuffer::<i32, i32>::default();
671+
let mut output = DictionaryBuffer::<i32, i32>::with_capacity(0);
669672
decoder.set_data(encoding, page, 8, None).unwrap();
670673
assert_eq!(decoder.skip_values(1024).unwrap(), 0);
671674

parquet/src/arrow/array_reader/byte_view_array.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@ use bytes::Bytes;
3636
use std::any::Any;
3737

3838
/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
39+
///
40+
/// `batch_size` is used to pre-allocate internal buffers,
41+
/// avoiding reallocations when reading the first batch of data.
3942
pub fn make_byte_view_array_reader(
4043
pages: Box<dyn PageIterator>,
4144
column_desc: ColumnDescPtr,
4245
arrow_type: Option<ArrowType>,
46+
batch_size: usize,
4347
) -> Result<Box<dyn ArrayReader>> {
4448
// Check if Arrow type is specified, else create it from Parquet type
4549
let data_type = match arrow_type {
@@ -52,7 +56,7 @@ pub fn make_byte_view_array_reader(
5256

5357
match data_type {
5458
ArrowType::BinaryView | ArrowType::Utf8View => {
55-
let reader = GenericRecordReader::new(column_desc);
59+
let reader = GenericRecordReader::new(column_desc, batch_size);
5660
Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
5761
}
5862

@@ -710,7 +714,7 @@ mod tests {
710714
.unwrap();
711715

712716
for (encoding, page) in pages {
713-
let mut output = ViewBuffer::default();
717+
let mut output = ViewBuffer::with_capacity(0);
714718
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
715719

716720
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -753,7 +757,7 @@ mod tests {
753757
let column_desc = utf8_column();
754758
let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);
755759

756-
let mut view_buffer = ViewBuffer::default();
760+
let mut view_buffer = ViewBuffer::with_capacity(0);
757761
decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
758762
decoder.read(&mut view_buffer, 1).unwrap();
759763
decoder.read(&mut view_buffer, 1).unwrap();

0 commit comments

Comments
 (0)