11use crate :: compile:: engine:: df:: planner:: CubeQueryPlanner ;
2- use std:: { backtrace:: Backtrace , collections:: HashMap , future:: Future , pin:: Pin , sync:: Arc } ;
2+ use std:: {
3+ backtrace:: Backtrace , collections:: HashMap , future:: Future , pin:: Pin , sync:: Arc ,
4+ time:: SystemTime ,
5+ } ;
36
47use crate :: {
58 compile:: {
@@ -21,8 +24,9 @@ use crate::{
2124 } ,
2225 config:: ConfigObj ,
2326 sql:: {
24- compiler_cache:: CompilerCache , statement:: SensitiveDataSanitizer , SessionManager ,
25- SessionState ,
27+ compiler_cache:: { CompilerCache , CompilerCacheEntry } ,
28+ statement:: SensitiveDataSanitizer ,
29+ SessionManager , SessionState ,
2630 } ,
2731 transport:: { LoadRequestMeta , MetaContext , SpanId , TransportService } ,
2832 CubeErrorCauseType ,
@@ -78,6 +82,11 @@ pub trait QueryEngine {
7882
7983 fn sanitize_statement ( & self , stmt : & Self :: AstStatementType ) -> Self :: AstStatementType ;
8084
85+ async fn get_cache_entry (
86+ & self ,
87+ state : Arc < SessionState > ,
88+ ) -> Result < Arc < CompilerCacheEntry > , CompilationError > ;
89+
8190 async fn plan (
8291 & self ,
8392 stmt : Self :: AstStatementType ,
@@ -86,6 +95,26 @@ pub trait QueryEngine {
8695 meta : Arc < MetaContext > ,
8796 state : Arc < SessionState > ,
8897 ) -> CompilationResult < ( QueryPlan , Self :: PlanMetadataType ) > {
98+ let cache_entry = self . get_cache_entry ( state. clone ( ) ) . await ?;
99+
100+ let planning_start = SystemTime :: now ( ) ;
101+ if let Some ( span_id) = span_id. as_ref ( ) {
102+ if let Some ( auth_context) = state. auth_context ( ) {
103+ self . transport_ref ( )
104+ . log_load_state (
105+ Some ( span_id. clone ( ) ) ,
106+ auth_context,
107+ state. get_load_request_meta ( ) ,
108+ "SQL API Query Planning" . to_string ( ) ,
109+ serde_json:: json!( {
110+ "query" : span_id. query_key. clone( ) ,
111+ } ) ,
112+ )
113+ . await
114+ . map_err ( |e| CompilationError :: internal ( e. to_string ( ) ) ) ?;
115+ }
116+ }
117+
89118 let ctx = self . create_session_ctx ( state. clone ( ) ) ?;
90119 let cube_ctx = self . create_cube_ctx ( state. clone ( ) , meta. clone ( ) , ctx. clone ( ) ) ?;
91120
@@ -144,7 +173,7 @@ pub trait QueryEngine {
144173 let mut finalized_graph = self
145174 . compiler_cache_ref ( )
146175 . rewrite (
147- state . auth_context ( ) . unwrap ( ) ,
176+ Arc :: clone ( & cache_entry ) ,
148177 cube_ctx. clone ( ) ,
149178 converter. take_egraph ( ) ,
150179 & query_params. unwrap ( ) ,
@@ -192,6 +221,7 @@ pub trait QueryEngine {
192221 let result = rewriter
193222 . find_best_plan (
194223 root,
224+ cache_entry,
195225 state. auth_context ( ) . unwrap ( ) ,
196226 qtrace,
197227 span_id. clone ( ) ,
@@ -243,12 +273,31 @@ pub trait QueryEngine {
243273 // TODO: We should find what optimizers will be safety to use for OLAP queries
244274 guard. optimizer . rules = vec ! [ ] ;
245275 }
246- if let Some ( span_id) = span_id {
276+ if let Some ( span_id) = & span_id {
247277 span_id. set_is_data_query ( true ) . await ;
248278 }
249279 } ;
250280
251281 log:: debug!( "Rewrite: {:#?}" , rewrite_plan) ;
282+
283+ if let Some ( span_id) = span_id. as_ref ( ) {
284+ if let Some ( auth_context) = state. auth_context ( ) {
285+ self . transport_ref ( )
286+ . log_load_state (
287+ Some ( span_id. clone ( ) ) ,
288+ auth_context,
289+ state. get_load_request_meta ( ) ,
290+ "SQL API Query Planning Success" . to_string ( ) ,
291+ serde_json:: json!( {
292+ "query" : span_id. query_key. clone( ) ,
293+ "duration" : planning_start. elapsed( ) . unwrap( ) . as_millis( ) as u64 ,
294+ } ) ,
295+ )
296+ . await
297+ . map_err ( |e| CompilationError :: internal ( e. to_string ( ) ) ) ?;
298+ }
299+ }
300+
252301 let rewrite_plan = Self :: evaluate_wrapped_sql (
253302 self . transport_ref ( ) . clone ( ) ,
254303 Arc :: new ( state. get_load_request_meta ( ) ) ,
@@ -501,6 +550,21 @@ impl QueryEngine for SqlQueryEngine {
501550 fn sanitize_statement ( & self , stmt : & Self :: AstStatementType ) -> Self :: AstStatementType {
502551 SensitiveDataSanitizer :: new ( ) . replace ( stmt. clone ( ) )
503552 }
553+
554+ async fn get_cache_entry (
555+ & self ,
556+ state : Arc < SessionState > ,
557+ ) -> Result < Arc < CompilerCacheEntry > , CompilationError > {
558+ self . compiler_cache_ref ( )
559+ . get_cache_entry (
560+ state. auth_context ( ) . ok_or_else ( || {
561+ CompilationError :: internal ( "Unable to get auth context" . to_string ( ) )
562+ } ) ?,
563+ state. protocol . clone ( ) ,
564+ )
565+ . await
566+ . map_err ( |e| CompilationError :: internal ( e. to_string ( ) ) )
567+ }
504568}
505569
506570fn is_olap_query ( parent : & LogicalPlan ) -> Result < bool , CompilationError > {
0 commit comments