@@ -18,7 +18,7 @@ use datafusion::{
1818 datasource:: physical_plan:: parquet:: { ParquetAccessPlan , RowGroupAccess } ,
1919 datasource:: physical_plan:: ParquetSource ,
2020 execution:: cache:: cache_manager:: CacheManagerConfig ,
21- execution:: cache:: cache_unit :: DefaultListFilesCache ,
21+ execution:: cache:: DefaultListFilesCache ,
2222 execution:: cache:: CacheAccessor ,
2323 execution:: context:: SessionContext ,
2424 execution:: runtime_env:: RuntimeEnvBuilder ,
@@ -31,6 +31,7 @@ use datafusion_datasource::PartitionedFile;
3131use datafusion_datasource:: file_groups:: FileGroup ;
3232use datafusion_datasource:: file_scan_config:: FileScanConfigBuilder ;
3333use datafusion_datasource:: source:: DataSourceExec ;
34+ use datafusion_datasource:: TableSchema ;
3435use datafusion_substrait:: logical_plan:: consumer:: from_substrait_plan;
3536use datafusion_substrait:: substrait:: proto:: { Plan , extensions:: simple_extension_declaration:: MappingType } ;
3637use object_store:: ObjectMeta ;
@@ -121,7 +122,11 @@ pub async fn execute_query_with_cross_rt_stream(
121122 ) ;
122123
123124 let list_file_cache = Arc :: new ( DefaultListFilesCache :: default ( ) ) ;
124- list_file_cache. put ( table_path. prefix ( ) , object_meta) ;
125+ let table_scoped_path = datafusion:: execution:: cache:: TableScopedPath {
126+ table : None ,
127+ path : table_path. prefix ( ) . clone ( ) ,
128+ } ;
129+ list_file_cache. put ( & table_scoped_path, object_meta) ;
125130
126131 let runtimeEnv = & runtime. runtime_env ;
127132
@@ -361,15 +366,18 @@ pub async fn execute_fetch_phase(
361366 ) ;
362367
363368 let list_file_cache = Arc :: new ( DefaultListFilesCache :: default ( ) ) ;
364- list_file_cache. put ( table_path. prefix ( ) , object_meta) ;
369+ let table_scoped_path = datafusion:: execution:: cache:: TableScopedPath {
370+ table : None ,
371+ path : table_path. prefix ( ) . clone ( ) ,
372+ } ;
373+ list_file_cache. put ( & table_scoped_path, object_meta) ;
365374
366375 let runtime_env = RuntimeEnvBuilder :: new ( )
367376 . with_cache_manager (
368377 CacheManagerConfig :: default ( ) . with_list_files_cache ( Some ( list_file_cache) )
369- . with_metadata_cache_limit ( runtime. runtime_env . cache_manager . get_file_metadata_cache ( ) . cache_limit ( ) )
378+ . with_metadata_cache_limit ( runtime. runtime_env . cache_manager . get_file_metadata_cache ( ) . cache_limit ( ) )
370379 . with_file_metadata_cache ( Some ( runtime. runtime_env . cache_manager . get_file_metadata_cache ( ) . clone ( ) ) )
371380 . with_files_statistics_cache ( runtime. runtime_env . cache_manager . get_file_statistic_cache ( ) ) ,
372-
373381 )
374382 . build ( ) ?;
375383
@@ -414,7 +422,12 @@ pub async fn execute_fetch_phase(
414422 . collect ( ) ;
415423
416424 let file_group = FileGroup :: new ( partitioned_files) ;
417- let file_source = Arc :: new ( ParquetSource :: default ( ) ) ;
425+
426+ let table_schema = datafusion_datasource:: table_schema:: TableSchema :: new (
427+ parquet_schema. clone ( ) ,
428+ vec ! [ Arc :: new( Field :: new( ROW_BASE_FIELD_NAME , DataType :: Int64 , false ) ) ] ,
429+ ) ;
430+ let file_source = Arc :: new ( ParquetSource :: new ( table_schema) ) ;
418431
419432 let mut projection_index = vec ! [ ] ;
420433 for field_name in projections. iter ( ) {
@@ -435,17 +448,15 @@ pub async fn execute_fetch_phase(
435448
436449 let file_scan_config = FileScanConfigBuilder :: new (
437450 ObjectStoreUrl :: local_filesystem ( ) ,
438- parquet_schema. clone ( ) ,
439451 file_source,
440452 )
441- . with_table_partition_cols ( vec ! [ Field :: new( ROW_BASE_FIELD_NAME , DataType :: Int64 , false ) ] )
442- . with_projection_indices ( Some ( projection_index. clone ( ) ) )
453+ . with_projection_indices ( Some ( projection_index. clone ( ) ) ) ?
443454 . with_file_group ( file_group)
444455 . build ( ) ;
445456
446457 let parquet_exec = DataSourceExec :: from_data_source ( file_scan_config. clone ( ) ) ;
447458
448- let projection_exprs = build_projection_exprs ( file_scan_config. projected_schema ( ) )
459+ let projection_exprs = build_projection_exprs ( file_scan_config. projected_schema ( ) ? )
449460 . expect ( "Failed to build projection expressions" ) ;
450461
451462 let projection_exec = Arc :: new ( ProjectionExec :: try_new ( projection_exprs, parquet_exec)
0 commit comments