@@ -44,6 +44,7 @@ use common_expression::ConstantFolder;
4444use common_expression:: Scalar ;
4545use common_functions:: scalars:: BUILTIN_FUNCTIONS ;
4646use common_meta_app:: principal:: StageFileFormatType ;
47+ use common_meta_app:: principal:: StageInfo ;
4748use common_storage:: DataOperator ;
4849use common_storage:: StageFilesInfo ;
4950use common_storages_parquet:: ParquetTable ;
@@ -369,52 +370,62 @@ impl Binder {
369370 options,
370371 alias,
371372 } => {
372- let ( stage_info, path) =
373+ let ( mut stage_info, path) =
373374 parse_file_location ( & self . ctx , location, options. connection . clone ( ) ) . await ?;
374- let file_format_options = match & options. file_format {
375- Some ( f) => self . ctx . get_file_format ( f) . await ?,
376- None => stage_info. file_format_options . clone ( ) ,
375+ if let Some ( f) = & options. file_format {
376+ stage_info. file_format_options = self . ctx . get_file_format ( f) . await ?;
377+ }
378+ let files_info = StageFilesInfo {
379+ path,
380+ pattern : options. pattern . clone ( ) ,
381+ files : options. files . clone ( ) ,
377382 } ;
378- if matches ! ( file_format_options. format, StageFileFormatType :: Parquet ) {
379- let files_info = StageFilesInfo {
380- path,
381- pattern : options. pattern . clone ( ) ,
382- files : options. files . clone ( ) ,
383- } ;
384- let read_options = ParquetReadOptions :: default ( ) ;
385-
386- let table =
387- ParquetTable :: create ( stage_info. clone ( ) , files_info, read_options) . await ?;
388-
389- let table_alias_name = if let Some ( table_alias) = alias {
390- Some (
391- normalize_identifier ( & table_alias. name , & self . name_resolution_ctx ) . name ,
392- )
393- } else {
394- None
395- } ;
396-
397- let table_index = self . metadata . write ( ) . add_table (
398- CATALOG_DEFAULT . to_string ( ) ,
399- "system" . to_string ( ) ,
400- table. clone ( ) ,
401- table_alias_name,
402- false ,
403- ) ;
383+ self . bind_stage_table ( bind_context, stage_info, files_info, alias)
384+ . await
385+ }
386+ }
387+ }
404388
405- let ( s_expr, mut bind_context) = self
406- . bind_base_table ( bind_context, "system" , table_index)
407- . await ?;
408- if let Some ( alias) = alias {
409- bind_context. apply_table_alias ( alias, & self . name_resolution_ctx ) ?;
410- }
411- Ok ( ( s_expr, bind_context) )
412- } else {
413- Err ( ErrorCode :: Unimplemented (
414- "only support parquet format for 'select from stage' for now." ,
415- ) )
416- }
389+ pub ( crate ) async fn bind_stage_table (
390+ & mut self ,
391+ bind_context : & BindContext ,
392+ stage_info : StageInfo ,
393+ files_info : StageFilesInfo ,
394+ alias : & Option < TableAlias > ,
395+ ) -> Result < ( SExpr , BindContext ) > {
396+ if matches ! (
397+ stage_info. file_format_options. format,
398+ StageFileFormatType :: Parquet
399+ ) {
400+ let read_options = ParquetReadOptions :: default ( ) ;
401+
402+ let table = ParquetTable :: create ( stage_info. clone ( ) , files_info, read_options) . await ?;
403+
404+ let table_alias_name = if let Some ( table_alias) = alias {
405+ Some ( normalize_identifier ( & table_alias. name , & self . name_resolution_ctx ) . name )
406+ } else {
407+ None
408+ } ;
409+
410+ let table_index = self . metadata . write ( ) . add_table (
411+ CATALOG_DEFAULT . to_string ( ) ,
412+ "system" . to_string ( ) ,
413+ table. clone ( ) ,
414+ table_alias_name,
415+ false ,
416+ ) ;
417+
418+ let ( s_expr, mut bind_context) = self
419+ . bind_base_table ( bind_context, "system" , table_index)
420+ . await ?;
421+ if let Some ( alias) = alias {
422+ bind_context. apply_table_alias ( alias, & self . name_resolution_ctx ) ?;
417423 }
424+ Ok ( ( s_expr, bind_context) )
425+ } else {
426+ Err ( ErrorCode :: Unimplemented (
427+ "only support parquet format for 'select from stage' for now." ,
428+ ) )
418429 }
419430 }
420431
0 commit comments