Skip to content

Commit 7c9a7a6

Browse files
committed
feat(cube): Add and use split_row_group_reads in ParquetOptions
1 parent 9157065 commit 7c9a7a6

File tree

13 files changed

+61
-19
lines changed

13 files changed

+61
-19
lines changed

Cargo.lock

Lines changed: 19 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,9 @@ config_namespace! {
452452
/// BLOB instead.
453453
pub binary_as_string: bool, default = false
454454

455+
/// (reading) Allows multiple I/Os per row group, if the batch size is small enough.
456+
pub split_row_group_reads: bool, default = false
457+
455458
// The following options affect writing to parquet files
456459
// and map to parquet::file::properties::WriterProperties
457460

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ impl ParquetOptions {
272272
bloom_filter_on_read: _, // reads not used for writer props
273273
schema_force_view_types: _,
274274
binary_as_string: _, // not used for writer props
275+
split_row_group_reads: _,
275276
skip_arrow_metadata: _,
276277
} = self;
277278

@@ -548,6 +549,7 @@ mod tests {
548549
bloom_filter_on_read: defaults.bloom_filter_on_read,
549550
schema_force_view_types: defaults.schema_force_view_types,
550551
binary_as_string: defaults.binary_as_string,
552+
split_row_group_reads: defaults.split_row_group_reads,
551553
skip_arrow_metadata: defaults.skip_arrow_metadata,
552554
}
553555
}
@@ -654,6 +656,7 @@ mod tests {
654656
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
655657
schema_force_view_types: global_options_defaults.schema_force_view_types,
656658
binary_as_string: global_options_defaults.binary_as_string,
659+
split_row_group_reads: global_options_defaults.split_row_group_reads,
657660
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
658661
},
659662
column_specific_options,

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ pub(super) struct ParquetOpener {
5555
pub projection: Arc<[usize]>,
5656
/// Target number of rows in each output RecordBatch
5757
pub batch_size: usize,
58+
/// If true, perform multiple I/Os per row group (if the batch size is smaller than the row group size)
59+
pub split_row_group_reads: bool,
5860
/// Optional limit on the number of rows to read
5961
pub limit: Option<usize>,
6062
/// Optional predicate to apply during the scan
@@ -107,6 +109,7 @@ impl FileOpener for ParquetOpener {
107109
)?;
108110

109111
let batch_size = self.batch_size;
112+
let split_row_group_reads = self.split_row_group_reads;
110113

111114
let projected_schema =
112115
SchemaRef::from(self.table_schema.project(&self.projection)?);
@@ -262,6 +265,7 @@ impl FileOpener for ParquetOpener {
262265
let stream = builder
263266
.with_projection(mask)
264267
.with_batch_size(batch_size)
268+
.with_split_row_group_reads(split_row_group_reads)
265269
.with_row_groups(row_group_indexes)
266270
.build()?;
267271

datafusion/core/src/datasource/physical_plan/parquet/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ impl FileSource for ParquetSource {
515515
batch_size: self
516516
.batch_size
517517
.expect("Batch size must set before creating ParquetOpener"),
518+
split_row_group_reads: self.table_parquet_options.global.split_row_group_reads,
518519
limit: base_config.limit,
519520
predicate: self.predicate.clone(),
520521
pruning_predicate: self.pruning_predicate.clone(),

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ message ParquetOptions {
497497
bool schema_force_view_types = 28; // default = false
498498
bool binary_as_string = 29; // default = false
499499
bool skip_arrow_metadata = 30; // default = false
500+
bool split_row_group_reads = 32; // default = false
500501

501502
oneof metadata_size_hint_opt {
502503
uint64 metadata_size_hint = 4;

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
984984
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
985985
schema_force_view_types: value.schema_force_view_types,
986986
binary_as_string: value.binary_as_string,
987+
split_row_group_reads: value.split_row_group_reads,
987988
skip_arrow_metadata: value.skip_arrow_metadata,
988989
})
989990
}

datafusion/proto-common/src/generated/pbjson.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4939,6 +4939,9 @@ impl serde::Serialize for ParquetOptions {
49394939
if self.skip_arrow_metadata {
49404940
len += 1;
49414941
}
4942+
if self.split_row_group_reads {
4943+
len += 1;
4944+
}
49424945
if self.dictionary_page_size_limit != 0 {
49434946
len += 1;
49444947
}
@@ -5038,6 +5041,9 @@ impl serde::Serialize for ParquetOptions {
50385041
if self.skip_arrow_metadata {
50395042
struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?;
50405043
}
5044+
if self.split_row_group_reads {
5045+
struct_ser.serialize_field("splitRowGroupReads", &self.split_row_group_reads)?;
5046+
}
50415047
if self.dictionary_page_size_limit != 0 {
50425048
#[allow(clippy::needless_borrow)]
50435049
#[allow(clippy::needless_borrows_for_generic_args)]
@@ -5177,6 +5183,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
51775183
"binaryAsString",
51785184
"skip_arrow_metadata",
51795185
"skipArrowMetadata",
5186+
"split_row_group_reads",
5187+
"splitRowGroupReads",
51805188
"dictionary_page_size_limit",
51815189
"dictionaryPageSizeLimit",
51825190
"data_page_row_count_limit",
@@ -5223,6 +5231,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
52235231
SchemaForceViewTypes,
52245232
BinaryAsString,
52255233
SkipArrowMetadata,
5234+
SplitRowGroupReads,
52265235
DictionaryPageSizeLimit,
52275236
DataPageRowCountLimit,
52285237
MaxRowGroupSize,
@@ -5274,6 +5283,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
52745283
"schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes),
52755284
"binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString),
52765285
"skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata),
5286+
"splitRowGroupReads" | "split_row_group_reads" => Ok(GeneratedField::SplitRowGroupReads),
52775287
"dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
52785288
"dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
52795289
"maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize),
@@ -5323,6 +5333,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
53235333
let mut schema_force_view_types__ = None;
53245334
let mut binary_as_string__ = None;
53255335
let mut skip_arrow_metadata__ = None;
5336+
let mut split_row_group_reads__ = None;
53265337
let mut dictionary_page_size_limit__ = None;
53275338
let mut data_page_row_count_limit__ = None;
53285339
let mut max_row_group_size__ = None;
@@ -5443,6 +5454,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
54435454
}
54445455
skip_arrow_metadata__ = Some(map_.next_value()?);
54455456
}
5457+
GeneratedField::SplitRowGroupReads => {
5458+
if split_row_group_reads__.is_some() {
5459+
return Err(serde::de::Error::duplicate_field("splitRowGroupReads"));
5460+
}
5461+
split_row_group_reads__ = Some(map_.next_value()?);
5462+
}
54465463
GeneratedField::DictionaryPageSizeLimit => {
54475464
if dictionary_page_size_limit__.is_some() {
54485465
return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
@@ -5552,6 +5569,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
55525569
schema_force_view_types: schema_force_view_types__.unwrap_or_default(),
55535570
binary_as_string: binary_as_string__.unwrap_or_default(),
55545571
skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(),
5572+
split_row_group_reads: split_row_group_reads__.unwrap_or_default(),
55555573
dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(),
55565574
data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(),
55575575
max_row_group_size: max_row_group_size__.unwrap_or_default(),

datafusion/proto-common/src/generated/prost.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,9 @@ pub struct ParquetOptions {
764764
/// default = false
765765
#[prost(bool, tag = "30")]
766766
pub skip_arrow_metadata: bool,
767+
/// default = false
768+
#[prost(bool, tag = "32")]
769+
pub split_row_group_reads: bool,
767770
#[prost(uint64, tag = "12")]
768771
pub dictionary_page_size_limit: u64,
769772
#[prost(uint64, tag = "18")]

datafusion/proto-common/src/to_proto/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
835835
maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64,
836836
schema_force_view_types: value.schema_force_view_types,
837837
binary_as_string: value.binary_as_string,
838+
split_row_group_reads: value.split_row_group_reads,
838839
skip_arrow_metadata: value.skip_arrow_metadata,
839840
})
840841
}

0 commit comments

Comments
 (0)