Skip to content

Commit c2f5a11

Browse files
committed
feat(cube): Customizer interfaces
1 parent 57a3c63 commit c2f5a11

File tree

14 files changed

+315
-33
lines changed

14 files changed

+315
-33
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,8 +612,12 @@ impl AsyncFileReader for ParquetReaderWithCache {
612612

613613
fn get_metadata(
614614
&mut self,
615+
encryption_config: &Option<
616+
datafusion::parquet::file::encryption::ParquetEncryptionConfig,
617+
>,
615618
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
616619
println!("get_metadata: {} returning cached metadata", self.filename);
620+
assert!(encryption_config.is_none());
617621

618622
// return the cached metadata so the parquet reader does not read it
619623
let metadata = self.metadata.clone();

datafusion/common/src/file_options/mod.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ mod tests {
3333
use super::parquet_writer::ParquetWriterOptions;
3434
use crate::{
3535
config::{ConfigFileType, TableOptions},
36-
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
36+
file_options::{
37+
csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions,
38+
parquet_writer::WriterPropertiesConfig,
39+
},
3740
parsers::CompressionTypeVariant,
3841
Result,
3942
};
@@ -79,7 +82,10 @@ mod tests {
7982
table_config.set_config_format(ConfigFileType::PARQUET);
8083
table_config.alter_with_string_hash_map(&option_map)?;
8184

82-
let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
85+
let parquet_options = ParquetWriterOptions::from_table_parquet_options(
86+
&table_config.parquet,
87+
WriterPropertiesConfig::noop().as_ref(),
88+
)?;
8389
let properties = parquet_options.writer_options();
8490

8591
// Verify the expected options propagated down to parquet crate WriterProperties struct
@@ -184,7 +190,10 @@ mod tests {
184190
table_config.set_config_format(ConfigFileType::PARQUET);
185191
table_config.alter_with_string_hash_map(&option_map)?;
186192

187-
let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
193+
let parquet_options = ParquetWriterOptions::from_table_parquet_options(
194+
&table_config.parquet,
195+
WriterPropertiesConfig::noop().as_ref(),
196+
)?;
188197
let properties = parquet_options.writer_options();
189198

190199
let col1 = ColumnPath::from(vec!["col1".to_owned()]);

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
//! Options related to how parquet files should be written
1919
20+
use std::sync::Arc;
21+
2022
use crate::{
2123
config::{ParquetOptions, TableParquetOptions},
2224
DataFusionError, Result,
@@ -51,18 +53,53 @@ impl ParquetWriterOptions {
5153
}
5254
}
5355

54-
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
55-
type Error = DataFusionError;
56-
57-
fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
56+
impl ParquetWriterOptions {
57+
pub fn from_table_parquet_options(
58+
parquet_table_options: &TableParquetOptions,
59+
customizer: &dyn WriterPropertiesCustomizer,
60+
) -> Result<Self> {
5861
// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
62+
let mut builder = WriterPropertiesBuilder::try_from(parquet_table_options)?;
63+
builder = customizer.adjust_write_properties(builder);
64+
5965
Ok(ParquetWriterOptions {
60-
writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
61-
.build(),
66+
writer_options: builder.build(),
6267
})
6368
}
6469
}
6570

71+
pub struct WriterPropertiesConfig {
72+
pub customizer: Arc<dyn WriterPropertiesCustomizer>,
73+
}
74+
75+
impl WriterPropertiesConfig {
76+
pub fn noop() -> Arc<dyn WriterPropertiesCustomizer> {
77+
Arc::new(NoopWriterPropertiesCustomizer {})
78+
}
79+
}
80+
81+
pub trait WriterPropertiesCustomizer: Sync + Send + std::fmt::Debug {
82+
fn adjust_write_properties(
83+
&self,
84+
builder: WriterPropertiesBuilder,
85+
) -> WriterPropertiesBuilder;
86+
fn allow_single_file_parallelism(&self) -> bool {
87+
true
88+
}
89+
}
90+
91+
#[derive(Debug)]
92+
pub struct NoopWriterPropertiesCustomizer;
93+
94+
impl WriterPropertiesCustomizer for NoopWriterPropertiesCustomizer {
95+
fn adjust_write_properties(
96+
&self,
97+
builder: WriterPropertiesBuilder,
98+
) -> WriterPropertiesBuilder {
99+
builder
100+
}
101+
}
102+
66103
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
67104
type Error = DataFusionError;
68105

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ use crate::execution::context::{SessionConfig, SessionState};
3434

3535
use arrow::datatypes::{DataType, Schema, SchemaRef};
3636
use datafusion_common::config::TableOptions;
37+
use datafusion_common::file_options::parquet_writer::{
38+
WriterPropertiesConfig, WriterPropertiesCustomizer,
39+
};
3740
use datafusion_common::{
3841
DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
3942
DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
@@ -543,6 +546,18 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
543546
}
544547
}
545548

549+
/// Returns the `WriterPropertiesCustomizer` held by the `WriterPropertiesConfig` extension registered on the `SessionConfig`, falling back to a no-op customizer when no such extension is present.
550+
pub fn get_writer_properties_customizer(
551+
config: &SessionConfig,
552+
) -> Arc<dyn WriterPropertiesCustomizer> {
553+
config
554+
.get_extension::<WriterPropertiesConfig>()
555+
.map_or_else(
556+
|| WriterPropertiesConfig::noop(),
557+
|cfg| cfg.customizer.clone(),
558+
)
559+
}
560+
546561
#[cfg(feature = "parquet")]
547562
#[async_trait]
548563
impl ReadOptions<'_> for ParquetReadOptions<'_> {
@@ -551,7 +566,10 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
551566
config: &SessionConfig,
552567
table_options: TableOptions,
553568
) -> ListingOptions {
554-
let mut file_format = ParquetFormat::new().with_options(table_options.parquet);
569+
let customizer = get_writer_properties_customizer(config);
570+
let mut file_format = ParquetFormat::new()
571+
.with_options(table_options.parquet)
572+
.with_customizer(customizer);
555573

556574
if let Some(parquet_pruning) = self.parquet_pruning {
557575
file_format = file_format.with_enable_pruning(parquet_pruning)

0 commit comments

Comments
 (0)