@@ -24,6 +24,7 @@ use common_exception::ErrorCode;
2424use common_exception:: Result ;
2525use common_expression:: infer_table_schema;
2626use common_expression:: DataField ;
27+ use common_expression:: DataSchemaRef ;
2728use common_expression:: DataSchemaRefExt ;
2829use common_meta_app:: principal:: StageInfo ;
2930use common_meta_app:: schema:: TableCopiedFileInfo ;
@@ -58,12 +59,7 @@ impl CopyInterpreter {
5859 Ok ( CopyInterpreter { ctx, plan } )
5960 }
6061
61- async fn build_copy_into_stage_pipeline (
62- & self ,
63- stage : & StageInfo ,
64- path : & str ,
65- query : & Plan ,
66- ) -> Result < PipelineBuildResult > {
62+ async fn build_query ( & self , query : & Plan ) -> Result < ( PipelineBuildResult , DataSchemaRef ) > {
6763 let ( s_expr, metadata, bind_context, formatted_ast) = match query {
6864 Plan :: Query {
6965 s_expr,
@@ -97,6 +93,17 @@ impl CopyInterpreter {
9793 } )
9894 . collect ( ) ;
9995 let data_schema = DataSchemaRefExt :: create ( fields) ;
96+ let build_res = select_interpreter. build_pipeline ( ) . await ?;
97+ Ok ( ( build_res, data_schema) )
98+ }
99+
100+ async fn build_copy_into_stage_pipeline (
101+ & self ,
102+ stage : & StageInfo ,
103+ path : & str ,
104+ query : & Plan ,
105+ ) -> Result < PipelineBuildResult > {
106+ let ( mut build_res, data_schema) = self . build_query ( query) . await ?;
100107 let table_schema = infer_table_schema ( & data_schema) ?;
101108 let stage_table_info = StageTableInfo {
102109 schema : table_schema,
@@ -108,14 +115,11 @@ impl CopyInterpreter {
108115 } ,
109116 files_to_copy : None ,
110117 } ;
111-
112- let mut build_res = select_interpreter. execute2 ( ) . await ?;
113118 let table = StageTable :: try_create ( stage_table_info) ?;
114-
115119 append2table (
116120 self . ctx . clone ( ) ,
117- table. clone ( ) ,
118- data_schema. clone ( ) ,
121+ table,
122+ data_schema,
119123 & mut build_res,
120124 false ,
121125 true ,
0 commit comments