Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 71 additions & 21 deletions parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use parquet::arrow::array_reader::{
ListArrayReader, make_byte_array_reader, make_byte_view_array_reader,
make_fixed_len_byte_array_reader,
};
use parquet::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
use parquet::basic::Type;
use parquet::data_type::{ByteArray, FixedLenByteArrayType};
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
Expand Down Expand Up @@ -611,15 +612,23 @@ fn create_primitive_array_reader(
use parquet::arrow::array_reader::PrimitiveArrayReader;
match column_desc.physical_type() {
Type::INT32 => {
let reader =
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();
let reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap();
Box::new(reader)
}
Type::INT64 => {
let reader =
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();
let reader = PrimitiveArrayReader::<Int64Type>::new(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap();
Box::new(reader)
}
_ => unreachable!(),
Expand All @@ -632,9 +641,13 @@ fn create_f16_by_bytes_reader(
) -> Box<dyn ArrayReader> {
let physical_type = column_desc.physical_type();
match physical_type {
Type::FIXED_LEN_BYTE_ARRAY => {
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap(),
_ => unimplemented!(),
}
}
Expand All @@ -645,12 +658,20 @@ fn create_decimal_by_bytes_reader(
) -> Box<dyn ArrayReader> {
let physical_type = column_desc.physical_type();
match physical_type {
Type::BYTE_ARRAY => {
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}
Type::FIXED_LEN_BYTE_ARRAY => {
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}
Type::BYTE_ARRAY => make_byte_array_reader(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap(),
Type::FIXED_LEN_BYTE_ARRAY => make_fixed_len_byte_array_reader(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap(),
_ => unimplemented!(),
}
}
Expand All @@ -659,28 +680,52 @@ fn create_fixed_len_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
make_fixed_len_byte_array_reader(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap()
}

fn create_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
make_byte_array_reader(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap()
}

fn create_byte_view_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
make_byte_view_array_reader(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap()
}

fn create_string_view_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
make_byte_view_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
make_byte_view_array_reader(
Box::new(page_iterator),
column_desc,
None,
DEFAULT_BATCH_SIZE,
)
.unwrap()
}

fn create_string_byte_array_dictionary_reader(
Expand All @@ -690,8 +735,13 @@ fn create_string_byte_array_dictionary_reader(
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
let arrow_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

make_byte_array_dictionary_reader(Box::new(page_iterator), column_desc, Some(arrow_type))
.unwrap()
make_byte_array_dictionary_reader(
Box::new(page_iterator),
column_desc,
Some(arrow_type),
DEFAULT_BATCH_SIZE,
)
.unwrap()
}

fn create_string_list_reader(
Expand Down
60 changes: 47 additions & 13 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,26 @@ pub struct ArrayReaderBuilder<'a> {
parquet_metadata: Option<&'a ParquetMetaData>,
/// metrics
metrics: &'a ArrowReaderMetrics,
/// Batch size for pre-allocating internal buffers
batch_size: usize,
}

impl<'a> ArrayReaderBuilder<'a> {
pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
/// Create a new `ArrayReaderBuilder`
///
/// `batch_size` is used to pre-allocate internal buffers with the expected capacity,
/// avoiding reallocations when reading the first batch of data.
pub fn new(
row_groups: &'a dyn RowGroups,
metrics: &'a ArrowReaderMetrics,
batch_size: usize,
) -> Self {
Self {
row_groups,
cache_options: None,
parquet_metadata: None,
metrics,
batch_size,
}
}

Expand Down Expand Up @@ -389,55 +400,78 @@ impl<'a> ArrayReaderBuilder<'a> {
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?) as _,
PhysicalType::INT32 => {
if let Some(DataType::Null) = arrow_type {
Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
self.batch_size,
)?) as _
} else {
Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?) as _
}
}
PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?) as _,
PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?) as _,
PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?) as _,
PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?) as _,
PhysicalType::BYTE_ARRAY => match arrow_type {
Some(DataType::Dictionary(_, _)) => {
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?,
Some(DataType::Utf8View | DataType::BinaryView) => make_byte_view_array_reader(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?,
_ => {
make_byte_array_reader(page_iterator, column_desc, arrow_type, self.batch_size)?
}
Some(DataType::Utf8View | DataType::BinaryView) => {
make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
}
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
Some(DataType::Dictionary(_, _)) => {
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
}
_ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?,
_ => make_fixed_len_byte_array_reader(
page_iterator,
column_desc,
arrow_type,
self.batch_size,
)?,
},
};
Ok(Some(reader))
Expand Down Expand Up @@ -507,7 +541,7 @@ mod tests {
.unwrap();

let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
.build_array_reader(fields.as_ref(), &mask)
.unwrap();

Expand Down Expand Up @@ -540,7 +574,7 @@ mod tests {
.unwrap();

let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics, 1024)
.with_parquet_metadata(file_reader.metadata())
.build_array_reader(fields.as_ref(), &mask)
.unwrap();
Expand Down
16 changes: 10 additions & 6 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ use std::any::Any;
use std::sync::Arc;

/// Returns an [`ArrayReader`] that decodes the provided byte array column
///
/// `batch_size` is used to pre-allocate internal buffers,
/// avoiding reallocations when reading the first batch of data.
pub fn make_byte_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
batch_size: usize,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Expand All @@ -56,13 +60,13 @@ pub fn make_byte_array_reader(
| ArrowType::Utf8
| ArrowType::Decimal128(_, _)
| ArrowType::Decimal256(_, _) => {
let reader = GenericRecordReader::new(column_desc);
let reader = GenericRecordReader::new(column_desc, batch_size);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
)))
}
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
let reader = GenericRecordReader::new(column_desc);
let reader = GenericRecordReader::new(column_desc, batch_size);
Ok(Box::new(ByteArrayReader::<i64>::new(
pages, data_type, reader,
)))
Expand Down Expand Up @@ -202,7 +206,7 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
));
}

let mut buffer = OffsetBuffer::default();
let mut buffer = OffsetBuffer::with_capacity(0);
let mut decoder = ByteArrayDecoderPlain::new(
buf,
num_values as usize,
Expand Down Expand Up @@ -620,7 +624,7 @@ mod tests {
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
let mut output = OffsetBuffer::<i32>::with_capacity(0);
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
Expand Down Expand Up @@ -675,7 +679,7 @@ mod tests {
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
let mut output = OffsetBuffer::<i32>::with_capacity(0);
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
Expand Down Expand Up @@ -719,7 +723,7 @@ mod tests {

// test nulls read
for (encoding, page) in pages.clone() {
let mut output = OffsetBuffer::<i32>::default();
let mut output = OffsetBuffer::<i32>::with_capacity(0);
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
}
Expand Down
Loading
Loading