Skip to content

Commit a4f04e7

Browse files
committed
Pass down batch_size from ArrowReaderBuilder through ArrayReaderBuilder. Ensure internal buffers are pre-allocated.
1 parent e2b2b8f commit a4f04e7

File tree

13 files changed

+154
-26
lines changed

13 files changed

+154
-26
lines changed

parquet/benches/arrow_reader.rs

Lines changed: 10 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -627,7 +627,8 @@ fn create_f16_by_bytes_reader(
627627
let physical_type = column_desc.physical_type();
628628
match physical_type {
629629
Type::FIXED_LEN_BYTE_ARRAY => {
630-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
630+
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None, None)
631+
.unwrap()
631632
}
632633
_ => unimplemented!(),
633634
}
@@ -640,10 +641,11 @@ fn create_decimal_by_bytes_reader(
640641
let physical_type = column_desc.physical_type();
641642
match physical_type {
642643
Type::BYTE_ARRAY => {
643-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
644+
make_byte_array_reader(Box::new(page_iterator), column_desc, None, None).unwrap()
644645
}
645646
Type::FIXED_LEN_BYTE_ARRAY => {
646-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
647+
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None, None)
648+
.unwrap()
647649
}
648650
_ => unimplemented!(),
649651
}
@@ -653,28 +655,28 @@ fn create_fixed_len_byte_array_reader(
653655
page_iterator: impl PageIterator + 'static,
654656
column_desc: ColumnDescPtr,
655657
) -> Box<dyn ArrayReader> {
656-
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
658+
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None, None).unwrap()
657659
}
658660

659661
fn create_byte_array_reader(
660662
page_iterator: impl PageIterator + 'static,
661663
column_desc: ColumnDescPtr,
662664
) -> Box<dyn ArrayReader> {
663-
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
665+
make_byte_array_reader(Box::new(page_iterator), column_desc, None, None).unwrap()
664666
}
665667

666668
fn create_byte_view_array_reader(
667669
page_iterator: impl PageIterator + 'static,
668670
column_desc: ColumnDescPtr,
669671
) -> Box<dyn ArrayReader> {
670-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
672+
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None, None).unwrap()
671673
}
672674

673675
fn create_string_view_byte_array_reader(
674676
page_iterator: impl PageIterator + 'static,
675677
column_desc: ColumnDescPtr,
676678
) -> Box<dyn ArrayReader> {
677-
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
679+
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None, None).unwrap()
678680
}
679681

680682
fn create_string_byte_array_dictionary_reader(
@@ -684,7 +686,7 @@ fn create_string_byte_array_dictionary_reader(
684686
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
685687
let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
686688

687-
make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type))
689+
make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type), None)
688690
.unwrap()
689691
}
690692

parquet/src/arrow/array_reader/builder.rs

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,8 @@ pub struct ArrayReaderBuilder<'a> {
9595
parquet_metadata: Option<&'a ParquetMetaData>,
9696
/// metrics
9797
metrics: &'a ArrowReaderMetrics,
98+
/// Batch size for pre-allocating internal buffers
99+
batch_size: Option<usize>,
98100
}
99101

100102
impl<'a> ArrayReaderBuilder<'a> {
@@ -104,6 +106,7 @@ impl<'a> ArrayReaderBuilder<'a> {
104106
cache_options: None,
105107
parquet_metadata: None,
106108
metrics,
109+
batch_size: None,
107110
}
108111
}
109112

@@ -119,6 +122,15 @@ impl<'a> ArrayReaderBuilder<'a> {
119122
self
120123
}
121124

125+
/// Set the batch size for pre-allocating internal buffers
126+
///
127+
/// This allows the reader to pre-allocate buffers with the expected capacity,
128+
/// avoiding reallocations when reading the first batch of data.
129+
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
130+
self.batch_size = Some(batch_size);
131+
self
132+
}
133+
122134
/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
123135
pub fn build_array_reader(
124136
&self,
@@ -410,18 +422,18 @@ impl<'a> ArrayReaderBuilder<'a> {
410422
)?) as _,
411423
PhysicalType::BYTE_ARRAY => match arrow_type {
412424
Some(DataType::Dictionary(_, _)) => {
413-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
425+
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
414426
}
415427
Some(DataType::Utf8View | DataType::BinaryView) => {
416-
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
428+
make_byte_view_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
417429
}
418-
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
430+
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?,
419431
},
420432
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
421433
Some(DataType::Dictionary(_, _)) => {
422-
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
434+
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
423435
}
424-
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
436+
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?,
425437
},
426438
};
427439
Ok(Some(reader))

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,10 +38,14 @@ use std::any::Any;
3838
use std::sync::Arc;
3939

4040
/// Returns an [`ArrayReader`] that decodes the provided byte array column
41+
///
42+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
43+
/// avoiding reallocations when reading the first batch of data.
4144
pub fn make_byte_array_reader(
4245
pages: Box<dyn PageIterator>,
4346
column_desc: ColumnDescPtr,
4447
arrow_type: Option<ArrowType>,
48+
batch_size: Option<usize>,
4549
) -> Result<Box<dyn ArrayReader>> {
4650
// Check if Arrow type is specified, else create it from Parquet type
4751
let data_type = match arrow_type {
@@ -56,13 +60,23 @@ pub fn make_byte_array_reader(
5660
| ArrowType::Utf8
5761
| ArrowType::Decimal128(_, _)
5862
| ArrowType::Decimal256(_, _) => {
59-
let reader = GenericRecordReader::new(column_desc);
63+
let reader = match batch_size {
64+
Some(capacity) => {
65+
GenericRecordReader::new_with_capacity(column_desc, capacity)
66+
}
67+
None => GenericRecordReader::new(column_desc),
68+
};
6069
Ok(Box::new(ByteArrayReader::<i32>::new(
6170
pages, data_type, reader,
6271
)))
6372
}
6473
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
65-
let reader = GenericRecordReader::new(column_desc);
74+
let reader = match batch_size {
75+
Some(capacity) => {
76+
GenericRecordReader::new_with_capacity(column_desc, capacity)
77+
}
78+
None => GenericRecordReader::new(column_desc),
79+
};
6680
Ok(Box::new(ByteArrayReader::<i64>::new(
6781
pages, data_type, reader,
6882
)))

parquet/src/arrow/array_reader/byte_array_dictionary.rs

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,14 +40,17 @@ use crate::util::bit_util::FromBytes;
4040
/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
4141
macro_rules! make_reader {
4242
(
43-
($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, $v:expr) {
43+
($pages:expr, $column_desc:expr, $data_type:expr, $batch_size:expr) => match ($k:expr, $v:expr) {
4444
$(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
4545
}
4646
) => {
4747
match (($k, $v)) {
4848
$(
4949
($key_arrow, $value_arrow) => {
50-
let reader = GenericRecordReader::new($column_desc);
50+
let reader = match $batch_size {
51+
Some(capacity) => GenericRecordReader::new_with_capacity($column_desc, capacity),
52+
None => GenericRecordReader::new($column_desc),
53+
};
5154
Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
5255
$pages, $data_type, reader,
5356
)))
@@ -73,10 +76,13 @@ macro_rules! make_reader {
7376
/// It is therefore recommended that if `pages` contains data from multiple column chunks,
7477
/// that the read batch size used is a divisor of the row group size
7578
///
79+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
80+
/// avoiding reallocations when reading the first batch of data.
7681
pub fn make_byte_array_dictionary_reader(
7782
pages: Box<dyn PageIterator>,
7883
column_desc: ColumnDescPtr,
7984
arrow_type: Option<ArrowType>,
85+
batch_size: Option<usize>,
8086
) -> Result<Box<dyn ArrayReader>> {
8187
// Check if Arrow type is specified, else create it from Parquet type
8288
let data_type = match arrow_type {
@@ -89,7 +95,7 @@ pub fn make_byte_array_dictionary_reader(
8995
match &data_type {
9096
ArrowType::Dictionary(key_type, value_type) => {
9197
make_reader! {
92-
(pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) {
98+
(pages, column_desc, data_type, batch_size) => match (key_type.as_ref(), value_type.as_ref()) {
9399
(ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u8, i32),
94100
(ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
95101
(ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i8, i32),

parquet/src/arrow/array_reader/byte_view_array.rs

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,10 +36,14 @@ use bytes::Bytes;
3636
use std::any::Any;
3737

3838
/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
39+
///
40+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
41+
/// avoiding reallocations when reading the first batch of data.
3942
pub fn make_byte_view_array_reader(
4043
pages: Box<dyn PageIterator>,
4144
column_desc: ColumnDescPtr,
4245
arrow_type: Option<ArrowType>,
46+
batch_size: Option<usize>,
4347
) -> Result<Box<dyn ArrayReader>> {
4448
// Check if Arrow type is specified, else create it from Parquet type
4549
let data_type = match arrow_type {
@@ -52,7 +56,10 @@ pub fn make_byte_view_array_reader(
5256

5357
match data_type {
5458
ArrowType::BinaryView | ArrowType::Utf8View => {
55-
let reader = GenericRecordReader::new(column_desc);
59+
let reader = match batch_size {
60+
Some(capacity) => GenericRecordReader::new_with_capacity(column_desc, capacity),
61+
None => GenericRecordReader::new(column_desc),
62+
};
5663
Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
5764
}
5865

parquet/src/arrow/array_reader/fixed_len_byte_array.rs

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,10 +40,14 @@ use std::ops::Range;
4040
use std::sync::Arc;
4141

4242
/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
43+
///
44+
/// The optional `batch_size` parameter is used to pre-allocate internal buffers,
45+
/// avoiding reallocations when reading the first batch of data.
4346
pub fn make_fixed_len_byte_array_reader(
4447
pages: Box<dyn PageIterator>,
4548
column_desc: ColumnDescPtr,
4649
arrow_type: Option<ArrowType>,
50+
batch_size: Option<usize>,
4751
) -> Result<Box<dyn ArrayReader>> {
4852
// Check if Arrow type is specified, else create it from Parquet type
4953
let data_type = match arrow_type {
@@ -126,6 +130,7 @@ pub fn make_fixed_len_byte_array_reader(
126130
column_desc,
127131
data_type,
128132
byte_length,
133+
batch_size,
129134
)))
130135
}
131136

@@ -144,14 +149,19 @@ impl FixedLenByteArrayReader {
144149
column_desc: ColumnDescPtr,
145150
data_type: ArrowType,
146151
byte_length: usize,
152+
batch_size: Option<usize>,
147153
) -> Self {
154+
let record_reader = match batch_size {
155+
Some(capacity) => GenericRecordReader::new_with_capacity(column_desc, capacity),
156+
None => GenericRecordReader::new(column_desc),
157+
};
148158
Self {
149159
data_type,
150160
byte_length,
151161
pages,
152162
def_levels_buffer: None,
153163
rep_levels_buffer: None,
154-
record_reader: GenericRecordReader::new(column_desc),
164+
record_reader,
155165
}
156166
}
157167
}
@@ -284,6 +294,15 @@ fn move_values<F>(
284294
}
285295

286296
impl ValuesBuffer for FixedLenByteArrayBuffer {
297+
fn with_capacity(_capacity: usize) -> Self {
298+
// byte_length is not known at trait level, so we return a default buffer
299+
// The decoder will pre-allocate when it knows both capacity and byte_length
300+
Self {
301+
buffer: Vec::new(),
302+
byte_length: None,
303+
}
304+
}
305+
287306
fn pad_nulls(
288307
&mut self,
289308
read_offset: usize,

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,9 @@ mod read_plan;
5757
pub(crate) mod selection;
5858
pub mod statistics;
5959

60+
/// Default batch size for reading parquet files
61+
pub const DEFAULT_BATCH_SIZE: usize = 1024;
62+
6063
/// Builder for constructing Parquet readers that decode into [Apache Arrow]
6164
/// arrays.
6265
///
@@ -168,7 +171,7 @@ impl<T> ArrowReaderBuilder<T> {
168171
metadata: metadata.metadata,
169172
schema: metadata.schema,
170173
fields: metadata.fields,
171-
batch_size: 1024,
174+
batch_size: DEFAULT_BATCH_SIZE,
172175
row_groups: None,
173176
projection: ProjectionMask::all(),
174177
filter: None,
@@ -196,7 +199,7 @@ impl<T> ArrowReaderBuilder<T> {
196199
&self.schema
197200
}
198201

199-
/// Set the size of [`RecordBatch`] to produce. Defaults to 1024
202+
/// Set the size of [`RecordBatch`] to produce. Defaults to [`DEFAULT_BATCH_SIZE`]
200203
/// If the batch_size is more than the file row count, the file row count is used instead.
201204
pub fn with_batch_size(self, batch_size: usize) -> Self {
202205
// Try to avoid allocating a large buffer
@@ -1066,6 +1069,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
10661069

10671070
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
10681071
.with_parquet_metadata(&reader.metadata)
1072+
.with_batch_size(batch_size)
10691073
.build_array_reader(fields.as_deref(), predicate.projection())?;
10701074

10711075
plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
@@ -1074,6 +1078,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
10741078

10751079
let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
10761080
.with_parquet_metadata(&reader.metadata)
1081+
.with_batch_size(batch_size)
10771082
.build_array_reader(fields.as_deref(), &projection)?;
10781083

10791084
let read_plan = plan_builder
@@ -1381,6 +1386,7 @@ impl ParquetRecordBatchReader {
13811386
let metrics = ArrowReaderMetrics::disabled();
13821387
let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
13831388
.with_parquet_metadata(row_groups.metadata())
1389+
.with_batch_size(batch_size)
13841390
.build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
13851391

13861392
let read_plan = ReadPlanBuilder::new(batch_size)

parquet/src/arrow/buffer/dictionary_buffer.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -194,6 +194,12 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
194194
}
195195

196196
impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
197+
fn with_capacity(capacity: usize) -> Self {
198+
Self::Values {
199+
values: OffsetBuffer::with_capacity(capacity),
200+
}
201+
}
202+
197203
fn pad_nulls(
198204
&mut self,
199205
read_offset: usize,

parquet/src/arrow/buffer/offset_buffer.rs

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,19 @@ impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
4444
}
4545

4646
impl<I: OffsetSizeTrait> OffsetBuffer<I> {
47+
/// Create a new `OffsetBuffer` with capacity for at least `capacity` elements
48+
///
49+
/// Pre-allocates the offsets vector to avoid reallocations during reading.
50+
/// The values vector is not pre-allocated as its size is unpredictable.
51+
pub fn with_capacity(capacity: usize) -> Self {
52+
let mut offsets = Vec::with_capacity(capacity + 1);
53+
offsets.push(I::default());
54+
Self {
55+
offsets,
56+
values: Vec::new(),
57+
}
58+
}
59+
4760
/// Returns the number of byte arrays in this buffer
4861
pub fn len(&self) -> usize {
4962
self.offsets.len() - 1
@@ -139,6 +152,10 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
139152
}
140153

141154
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
155+
fn with_capacity(capacity: usize) -> Self {
156+
Self::with_capacity(capacity)
157+
}
158+
142159
fn pad_nulls(
143160
&mut self,
144161
read_offset: usize,

0 commit comments

Comments
 (0)