Skip to content

Commit 5c4254c

Browse files
committed
WIP: Add WriterPropertiesCustomizer::allow_single_file_parallelism
1 parent 1eacebe commit 5c4254c

File tree

2 files changed

+6
-4
lines changed

2 files changed

+6
-4
lines changed

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ impl WriterPropertiesConfig {
7777

7878
/// Customization hook for Parquet writer properties.
///
/// Implementors must be `Sync + Send` and implement `Debug`.
// NOTE(review): where and how often implementations are invoked is not visible
// in this diff view — confirm against the call sites.
pub trait WriterPropertiesCustomizer: Sync + Send + std::fmt::Debug {
7979
/// Takes a `WriterPropertiesBuilder` by value and returns the builder that
/// should be used in its place.
fn adjust_write_properties(&self, builder: WriterPropertiesBuilder) -> WriterPropertiesBuilder;
80+
/// Reports whether this customizer permits single-file parallel writing.
///
/// The default implementation returns `true`. `ParquetSink` combines this
/// value with the global `allow_single_file_parallelism` option using `&&`,
/// so returning `false` turns the feature off for that sink even when the
/// global option is enabled, while returning `true` defers to the global
/// option.
fn allow_single_file_parallelism(&self) -> bool {
81+
true
82+
}
8083
}
8184

8285
#[derive(Debug)]

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,6 @@ pub async fn fetch_parquet_metadata(
477477
let mut footer = [0; 8];
478478
footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
479479

480-
// TODO upgrade DF: Can't use decode_footer directly.
481480
#[allow(deprecated)]
482481
let length = decode_footer(&footer)?;
483482

@@ -798,7 +797,7 @@ impl DataSink for ParquetSink {
798797

799798
let parquet_opts = &self.parquet_options;
800799
let allow_single_file_parallelism =
801-
parquet_opts.global.allow_single_file_parallelism;
800+
parquet_opts.global.allow_single_file_parallelism && self.customizer.allow_single_file_parallelism();
802801

803802
let part_col = if !self.config.table_partition_cols.is_empty() {
804803
Some(self.config.table_partition_cols.clone())
@@ -937,8 +936,8 @@ fn spawn_column_parallel_row_group_writer(
937936
pool: &Arc<dyn MemoryPool>,
938937
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
939938
let schema_desc = arrow_to_parquet_schema(&schema)?;
940-
let row_group_ordinal = 0; // TODO upgrade DF: encrypt
941-
let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema, row_group_ordinal)?;
939+
let row_group_ordinal_unused = 0;
940+
let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema, row_group_ordinal_unused)?;
942941
let num_columns = col_writers.len();
943942

944943
let mut col_writer_tasks = Vec::with_capacity(num_columns);

0 commit comments

Comments
 (0)