Skip to content

Commit 1eacebe

Browse files
committed
WIP: ReaderOptionsCustomizer and WriterPropertiesCustomizer
1 parent 77d1326 commit 1eacebe

File tree

13 files changed

+209
-33
lines changed

13 files changed

+209
-33
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,8 +612,10 @@ impl AsyncFileReader for ParquetReaderWithCache {
612612

613613
fn get_metadata(
614614
&mut self,
615+
encryption_config: &Option<datafusion::parquet::file::encryption::ParquetEncryptionConfig>,
615616
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
616617
println!("get_metadata: {} returning cached metadata", self.filename);
618+
assert!(encryption_config.is_none());
617619

618620
// return the cached metadata so the parquet reader does not read it
619621
let metadata = self.metadata.clone();

datafusion/common/src/file_options/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ mod tests {
3333
use super::parquet_writer::ParquetWriterOptions;
3434
use crate::{
3535
config::{ConfigFileType, TableOptions},
36-
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
36+
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions, parquet_writer::WriterPropertiesConfig},
3737
parsers::CompressionTypeVariant,
3838
Result,
3939
};
@@ -79,7 +79,7 @@ mod tests {
7979
table_config.set_config_format(ConfigFileType::PARQUET);
8080
table_config.alter_with_string_hash_map(&option_map)?;
8181

82-
let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
82+
let parquet_options = ParquetWriterOptions::from_table_parquet_options(&table_config.parquet, WriterPropertiesConfig::noop().as_ref())?;
8383
let properties = parquet_options.writer_options();
8484

8585
// Verify the expected options propagated down to parquet crate WriterProperties struct
@@ -184,7 +184,7 @@ mod tests {
184184
table_config.set_config_format(ConfigFileType::PARQUET);
185185
table_config.alter_with_string_hash_map(&option_map)?;
186186

187-
let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
187+
let parquet_options = ParquetWriterOptions::from_table_parquet_options(&table_config.parquet, WriterPropertiesConfig::noop().as_ref())?;
188188
let properties = parquet_options.writer_options();
189189

190190
let col1 = ColumnPath::from(vec!["col1".to_owned()]);

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,18 @@
1717

1818
//! Options related to how parquet files should be written
1919
20+
use std::sync::Arc;
21+
2022
use crate::{
2123
config::{ParquetOptions, TableParquetOptions},
2224
DataFusionError, Result,
2325
};
2426

2527
use parquet::{
26-
basic::{BrotliLevel, GzipLevel, ZstdLevel},
27-
file::properties::{
28+
basic::{BrotliLevel, GzipLevel, ZstdLevel}, file::properties::{
2829
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
2930
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
30-
},
31-
format::KeyValue,
32-
schema::types::ColumnPath,
31+
}, format::KeyValue, schema::types::ColumnPath
3332
};
3433

3534
/// Options for writing parquet files
@@ -51,18 +50,44 @@ impl ParquetWriterOptions {
5150
}
5251
}
5352

54-
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
55-
type Error = DataFusionError;
56-
57-
fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
53+
impl ParquetWriterOptions {
54+
pub fn from_table_parquet_options(parquet_table_options: &TableParquetOptions, customizer: &dyn WriterPropertiesCustomizer) -> Result<Self> {
5855
// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
56+
let mut builder = WriterPropertiesBuilder::try_from(parquet_table_options)?;
57+
builder = customizer.adjust_write_properties(builder);
58+
5959
Ok(ParquetWriterOptions {
60-
writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
61-
.build(),
60+
writer_options: builder.build(),
6261
})
6362
}
6463
}
6564

65+
pub struct WriterPropertiesConfig {
66+
pub customizer: Arc<dyn WriterPropertiesCustomizer>,
67+
}
68+
69+
impl WriterPropertiesConfig {
70+
pub fn noop() -> Arc<dyn WriterPropertiesCustomizer> {
71+
Arc::new(NoopWriterPropertiesCustomizer{})
72+
}
73+
pub fn todo() -> Arc<dyn WriterPropertiesCustomizer> { // TODO upgrade DF: no
74+
Self::noop()
75+
}
76+
}
77+
78+
pub trait WriterPropertiesCustomizer: Sync + Send + std::fmt::Debug {
79+
fn adjust_write_properties(&self, builder: WriterPropertiesBuilder) -> WriterPropertiesBuilder;
80+
}
81+
82+
#[derive(Debug)]
83+
pub struct NoopWriterPropertiesCustomizer;
84+
85+
impl WriterPropertiesCustomizer for NoopWriterPropertiesCustomizer {
86+
fn adjust_write_properties(&self, builder: WriterPropertiesBuilder) -> WriterPropertiesBuilder {
87+
builder
88+
}
89+
}
90+
6691
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
6792
type Error = DataFusionError;
6893

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::execution::context::{SessionConfig, SessionState};
3434

3535
use arrow::datatypes::{DataType, Schema, SchemaRef};
3636
use datafusion_common::config::TableOptions;
37+
use datafusion_common::file_options::parquet_writer::{WriterPropertiesConfig, WriterPropertiesCustomizer};
3738
use datafusion_common::{
3839
DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
3940
DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
@@ -543,6 +544,11 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
543544
}
544545
}
545546

547+
/// Retrieves `WriterPropertiesConfig` from the `SessionConfig::extensions` field, constructing a Noop customizer if not present.
548+
pub fn get_writer_properties_customizer(config: &SessionConfig) -> Arc<dyn WriterPropertiesCustomizer> {
549+
config.get_extension::<WriterPropertiesConfig>().map_or_else(|| WriterPropertiesConfig::noop(), |cfg| cfg.customizer.clone())
550+
}
551+
546552
#[cfg(feature = "parquet")]
547553
#[async_trait]
548554
impl ReadOptions<'_> for ParquetReadOptions<'_> {
@@ -551,7 +557,8 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
551557
config: &SessionConfig,
552558
table_options: TableOptions,
553559
) -> ListingOptions {
554-
let mut file_format = ParquetFormat::new().with_options(table_options.parquet);
560+
let customizer = get_writer_properties_customizer(config);
561+
let mut file_format = ParquetFormat::new().with_options(table_options.parquet).with_customizer(customizer);
555562

556563
if let Some(parquet_pruning) = self.parquet_pruning {
557564
file_format = file_format.with_enable_pruning(parquet_pruning)

0 commit comments

Comments
 (0)