@@ -27,6 +27,7 @@ use common_pipeline_core::processors::port::InputPort;
2727use common_pipeline_core:: processors:: port:: OutputPort ;
2828use common_pipeline_core:: Pipeline ;
2929use common_pipeline_transforms:: processors:: transforms:: TransformDummy ;
30+ use common_sql:: executor:: PhysicalPlan ;
3031use common_sql:: parse_result_scan_args;
3132use common_sql:: MetadataRef ;
3233use common_storages_result_cache:: gen_result_cache_key;
@@ -72,9 +73,13 @@ impl SelectInterpreter {
7273 } )
7374 }
7475
75- pub async fn build_pipeline ( & self ) -> Result < PipelineBuildResult > {
76+ #[ inline]
77+ pub async fn build_physical_plan ( & self ) -> Result < PhysicalPlan > {
7678 let mut builder = PhysicalPlanBuilder :: new ( self . metadata . clone ( ) , self . ctx . clone ( ) ) ;
77- let physical_plan = builder. build ( & self . s_expr ) . await ?;
79+ builder. build ( & self . s_expr ) . await
80+ }
81+
82+ pub async fn build_pipeline ( & self , physical_plan : PhysicalPlan ) -> Result < PipelineBuildResult > {
7883 build_query_pipeline (
7984 & self . ctx ,
8085 & self . bind_context . columns ,
@@ -187,8 +192,8 @@ impl Interpreter for SelectInterpreter {
187192 /// The QueryPipelineBuilder will use the optimized plan to generate a Pipeline
188193 #[ tracing:: instrument( level = "debug" , name = "select_interpreter_execute" , skip( self ) , fields( ctx. id = self . ctx. get_id( ) . as_str( ) ) ) ]
189194 async fn execute2 ( & self ) -> Result < PipelineBuildResult > {
190- // 0. Need to build pipeline first to get the partitions.
191- let mut build_res = self . build_pipeline ( ) . await ?;
195+ // 0. Need to build physical plan first to get the partitions.
196+ let physical_plan = self . build_physical_plan ( ) . await ?;
192197 if self . ctx . get_settings ( ) . get_enable_query_result_cache ( ) ? && self . ctx . get_cacheable ( ) {
193198 let key = gen_result_cache_key ( self . formatted_ast . as_ref ( ) . unwrap ( ) ) ;
194199 // 1. Try to get result from cache.
@@ -211,7 +216,7 @@ impl Interpreter for SelectInterpreter {
211216 self . ctx
212217 . set_query_id_result_cache ( self . ctx . get_id ( ) , meta_key) ;
213218 }
214- return Ok ( build_res ) ;
219+ return self . build_pipeline ( physical_plan ) . await ;
215220 }
216221
217222 let cache_reader = ResultCacheReader :: create (
@@ -233,6 +238,7 @@ impl Interpreter for SelectInterpreter {
233238 return PipelineBuildResult :: from_blocks ( blocks) ;
234239 }
235240 Ok ( None ) => {
241+ let mut build_res = self . build_pipeline ( physical_plan) . await ?;
236242 // 2.2 If not found result in cache, add pipelines to write the result to cache.
237243 let schema = infer_table_schema ( & self . schema ( ) ) ?;
238244 self . add_result_cache ( & key, schema, & mut build_res. main_pipeline , kv_store) ?;
@@ -244,6 +250,7 @@ impl Interpreter for SelectInterpreter {
244250 }
245251 }
246252 }
247- Ok ( build_res)
253+ // Not use query cache.
254+ self . build_pipeline ( physical_plan) . await
248255 }
249256}
0 commit comments