@@ -44,7 +44,9 @@ use crate::physical_plan::{
4444
4545use arrow:: compute:: sum;
4646use datafusion_common:: config:: { ConfigField , ConfigFileType , TableParquetOptions } ;
47- use datafusion_common:: file_options:: parquet_writer:: { ParquetWriterOptions , WriterPropertiesConfig , WriterPropertiesCustomizer } ;
47+ use datafusion_common:: file_options:: parquet_writer:: {
48+ ParquetWriterOptions , WriterPropertiesConfig , WriterPropertiesCustomizer ,
49+ } ;
4850use datafusion_common:: parsers:: CompressionTypeVariant ;
4951use datafusion_common:: stats:: Precision ;
5052use datafusion_common:: {
@@ -80,7 +82,9 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
8082use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
8183use tokio:: task:: JoinSet ;
8284
83- use crate :: datasource:: physical_plan:: parquet:: { get_reader_options_config_or_default, MetadataFetcher , ParquetExecBuilder } ;
85+ use crate :: datasource:: physical_plan:: parquet:: {
86+ get_reader_options_config_or_default, MetadataFetcher , ParquetExecBuilder ,
87+ } ;
8488use datafusion_physical_expr_common:: sort_expr:: LexRequirement ;
8589use futures:: { StreamExt , TryStreamExt } ;
8690use object_store:: path:: Path ;
@@ -141,7 +145,9 @@ impl FileFormatFactory for ParquetFormatFactory {
141145 let customizer = get_writer_properties_customizer ( state. config ( ) ) ;
142146
143147 Ok ( Arc :: new (
144- ParquetFormat :: new ( ) . with_options ( parquet_options) . with_customizer ( customizer) ,
148+ ParquetFormat :: new ( )
149+ . with_options ( parquet_options)
150+ . with_customizer ( customizer) ,
145151 ) )
146152 }
147153
@@ -245,7 +251,10 @@ impl ParquetFormat {
245251 }
246252
247253 /// Set WriterPropertiesCustomizer for the ParquetFormat
248- pub fn with_customizer ( mut self , customizer : Arc < dyn WriterPropertiesCustomizer > ) -> Self {
254+ pub fn with_customizer (
255+ mut self ,
256+ customizer : Arc < dyn WriterPropertiesCustomizer > ,
257+ ) -> Self {
249258 self . customizer = customizer;
250259 self
251260 }
@@ -434,7 +443,11 @@ impl FileFormat for ParquetFormat {
434443 }
435444
436445 let sink_schema = conf. output_schema ( ) . clone ( ) ;
437- let sink = Arc :: new ( ParquetSink :: new ( conf, self . options . clone ( ) , self . customizer . clone ( ) ) ) ;
446+ let sink = Arc :: new ( ParquetSink :: new (
447+ conf,
448+ self . options . clone ( ) ,
449+ self . customizer . clone ( ) ,
450+ ) ) ;
438451
439452 Ok ( Arc :: new ( DataSinkExec :: new (
440453 input,
@@ -512,15 +525,16 @@ pub async fn fetch_parquet_metadata(
512525 }
513526}
514527
515-
516528/// Read and parse the schema of the Parquet file at location `path`
517529async fn fetch_schema (
518530 store : & dyn ObjectStore ,
519531 file : & ObjectMeta ,
520532 metadata_size_hint : Option < usize > ,
521533 metadata_fetcher : & dyn MetadataFetcher ,
522534) -> Result < Schema > {
523- let metadata = metadata_fetcher. fetch_metadata ( store, file, metadata_size_hint) . await ?;
535+ let metadata = metadata_fetcher
536+ . fetch_metadata ( store, file, metadata_size_hint)
537+ . await ?;
524538 let file_metadata = metadata. file_metadata ( ) ;
525539 let schema = parquet_to_arrow_schema (
526540 file_metadata. schema_descr ( ) ,
@@ -539,7 +553,9 @@ async fn fetch_statistics(
539553 metadata_size_hint : Option < usize > ,
540554 metadata_fetcher : & dyn MetadataFetcher ,
541555) -> Result < Statistics > {
542- let metadata = metadata_fetcher. fetch_metadata ( store, file, metadata_size_hint) . await ?;
556+ let metadata = metadata_fetcher
557+ . fetch_metadata ( store, file, metadata_size_hint)
558+ . await ?;
543559 statistics_from_parquet_meta_calc ( & metadata, table_schema)
544560}
545561
@@ -703,7 +719,11 @@ impl DisplayAs for ParquetSink {
703719
704720impl ParquetSink {
705721 /// Create from config.
706- pub fn new ( config : FileSinkConfig , parquet_options : TableParquetOptions , customizer : Arc < dyn WriterPropertiesCustomizer > ) -> Self {
722+ pub fn new (
723+ config : FileSinkConfig ,
724+ parquet_options : TableParquetOptions ,
725+ customizer : Arc < dyn WriterPropertiesCustomizer > ,
726+ ) -> Self {
707727 Self {
708728 config,
709729 parquet_options,
@@ -788,15 +808,19 @@ impl DataSink for ParquetSink {
788808 data : SendableRecordBatchStream ,
789809 context : & Arc < TaskContext > ,
790810 ) -> Result < u64 > {
791- let parquet_props = ParquetWriterOptions :: from_table_parquet_options ( & self . parquet_options , self . customizer . as_ref ( ) ) ?;
811+ let parquet_props = ParquetWriterOptions :: from_table_parquet_options (
812+ & self . parquet_options ,
813+ self . customizer . as_ref ( ) ,
814+ ) ?;
792815
793816 let object_store = context
794817 . runtime_env ( )
795818 . object_store ( & self . config . object_store_url ) ?;
796819
797820 let parquet_opts = & self . parquet_options ;
798821 let allow_single_file_parallelism =
799- parquet_opts. global . allow_single_file_parallelism && self . customizer . allow_single_file_parallelism ( ) ;
822+ parquet_opts. global . allow_single_file_parallelism
823+ && self . customizer . allow_single_file_parallelism ( ) ;
800824
801825 let part_col = if !self . config . table_partition_cols . is_empty ( ) {
802826 Some ( self . config . table_partition_cols . clone ( ) )
@@ -1360,16 +1384,29 @@ mod tests {
13601384 let format = ParquetFormat :: default ( ) . with_force_view_types ( force_views) ;
13611385 let schema = format. infer_schema ( & ctx, & store, & meta) . await . unwrap ( ) ;
13621386
1363- let stats =
1364- fetch_statistics ( store. as_ref ( ) , schema. clone ( ) , & meta[ 0 ] , None , & DefaultMetadataFetcher { } ) . await ?;
1387+ let stats = fetch_statistics (
1388+ store. as_ref ( ) ,
1389+ schema. clone ( ) ,
1390+ & meta[ 0 ] ,
1391+ None ,
1392+ & DefaultMetadataFetcher { } ,
1393+ )
1394+ . await ?;
13651395
13661396 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
13671397 let c1_stats = & stats. column_statistics [ 0 ] ;
13681398 let c2_stats = & stats. column_statistics [ 1 ] ;
13691399 assert_eq ! ( c1_stats. null_count, Precision :: Exact ( 1 ) ) ;
13701400 assert_eq ! ( c2_stats. null_count, Precision :: Exact ( 3 ) ) ;
13711401
1372- let stats = fetch_statistics ( store. as_ref ( ) , schema, & meta[ 1 ] , None , & DefaultMetadataFetcher { } ) . await ?;
1402+ let stats = fetch_statistics (
1403+ store. as_ref ( ) ,
1404+ schema,
1405+ & meta[ 1 ] ,
1406+ None ,
1407+ & DefaultMetadataFetcher { } ,
1408+ )
1409+ . await ?;
13731410 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
13741411 let c1_stats = & stats. column_statistics [ 0 ] ;
13751412 let c2_stats = & stats. column_statistics [ 1 ] ;
@@ -1561,9 +1598,14 @@ mod tests {
15611598 . await
15621599 . unwrap ( ) ;
15631600
1564- let stats =
1565- fetch_statistics ( store. upcast ( ) . as_ref ( ) , schema. clone ( ) , & meta[ 0 ] , Some ( 9 ) , & DefaultMetadataFetcher { } )
1566- . await ?;
1601+ let stats = fetch_statistics (
1602+ store. upcast ( ) . as_ref ( ) ,
1603+ schema. clone ( ) ,
1604+ & meta[ 0 ] ,
1605+ Some ( 9 ) ,
1606+ & DefaultMetadataFetcher { } ,
1607+ )
1608+ . await ?;
15671609
15681610 assert_eq ! ( stats. num_rows, Precision :: Exact ( 3 ) ) ;
15691611 let c1_stats = & stats. column_statistics [ 0 ] ;
@@ -1597,7 +1639,7 @@ mod tests {
15971639 schema. clone ( ) ,
15981640 & meta[ 0 ] ,
15991641 Some ( size_hint) ,
1600- & DefaultMetadataFetcher { } ,
1642+ & DefaultMetadataFetcher { } ,
16011643 )
16021644 . await ?;
16031645
@@ -2288,7 +2330,8 @@ mod tests {
22882330 overwrite : true ,
22892331 keep_partition_by_columns : false ,
22902332 } ;
2291- let customizer: Arc < dyn WriterPropertiesCustomizer > = WriterPropertiesConfig :: noop ( ) ;
2333+ let customizer: Arc < dyn WriterPropertiesCustomizer > =
2334+ WriterPropertiesConfig :: noop ( ) ;
22922335 let parquet_sink = Arc :: new ( ParquetSink :: new (
22932336 file_sink_config,
22942337 TableParquetOptions {
@@ -2385,7 +2428,8 @@ mod tests {
23852428 overwrite : true ,
23862429 keep_partition_by_columns : false ,
23872430 } ;
2388- let customizer: Arc < dyn WriterPropertiesCustomizer > = WriterPropertiesConfig :: noop ( ) ;
2431+ let customizer: Arc < dyn WriterPropertiesCustomizer > =
2432+ WriterPropertiesConfig :: noop ( ) ;
23892433 let parquet_sink = Arc :: new ( ParquetSink :: new (
23902434 file_sink_config,
23912435 TableParquetOptions :: default ( ) ,
@@ -2470,7 +2514,8 @@ mod tests {
24702514 overwrite : true ,
24712515 keep_partition_by_columns : false ,
24722516 } ;
2473- let customizer: Arc < dyn WriterPropertiesCustomizer > = WriterPropertiesConfig :: noop ( ) ;
2517+ let customizer: Arc < dyn WriterPropertiesCustomizer > =
2518+ WriterPropertiesConfig :: noop ( ) ;
24742519 let parquet_sink = Arc :: new ( ParquetSink :: new (
24752520 file_sink_config,
24762521 TableParquetOptions {
0 commit comments