Skip to content

Commit 49b3244

Browse files
committed
Pass down batch_size from ArrowReaderBuilder through ArrayReaderBuilder. Ensure internal buffers are pre-allocated.
API change: batch size is now required for ArrayReader and buffers.
1 parent e2b2b8f commit 49b3244

File tree

16 files changed

+304
-125
lines changed

16 files changed

+304
-125
lines changed

parquet/benches/arrow_reader.rs

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use parquet::arrow::array_reader::{
2727
ListArrayReader, make_byte_array_reader, make_byte_view_array_reader,
2828
make_fixed_len_byte_array_reader,
2929
};
30+
use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
3031
use parquet::basic::Type;
3132
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
3233
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
@@ -605,15 +606,23 @@ fn create_primitive_array_reader(
605606
use parquet::arrow::array_reader::PrimitiveArrayReader;
606607
match column_desc.physical_type() {
607608
Type::INT32 => {
608-
let reader =
609-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
610-
.unwrap();
609+
let reader = PrimitiveArrayReader::<Int32Type>::new(
610+
Box::new(page_iterator),
611+
column_desc,
612+
None,
613+
DEFAULT_BATCH_SIZE,
614+
)
615+
.unwrap();
611616
Box::new(reader)
612617
}
613618
Type::INT64 => {
614-
let reader =
615-
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
616-
.unwrap();
619+
let reader = PrimitiveArrayReader::<Int64Type>::new(
620+
Box::new(page_iterator),
621+
column_desc,
622+
None,
623+
DEFAULT_BATCH_SIZE,
624+
)
625+
.unwrap();
617626
Box::new(reader)
618627
}
619628
_ => unreachable!(),
@@ -626,9 +635,13 @@ fn create_f16_by_bytes_reader(
626635
) -> Box<dyn ArrayReader> {
627636
let physical_type = column_desc.physical_type();
628637
match physical_type {
629-
Type::FIXED_LEN_BYTE_ARRAY => {
630-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
631-
}
638+
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
639+
Box::new(page_iterator),
640+
column_desc,
641+
None,
642+
DEFAULT_BATCH_SIZE,
643+
)
644+
.unwrap(),
632645
_ => unimplemented!(),
633646
}
634647
}
@@ -639,12 +652,20 @@ fn create_decimal_by_bytes_reader(
639652
) -> Box<dyn ArrayReader> {
640653
let physical_type = column_desc.physical_type();
641654
match physical_type {
642-
Type::BYTE_ARRAY => {
643-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
644-
}
645-
Type::FIXED_LEN_BYTE_ARRAY => {
646-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
647-
}
655+
Type::BYTE_ARRAY => make_byte_array_reader(
656+
Box::new(page_iterator),
657+
column_desc,
658+
None,
659+
DEFAULT_BATCH_SIZE,
660+
)
661+
.unwrap(),
662+
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
663+
Box::new(page_iterator),
664+
column_desc,
665+
None,
666+
DEFAULT_BATCH_SIZE,
667+
)
668+
.unwrap(),
648669
_ => unimplemented!(),
649670
}
650671
}
@@ -653,28 +674,52 @@ fn create_fixed_len_byte_array_reader(
653674
page_iterator: impl PageIterator + 'static,
654675
column_desc: ColumnDescPtr,
655676
) -> Box<dyn ArrayReader> {
656-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
677+
make_fixed_len_byte_array_reader(
678+
Box::new(page_iterator),
679+
column_desc,
680+
None,
681+
DEFAULT_BATCH_SIZE,
682+
)
683+
.unwrap()
657684
}
658685

659686
fn create_byte_array_reader(
660687
page_iterator: impl PageIterator + 'static,
661688
column_desc: ColumnDescPtr,
662689
) -> Box<dyn ArrayReader> {
663-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
690+
make_byte_array_reader(
691+
Box::new(page_iterator),
692+
column_desc,
693+
None,
694+
DEFAULT_BATCH_SIZE,
695+
)
696+
.unwrap()
664697
}
665698

666699
fn create_byte_view_array_reader(
667700
page_iterator: impl PageIterator + 'static,
668701
column_desc: ColumnDescPtr,
669702
) -> Box<dyn ArrayReader> {
670-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
703+
make_byte_view_array_reader(
704+
Box::new(page_iterator),
705+
column_desc,
706+
None,
707+
DEFAULT_BATCH_SIZE,
708+
)
709+
.unwrap()
671710
}
672711

673712
fn create_string_view_byte_array_reader(
674713
page_iterator: impl PageIterator + 'static,
675714
column_desc: ColumnDescPtr,
676715
) -> Box<dyn ArrayReader> {
677-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
716+
make_byte_view_array_reader(
717+
Box::new(page_iterator),
718+
column_desc,
719+
None,
720+
DEFAULT_BATCH_SIZE,
721+
)
722+
.unwrap()
678723
}
679724

680725
fn create_string_byte_array_dictionary_reader(
@@ -684,8 +729,13 @@ fn create_string_byte_array_dictionary_reader(
684729
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
685730
let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
686731

687-
make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type))
688-
.unwrap()
732+
make_byte_array_dictionary_reader(
733+
Box::new(page_iterator),
734+
column_desc,
735+
Some(arrow_type),
736+
DEFAULT_BATCH_SIZE,
737+
)
738+
.unwrap()
689739
}
690740

691741
fn create_string_list_reader(

parquet/src/arrow/array_reader/builder.rs

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,26 @@ pub struct ArrayReaderBuilder<'a> {
9595
parquet_metadata: Option<&'a ParquetMetaData>,
9696
/// metrics
9797
metrics: &'a ArrowReaderMetrics,
98+
/// Batch size for pre-allocating internal buffers
99+
batch_size: usize,
98100
}
99101

100102
impl<'a> ArrayReaderBuilder<'a> {
101-
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
103+
/// Create a new `ArrayReaderBuilder`
104+
///
105+
/// `batch_size` is used to pre-allocate internal buffers with the expected capacity,
106+
/// avoiding reallocations when reading the first batch of data.
107+
pub fn new(
108+
row_groups: &'a dyn RowGroups,
109+
metrics: &'a ArrowReaderMetrics,
110+
batch_size: usize,
111+
) -> Self {
102112
Self {
103113
row_groups,
104114
cache_options: None,
105115
parquet_metadata: None,
106116
metrics,
117+
batch_size,
107118
}
108119
}
109120

@@ -373,55 +384,78 @@ impl<'a> ArrayReaderBuilder<'a> {
373384
page_iterator,
374385
column_desc,
375386
arrow_type,
387+
self.batch_size,
376388
)?) as _,
377389
PhysicalType::INT32 => {
378390
if let Some(DataType::Null) = arrow_type {
379391
Box::new(NullArrayReader::<Int32Type>::new(
380392
page_iterator,
381393
column_desc,
394+
self.batch_size,
382395
)?) as _
383396
} else {
384397
Box::new(PrimitiveArrayReader::<Int32Type>::new(
385398
page_iterator,
386399
column_desc,
387400
arrow_type,
401+
self.batch_size,
388402
)?) as _
389403
}
390404
}
391405
PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
392406
page_iterator,
393407
column_desc,
394408
arrow_type,
409+
self.batch_size,
395410
)?) as _,
396411
PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
397412
page_iterator,
398413
column_desc,
399414
arrow_type,
415+
self.batch_size,
400416
)?) as _,
401417
PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
402418
page_iterator,
403419
column_desc,
404420
arrow_type,
421+
self.batch_size,
405422
)?) as _,
406423
PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
407424
page_iterator,
408425
column_desc,
409426
arrow_type,
427+
self.batch_size,
410428
)?) as _,
411429
PhysicalType::BYTE_ARRAY => match arrow_type {
412-
Some(DataType::Dictionary(_, _)) => {
413-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
430+
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
431+
page_iterator,
432+
column_desc,
433+
arrow_type,
434+
self.batch_size,
435+
)?,
436+
Some(DataType::Utf8View | DataType::BinaryView) => make_byte_view_array_reader(
437+
page_iterator,
438+
column_desc,
439+
arrow_type,
440+
self.batch_size,
441+
)?,
442+
_ => {
443+
make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
414444
}
415-
Some(DataType::Utf8View | DataType::BinaryView) => {
416-
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
417-
}
418-
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
419445
},
420446
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
421-
Some(DataType::Dictionary(_, _)) => {
422-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
423-
}
424-
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
447+
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
448+
page_iterator,
449+
column_desc,
450+
arrow_type,
451+
self.batch_size,
452+
)?,
453+
_ => make_fixed_len_byte_array_reader(
454+
page_iterator,
455+
column_desc,
456+
arrow_type,
457+
self.batch_size,
458+
)?,
425459
},
426460
};
427461
Ok(Some(reader))
@@ -491,7 +525,7 @@ mod tests {
491525
.unwrap();
492526

493527
let metrics = ArrowReaderMetrics::disabled();
494-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
528+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
495529
.build_array_reader(fields.as_ref(), &mask)
496530
.unwrap();
497531

@@ -524,7 +558,7 @@ mod tests {
524558
.unwrap();
525559

526560
let metrics = ArrowReaderMetrics::disabled();
527-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
561+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
528562
.with_parquet_metadata(file_reader.metadata())
529563
.build_array_reader(fields.as_ref(), &mask)
530564
.unwrap();

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,14 @@ use std::any::Any;
3838
use std::sync::Arc;
3939

4040
/// Returns an [`ArrayReader`] that decodes the provided byte array column
41+
///
42+
/// `batch_size` is used to pre-allocate internal buffers,
43+
/// avoiding reallocations when reading the first batch of data.
4144
pub fn make_byte_array_reader(
4245
pages: Box<dyn PageIterator>,
4346
column_desc: ColumnDescPtr,
4447
arrow_type: Option<ArrowType>,
48+
batch_size: usize,
4549
) -> Result<Box<dyn ArrayReader>> {
4650
// Check if Arrow type is specified, else create it from Parquet type
4751
let data_type = match arrow_type {
@@ -56,13 +60,13 @@ pub fn make_byte_array_reader(
5660
| ArrowType::Utf8
5761
| ArrowType::Decimal128(_, _)
5862
| ArrowType::Decimal256(_, _) => {
59-
let reader = GenericRecordReader::new(column_desc);
63+
let reader = GenericRecordReader::new(column_desc, batch_size);
6064
Ok(Box::new(ByteArrayReader::<i32>::new(
6165
pages, data_type, reader,
6266
)))
6367
}
6468
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
65-
let reader = GenericRecordReader::new(column_desc);
69+
let reader = GenericRecordReader::new(column_desc, batch_size);
6670
Ok(Box::new(ByteArrayReader::<i64>::new(
6771
pages, data_type, reader,
6872
)))
@@ -202,7 +206,7 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
202206
));
203207
}
204208

205-
let mut buffer = OffsetBuffer::default();
209+
let mut buffer = OffsetBuffer::with_capacity(0);
206210
let mut decoder = ByteArrayDecoderPlain::new(
207211
buf,
208212
num_values as usize,
@@ -620,7 +624,7 @@ mod tests {
620624
.unwrap();
621625

622626
for (encoding, page) in pages {
623-
let mut output = OffsetBuffer::<i32>::default();
627+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
624628
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
625629

626630
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -675,7 +679,7 @@ mod tests {
675679
.unwrap();
676680

677681
for (encoding, page) in pages {
678-
let mut output = OffsetBuffer::<i32>::default();
682+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
679683
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
680684

681685
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -719,7 +723,7 @@ mod tests {
719723

720724
// test nulls read
721725
for (encoding, page) in pages.clone() {
722-
let mut output = OffsetBuffer::<i32>::default();
726+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
723727
decoder.set_data(encoding, page, 4, None).unwrap();
724728
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
725729
}

0 commit comments

Comments
 (0)