@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
55use std:: process:: Command ;
66use std:: sync:: { Arc , LazyLock } ;
77
8+ use arrow_schema:: { DataType , Field , Schema } ;
89use async_trait:: async_trait;
910use clap:: ValueEnum ;
1011use datafusion:: datasource:: file_format:: FileFormat ;
@@ -14,7 +15,7 @@ use datafusion::datasource::listing::{
1415 ListingOptions , ListingTable , ListingTableConfig , ListingTableUrl ,
1516} ;
1617use datafusion:: prelude:: SessionContext ;
17- use datafusion_common:: { Result , TableReference } ;
18+ use datafusion_common:: { DFSchema , Result , TableReference } ;
1819use futures:: future:: join_all;
1920use futures:: { StreamExt , TryStreamExt , stream} ;
2021use humansize:: { DECIMAL , format_size} ;
@@ -360,10 +361,12 @@ impl PBIData {
360361 session. sql ( create_table) . await ?;
361362 let table_ref = TableReference :: bare ( & * table. name ) ;
362363 let df_table = session. table ( table_ref. clone ( ) ) . await ?;
364+ let schema = replace_with_views ( df_table. schema ( ) ) ?;
363365
364366 // drop the temp table after getting the arrow schema.
365- let var_name = format ! ( "DROP TABLE '{}';" , & table. name) ;
366- session. sql ( & var_name) . await ?;
367+ session
368+ . sql ( & format ! ( "DROP TABLE '{}';" , & table. name) )
369+ . await ?;
367370
368371 let df_format: Arc < dyn FileFormat > = match file_type {
369372 FileType :: Csv => Arc :: new (
@@ -380,7 +383,7 @@ impl PBIData {
380383 let table_url = ListingTableUrl :: parse ( path. to_str ( ) . expect ( "unicode" ) ) ?;
381384 let config = ListingTableConfig :: new ( table_url)
382385 . with_listing_options ( ListingOptions :: new ( df_format) )
383- . with_schema ( df_table . schema ( ) . clone ( ) . into ( ) ) ;
386+ . with_schema ( schema. into ( ) ) ;
384387
385388 let listing_table = Arc :: new ( ListingTable :: try_new ( config) ?) ;
386389 session. register_table ( table_ref, listing_table) ?;
@@ -423,6 +426,30 @@ impl Dataset for PBIBenchmark {
423426 }
424427}
425428
429+ fn replace_with_views ( schema : & DFSchema ) -> Result < DFSchema > {
430+ let fields: Vec < _ > = schema
431+ . fields ( )
432+ . iter ( )
433+ . map ( |f| match f. data_type ( ) {
434+ DataType :: Binary => {
435+ Arc :: new ( <Field as Clone >:: clone ( f) . with_data_type ( DataType :: BinaryView ) )
436+ }
437+ DataType :: Utf8 => {
438+ Arc :: new ( <Field as Clone >:: clone ( f) . with_data_type ( DataType :: Utf8View ) )
439+ }
440+ _ => f. clone ( ) ,
441+ } )
442+ . collect ( ) ;
443+ let new_schema = Schema :: new ( fields) ;
444+ let qualifiers = schema
445+ . iter ( )
446+ . map ( |( qualifier, _) | qualifier. cloned ( ) )
447+ . collect ( ) ;
448+ let df_schema =
449+ DFSchema :: from_field_specific_qualified_schema ( qualifiers, & Arc :: new ( new_schema) ) ?;
450+ df_schema. with_functional_dependencies ( schema. functional_dependencies ( ) . clone ( ) )
451+ }
452+
426453pub fn replace_decimals ( create_table_sql : & str ) -> Cow < ' _ , str > {
427454 // replace unsupported decimal types with doubles
428455 let decimal_regex = Regex :: new ( r"(?i)DECIMAL\(\s*\d+\s*(?:,\s*\d+\s*)?\)|\bDECIMAL\b" ) . unwrap ( ) ;
0 commit comments