22//
33// SPDX-License-Identifier: Apache-2.0
44
5- use std:: { collections:: HashMap , sync:: Arc } ;
6-
75use arrow:: datatypes:: UInt32Type ;
8- use arrow_array:: { RecordBatch , UInt32Array } ;
6+ use arrow_array:: { Array , RecordBatch , UInt32Array } ;
97use arrow_buffer:: i256;
108use arrow_schema:: { ArrowError , DataType , Field , Schema , SchemaBuilder , SchemaRef , TimeUnit } ;
119use chrono:: { DateTime , Duration } ;
@@ -23,9 +21,13 @@ use datafusion::{
2321} ;
2422use datafusion_common:: DataFusionError :: { External , Internal } ;
2523use datafusion_common:: { cast:: as_primitive_array, DFSchema , DataFusionError , Result , ScalarValue } ;
24+ use std:: iter:: zip;
25+ use std:: { collections:: HashMap , sync:: Arc } ;
2626
2727use datafusion_substrait:: substrait:: proto:: Plan ;
28+ use futures:: { StreamExt , TryStreamExt } ;
2829use object_store:: path:: Path ;
30+ use object_store:: ObjectMeta ;
2931use parquet:: format:: FileMetaData ;
3032use proto:: proto:: entity:: JniWrapper ;
3133use rand:: distributions:: DistString ;
@@ -94,7 +96,7 @@ fn range_partition_to_partition_cols(
9496pub fn get_columnar_values (
9597 batch : & RecordBatch ,
9698 range_partitions : Arc < Vec < String > > ,
97- ) -> datafusion :: error :: Result < Vec < ( String , ScalarValue ) > > {
99+ ) -> Result < Vec < ( String , ScalarValue ) > > {
98100 range_partitions
99101 . iter ( )
100102 . map ( |range_col| {
@@ -104,12 +106,10 @@ pub fn get_columnar_values(
104106 Err ( e) => Err ( e) ,
105107 }
106108 } else {
107- Err ( datafusion:: error:: DataFusionError :: External (
108- format ! ( "Invalid partition desc of {}" , range_col) . into ( ) ,
109- ) )
109+ Err ( External ( format ! ( "Invalid partition desc of {}" , range_col) . into ( ) ) )
110110 }
111111 } )
112- . collect :: < datafusion :: error :: Result < Vec < _ > > > ( )
112+ . collect :: < Result < Vec < _ > > > ( )
113113}
114114
115115pub fn format_scalar_value ( v : & ScalarValue ) -> String {
@@ -310,11 +310,7 @@ pub fn partition_desc_to_scalar_values(schema: SchemaRef, partition_desc: String
310310 Some ( ( name, val) ) => {
311311 part_values. push ( ( name, val) ) ;
312312 }
313- _ => {
314- return Err ( datafusion:: error:: DataFusionError :: External (
315- format ! ( "Invalid partition_desc: {}" , partition_desc) . into ( ) ,
316- ) )
317- }
313+ _ => return Err ( External ( format ! ( "Invalid partition_desc: {}" , partition_desc) . into ( ) ) ) ,
318314 }
319315 }
320316 let mut scalar_values = Vec :: with_capacity ( schema. fields ( ) . len ( ) ) ;
@@ -350,9 +346,7 @@ pub fn partition_desc_from_file_scan_config(conf: &FileScanConfig) -> Result<(St
350346 . map ( |( idx, col) | ( col. name ( ) . clone ( ) , file. partition_values [ idx] . to_string ( ) ) ) ,
351347 ) ,
352348 ) ) ,
353- None => Err ( DataFusionError :: External (
354- format ! ( "Invalid file_group {:?}" , conf. file_groups) . into ( ) ,
355- ) ) ,
349+ None => Err ( External ( format ! ( "Invalid file_group {:?}" , conf. file_groups) . into ( ) ) ) ,
356350 }
357351 }
358352}
@@ -371,8 +365,19 @@ pub async fn listing_table_from_lakesoul_io_config(
371365 . iter ( )
372366 . map ( ListingTableUrl :: parse)
373367 . collect :: < Result < Vec < _ > > > ( ) ?;
368+ let object_metas = get_file_object_meta ( session_state, & table_paths) . await ?;
369+ let ( table_paths, object_metas) : ( Vec < _ > , Vec < _ > ) = zip ( table_paths, object_metas)
370+ . filter ( |( _, obj_meta) | {
371+ let valid = obj_meta. size >= 8 ;
372+ if !valid {
373+ println ! ( "File {}, size {}, is invalid" , obj_meta. location, obj_meta. size) ;
374+ }
375+ valid
376+ } )
377+ . unzip ( ) ;
374378 // Resolve the schema
375- let resolved_schema = infer_schema ( session_state, & table_paths, Arc :: clone ( & file_format) ) . await ?;
379+ let resolved_schema =
380+ infer_schema ( session_state, & table_paths, & object_metas, Arc :: clone ( & file_format) ) . await ?;
376381
377382 let target_schema = if lakesoul_io_config. inferring_schema {
378383 SchemaRef :: new ( Schema :: empty ( ) )
@@ -419,31 +424,42 @@ pub async fn listing_table_from_lakesoul_io_config(
419424 Ok ( ( config. file_schema . clone ( ) , Arc :: new ( ListingTable :: try_new ( config) ?) ) )
420425}
421426
427+ pub async fn get_file_object_meta ( sc : & SessionState , table_paths : & [ ListingTableUrl ] ) -> Result < Vec < ObjectMeta > > {
428+ let object_store_url = table_paths
429+ . first ( )
430+ . ok_or ( Internal ( "no table path" . to_string ( ) ) ) ?
431+ . object_store ( ) ;
432+ let store = sc. runtime_env ( ) . object_store ( object_store_url. clone ( ) ) ?;
433+ futures:: stream:: iter ( table_paths)
434+ . map ( |path| {
435+ let store = store. clone ( ) ;
436+ async move {
437+ let path = Path :: from_url_path ( <ListingTableUrl as AsRef < Url > >:: as_ref ( path) . path ( ) )
438+ . map_err ( object_store:: Error :: from) ?;
439+ store. head ( & path) . await
440+ }
441+ } )
442+ . boxed ( )
443+ . buffered ( sc. config_options ( ) . execution . meta_fetch_concurrency )
444+ . try_collect ( )
445+ . await
446+ . map_err ( DataFusionError :: from)
447+ }
448+
422449pub async fn infer_schema (
423450 sc : & SessionState ,
424451 table_paths : & [ ListingTableUrl ] ,
452+ object_metas : & [ ObjectMeta ] ,
425453 file_format : Arc < dyn FileFormat > ,
426454) -> Result < SchemaRef > {
427- // Create default parquet options
428455 let object_store_url = table_paths
429456 . first ( )
430- . ok_or ( DataFusionError :: Internal ( "no table path" . to_string ( ) ) ) ?
457+ . ok_or ( Internal ( "no table path" . to_string ( ) ) ) ?
431458 . object_store ( ) ;
432459 let store = sc. runtime_env ( ) . object_store ( object_store_url. clone ( ) ) ?;
433- let mut objects = vec ! [ ] ;
434-
435- for url in table_paths {
436- objects. push (
437- store
438- . head ( & Path :: from_url_path (
439- <ListingTableUrl as AsRef < Url > >:: as_ref ( url) . path ( ) ,
440- ) ?)
441- . await ?,
442- ) ;
443- }
444460
445461 // Resolve the schema
446- file_format. infer_schema ( sc, & store, & objects ) . await
462+ file_format. infer_schema ( sc, & store, & object_metas ) . await
447463}
448464
449465pub fn apply_partition_filter ( wrapper : JniWrapper , schema : SchemaRef , filter : Plan ) -> Result < JniWrapper > {
@@ -550,7 +566,7 @@ pub fn get_batch_memory_size(batch: &RecordBatch) -> Result<usize> {
550566}
551567
552568pub fn get_file_size ( metadata : & FileMetaData ) -> usize {
553- let footer_size= metadata. footer_signing_key_metadata . as_ref ( ) . map_or ( 0 , |f| f. len ( ) ) ;
569+ let footer_size = metadata. footer_signing_key_metadata . as_ref ( ) . map_or ( 0 , |f| f. len ( ) ) ;
554570 dbg ! ( & metadata) ;
555571 let rg_size = metadata
556572 . row_groups
@@ -567,4 +583,4 @@ pub fn get_file_exist_col(metadata: &FileMetaData) -> String {
567583 . map ( |schema_element| schema_element. name . clone ( ) )
568584 . collect :: < Vec < _ > > ( )
569585 . join ( "," )
570- }
586+ }
0 commit comments