Skip to content

Commit c290f54

Browse files
committed
Pass down batch_size from ArrowReaderBuilder through ArrayReaderBuilder. Ensure internal buffers are pre-allocated.
API change: batch size is now required for ArrayReader and buffers.
1 parent fcfa5d4 commit c290f54

File tree

16 files changed

+304
-125
lines changed

16 files changed

+304
-125
lines changed

parquet/benches/arrow_reader.rs

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use parquet::arrow::array_reader::{
2727
ListArrayReader, make_byte_array_reader, make_byte_view_array_reader,
2828
make_fixed_len_byte_array_reader,
2929
};
30+
use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
3031
use parquet::basic::Type;
3132
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
3233
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
@@ -611,15 +612,23 @@ fn create_primitive_array_reader(
611612
use parquet::arrow::array_reader::PrimitiveArrayReader;
612613
match column_desc.physical_type() {
613614
Type::INT32 => {
614-
let reader =
615-
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
616-
.unwrap();
615+
let reader = PrimitiveArrayReader::<Int32Type>::new(
616+
Box::new(page_iterator),
617+
column_desc,
618+
None,
619+
DEFAULT_BATCH_SIZE,
620+
)
621+
.unwrap();
617622
Box::new(reader)
618623
}
619624
Type::INT64 => {
620-
let reader =
621-
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
622-
.unwrap();
625+
let reader = PrimitiveArrayReader::<Int64Type>::new(
626+
Box::new(page_iterator),
627+
column_desc,
628+
None,
629+
DEFAULT_BATCH_SIZE,
630+
)
631+
.unwrap();
623632
Box::new(reader)
624633
}
625634
_ => unreachable!(),
@@ -632,9 +641,13 @@ fn create_f16_by_bytes_reader(
632641
) -> Box<dyn ArrayReader> {
633642
let physical_type = column_desc.physical_type();
634643
match physical_type {
635-
Type::FIXED_LEN_BYTE_ARRAY => {
636-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
637-
}
644+
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
645+
Box::new(page_iterator),
646+
column_desc,
647+
None,
648+
DEFAULT_BATCH_SIZE,
649+
)
650+
.unwrap(),
638651
_ => unimplemented!(),
639652
}
640653
}
@@ -645,12 +658,20 @@ fn create_decimal_by_bytes_reader(
645658
) -> Box<dyn ArrayReader> {
646659
let physical_type = column_desc.physical_type();
647660
match physical_type {
648-
Type::BYTE_ARRAY => {
649-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
650-
}
651-
Type::FIXED_LEN_BYTE_ARRAY => {
652-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
653-
}
661+
Type::BYTE_ARRAY => make_byte_array_reader(
662+
Box::new(page_iterator),
663+
column_desc,
664+
None,
665+
DEFAULT_BATCH_SIZE,
666+
)
667+
.unwrap(),
668+
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
669+
Box::new(page_iterator),
670+
column_desc,
671+
None,
672+
DEFAULT_BATCH_SIZE,
673+
)
674+
.unwrap(),
654675
_ => unimplemented!(),
655676
}
656677
}
@@ -659,28 +680,52 @@ fn create_fixed_len_byte_array_reader(
659680
page_iterator: impl PageIterator + 'static,
660681
column_desc: ColumnDescPtr,
661682
) -> Box<dyn ArrayReader> {
662-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
683+
make_fixed_len_byte_array_reader(
684+
Box::new(page_iterator),
685+
column_desc,
686+
None,
687+
DEFAULT_BATCH_SIZE,
688+
)
689+
.unwrap()
663690
}
664691

665692
fn create_byte_array_reader(
666693
page_iterator: impl PageIterator + 'static,
667694
column_desc: ColumnDescPtr,
668695
) -> Box<dyn ArrayReader> {
669-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
696+
make_byte_array_reader(
697+
Box::new(page_iterator),
698+
column_desc,
699+
None,
700+
DEFAULT_BATCH_SIZE,
701+
)
702+
.unwrap()
670703
}
671704

672705
fn create_byte_view_array_reader(
673706
page_iterator: impl PageIterator + 'static,
674707
column_desc: ColumnDescPtr,
675708
) -> Box<dyn ArrayReader> {
676-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
709+
make_byte_view_array_reader(
710+
Box::new(page_iterator),
711+
column_desc,
712+
None,
713+
DEFAULT_BATCH_SIZE,
714+
)
715+
.unwrap()
677716
}
678717

679718
fn create_string_view_byte_array_reader(
680719
page_iterator: impl PageIterator + 'static,
681720
column_desc: ColumnDescPtr,
682721
) -> Box<dyn ArrayReader> {
683-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
722+
make_byte_view_array_reader(
723+
Box::new(page_iterator),
724+
column_desc,
725+
None,
726+
DEFAULT_BATCH_SIZE,
727+
)
728+
.unwrap()
684729
}
685730

686731
fn create_string_byte_array_dictionary_reader(
@@ -690,8 +735,13 @@ fn create_string_byte_array_dictionary_reader(
690735
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
691736
let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
692737

693-
make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type))
694-
.unwrap()
738+
make_byte_array_dictionary_reader(
739+
Box::new(page_iterator),
740+
column_desc,
741+
Some(arrow_type),
742+
DEFAULT_BATCH_SIZE,
743+
)
744+
.unwrap()
695745
}
696746

697747
fn create_string_list_reader(

parquet/src/arrow/array_reader/builder.rs

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,26 @@ pub struct ArrayReaderBuilder<'a> {
9696
parquet_metadata: Option<&'a ParquetMetaData>,
9797
/// metrics
9898
metrics: &'a ArrowReaderMetrics,
99+
/// Batch size for pre-allocating internal buffers
100+
batch_size: usize,
99101
}
100102

101103
impl<'a> ArrayReaderBuilder<'a> {
102-
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
104+
/// Create a new `ArrayReaderBuilder`
105+
///
106+
/// `batch_size` is used to pre-allocate internal buffers with the expected capacity,
107+
/// avoiding reallocations when reading the first batch of data.
108+
pub fn new(
109+
row_groups: &'a dyn RowGroups,
110+
metrics: &'a ArrowReaderMetrics,
111+
batch_size: usize,
112+
) -> Self {
103113
Self {
104114
row_groups,
105115
cache_options: None,
106116
parquet_metadata: None,
107117
metrics,
118+
batch_size,
108119
}
109120
}
110121

@@ -389,55 +400,78 @@ impl<'a> ArrayReaderBuilder<'a> {
389400
page_iterator,
390401
column_desc,
391402
arrow_type,
403+
self.batch_size,
392404
)?) as _,
393405
PhysicalType::INT32 => {
394406
if let Some(DataType::Null) = arrow_type {
395407
Box::new(NullArrayReader::<Int32Type>::new(
396408
page_iterator,
397409
column_desc,
410+
self.batch_size,
398411
)?) as _
399412
} else {
400413
Box::new(PrimitiveArrayReader::<Int32Type>::new(
401414
page_iterator,
402415
column_desc,
403416
arrow_type,
417+
self.batch_size,
404418
)?) as _
405419
}
406420
}
407421
PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
408422
page_iterator,
409423
column_desc,
410424
arrow_type,
425+
self.batch_size,
411426
)?) as _,
412427
PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
413428
page_iterator,
414429
column_desc,
415430
arrow_type,
431+
self.batch_size,
416432
)?) as _,
417433
PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
418434
page_iterator,
419435
column_desc,
420436
arrow_type,
437+
self.batch_size,
421438
)?) as _,
422439
PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
423440
page_iterator,
424441
column_desc,
425442
arrow_type,
443+
self.batch_size,
426444
)?) as _,
427445
PhysicalType::BYTE_ARRAY => match arrow_type {
428-
Some(DataType::Dictionary(_, _)) => {
429-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
446+
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
447+
page_iterator,
448+
column_desc,
449+
arrow_type,
450+
self.batch_size,
451+
)?,
452+
Some(DataType::Utf8View | DataType::BinaryView) => make_byte_view_array_reader(
453+
page_iterator,
454+
column_desc,
455+
arrow_type,
456+
self.batch_size,
457+
)?,
458+
_ => {
459+
make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
430460
}
431-
Some(DataType::Utf8View | DataType::BinaryView) => {
432-
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
433-
}
434-
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
435461
},
436462
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
437-
Some(DataType::Dictionary(_, _)) => {
438-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
439-
}
440-
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
463+
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
464+
page_iterator,
465+
column_desc,
466+
arrow_type,
467+
self.batch_size,
468+
)?,
469+
_ => make_fixed_len_byte_array_reader(
470+
page_iterator,
471+
column_desc,
472+
arrow_type,
473+
self.batch_size,
474+
)?,
441475
},
442476
};
443477
Ok(Some(reader))
@@ -507,7 +541,7 @@ mod tests {
507541
.unwrap();
508542

509543
let metrics = ArrowReaderMetrics::disabled();
510-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
544+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
511545
.build_array_reader(fields.as_ref(), &mask)
512546
.unwrap();
513547

@@ -540,7 +574,7 @@ mod tests {
540574
.unwrap();
541575

542576
let metrics = ArrowReaderMetrics::disabled();
543-
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
577+
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
544578
.with_parquet_metadata(file_reader.metadata())
545579
.build_array_reader(fields.as_ref(), &mask)
546580
.unwrap();

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,14 @@ use std::any::Any;
3838
use std::sync::Arc;
3939

4040
/// Returns an [`ArrayReader`] that decodes the provided byte array column
41+
///
42+
/// `batch_size` is used to pre-allocate internal buffers,
43+
/// avoiding reallocations when reading the first batch of data.
4144
pub fn make_byte_array_reader(
4245
pages: Box<dyn PageIterator>,
4346
column_desc: ColumnDescPtr,
4447
arrow_type: Option<ArrowType>,
48+
batch_size: usize,
4549
) -> Result<Box<dyn ArrayReader>> {
4650
// Check if Arrow type is specified, else create it from Parquet type
4751
let data_type = match arrow_type {
@@ -56,13 +60,13 @@ pub fn make_byte_array_reader(
5660
| ArrowType::Utf8
5761
| ArrowType::Decimal128(_, _)
5862
| ArrowType::Decimal256(_, _) => {
59-
let reader = GenericRecordReader::new(column_desc);
63+
let reader = GenericRecordReader::new(column_desc, batch_size);
6064
Ok(Box::new(ByteArrayReader::<i32>::new(
6165
pages, data_type, reader,
6266
)))
6367
}
6468
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
65-
let reader = GenericRecordReader::new(column_desc);
69+
let reader = GenericRecordReader::new(column_desc, batch_size);
6670
Ok(Box::new(ByteArrayReader::<i64>::new(
6771
pages, data_type, reader,
6872
)))
@@ -202,7 +206,7 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
202206
));
203207
}
204208

205-
let mut buffer = OffsetBuffer::default();
209+
let mut buffer = OffsetBuffer::with_capacity(0);
206210
let mut decoder = ByteArrayDecoderPlain::new(
207211
buf,
208212
num_values as usize,
@@ -620,7 +624,7 @@ mod tests {
620624
.unwrap();
621625

622626
for (encoding, page) in pages {
623-
let mut output = OffsetBuffer::<i32>::default();
627+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
624628
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
625629

626630
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -675,7 +679,7 @@ mod tests {
675679
.unwrap();
676680

677681
for (encoding, page) in pages {
678-
let mut output = OffsetBuffer::<i32>::default();
682+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
679683
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
680684

681685
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
@@ -719,7 +723,7 @@ mod tests {
719723

720724
// test nulls read
721725
for (encoding, page) in pages.clone() {
722-
let mut output = OffsetBuffer::<i32>::default();
726+
let mut output = OffsetBuffer::<i32>::with_capacity(0);
723727
decoder.set_data(encoding, page, 4, None).unwrap();
724728
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
725729
}

0 commit comments

Comments
 (0)