
Commit 23315a0

chore(cubesql): Do not call async Node functions while planning
1 parent: 591a383 · commit: 23315a0

File tree

5 files changed: +172, -150 lines
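
In short, everything the planner needs from the Cube.js (Node) side — most importantly the compiler cache entry that carries the rewrite rules — is now resolved once via get_cache_entry_and_refresh_cache_if_needed before planning begins, and is then passed by value through the rewriter, so planning no longer needs to call back into Node. A minimal, self-contained sketch of that pattern; the types and function names below are illustrative stand-ins, not the cubesql API, and the example assumes the tokio crate:

use std::sync::Arc;

// Stand-in for cubesql's CompilerCacheEntry.
struct CacheEntry {
    rewrite_rules: Vec<String>,
}

// Stand-in for the up-front lookup: the only place that still awaits an
// async (Node-backed) call, and it runs before planning starts.
async fn fetch_cache_entry() -> Arc<CacheEntry> {
    Arc::new(CacheEntry {
        rewrite_rules: vec!["push-down-filter".to_string()],
    })
}

// Purely synchronous planning: no .await inside, so it can safely run on a
// blocking thread without ever calling back into the Node runtime.
fn plan_query(cache_entry: Arc<CacheEntry>, sql: &str) -> String {
    format!(
        "plan for `{sql}` built with {} rewrite rule(s)",
        cache_entry.rewrite_rules.len()
    )
}

#[tokio::main]
async fn main() {
    // 1. Resolve the async dependency once, up front.
    let cache_entry = fetch_cache_entry().await;

    // 2. Hand it to the blocking planning task by value.
    let plan = tokio::task::spawn_blocking(move || plan_query(cache_entry, "SELECT 1"))
        .await
        .expect("planning task panicked");

    println!("{plan}");
}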

rust/cubesql/cubesql/src/compile/query_engine.rs

Lines changed: 71 additions & 5 deletions
@@ -1,5 +1,8 @@
 use crate::compile::engine::df::planner::CubeQueryPlanner;
-use std::{backtrace::Backtrace, collections::HashMap, future::Future, pin::Pin, sync::Arc};
+use std::{
+    backtrace::Backtrace, collections::HashMap, future::Future, pin::Pin, sync::Arc,
+    time::SystemTime,
+};
 
 use crate::{
     compile::{
@@ -21,8 +24,9 @@ use crate::{
     },
     config::ConfigObj,
     sql::{
-        compiler_cache::CompilerCache, statement::SensitiveDataSanitizer, SessionManager,
-        SessionState,
+        compiler_cache::{CompilerCache, CompilerCacheEntry},
+        statement::SensitiveDataSanitizer,
+        SessionManager, SessionState,
     },
     transport::{LoadRequestMeta, MetaContext, SpanId, TransportService},
     CubeErrorCauseType,
@@ -78,6 +82,11 @@ pub trait QueryEngine {
 
     fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType;
 
+    async fn get_cache_entry_and_refresh_cache_if_needed(
+        &self,
+        state: Arc<SessionState>,
+    ) -> Result<Arc<CompilerCacheEntry>, CompilationError>;
+
     async fn plan(
         &self,
         stmt: Self::AstStatementType,
@@ -86,6 +95,28 @@ pub trait QueryEngine {
         meta: Arc<MetaContext>,
         state: Arc<SessionState>,
     ) -> CompilationResult<(QueryPlan, Self::PlanMetadataType)> {
+        let cache_entry = self
+            .get_cache_entry_and_refresh_cache_if_needed(state.clone())
+            .await?;
+
+        let planning_start = SystemTime::now();
+        if let Some(span_id) = span_id.as_ref() {
+            if let Some(auth_context) = state.auth_context() {
+                self.transport_ref()
+                    .log_load_state(
+                        Some(span_id.clone()),
+                        auth_context,
+                        state.get_load_request_meta(),
+                        "SQL API Query Planning".to_string(),
+                        serde_json::json!({
+                            "query": span_id.query_key.clone(),
+                        }),
+                    )
+                    .await
+                    .map_err(|e| CompilationError::internal(e.to_string()))?;
+            }
+        }
+
         let ctx = self.create_session_ctx(state.clone())?;
         let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?;
 
@@ -144,7 +175,7 @@ pub trait QueryEngine {
         let mut finalized_graph = self
             .compiler_cache_ref()
             .rewrite(
-                state.auth_context().unwrap(),
+                Arc::clone(&cache_entry),
                 cube_ctx.clone(),
                 converter.take_egraph(),
                 &query_params.unwrap(),
@@ -192,6 +223,7 @@ pub trait QueryEngine {
         let result = rewriter
             .find_best_plan(
                 root,
+                cache_entry,
                 state.auth_context().unwrap(),
                 qtrace,
                 span_id.clone(),
@@ -243,12 +275,31 @@ pub trait QueryEngine {
                 // TODO: We should find what optimizers will be safety to use for OLAP queries
                 guard.optimizer.rules = vec![];
             }
-            if let Some(span_id) = span_id {
+            if let Some(span_id) = &span_id {
                 span_id.set_is_data_query(true).await;
             }
         };
 
         log::debug!("Rewrite: {:#?}", rewrite_plan);
+
+        if let Some(span_id) = span_id.as_ref() {
+            if let Some(auth_context) = state.auth_context() {
+                self.transport_ref()
+                    .log_load_state(
+                        Some(span_id.clone()),
+                        auth_context,
+                        state.get_load_request_meta(),
+                        "SQL API Query Planning Success".to_string(),
+                        serde_json::json!({
+                            "query": span_id.query_key.clone(),
+                            "duration": planning_start.elapsed().unwrap().as_millis() as u64,
+                        }),
+                    )
+                    .await
+                    .map_err(|e| CompilationError::internal(e.to_string()))?;
+            }
+        }
+
         let rewrite_plan = Self::evaluate_wrapped_sql(
             self.transport_ref().clone(),
             Arc::new(state.get_load_request_meta()),
@@ -501,6 +552,21 @@ impl QueryEngine for SqlQueryEngine {
     fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType {
         SensitiveDataSanitizer::new().replace(stmt.clone())
    }
+
+    async fn get_cache_entry_and_refresh_cache_if_needed(
+        &self,
+        state: Arc<SessionState>,
+    ) -> Result<Arc<CompilerCacheEntry>, CompilationError> {
+        self.compiler_cache_ref()
+            .get_cache_entry_and_refresh_if_needed(
+                state.auth_context().ok_or_else(|| {
+                    CompilationError::internal("Unable to get auth context".to_string())
+                })?,
+                state.protocol.clone(),
+            )
+            .await
+            .map_err(|e| CompilationError::internal(e.to_string()))
+    }
 }
 
 fn is_olap_query(parent: &LogicalPlan) -> Result<bool, CompilationError> {
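
Taken together, the default plan implementation now resolves the cache entry and emits the planning telemetry itself, and the new trait hook makes that lookup an explicit part of the QueryEngine contract. A rough illustration of how an implementor might satisfy such a hook, using hypothetical stand-in types rather than the real cubesql ones, and native async-fn-in-trait (Rust 1.75+) where cubesql itself may rely on the async_trait macro:

use std::sync::Arc;

// Hypothetical stand-ins for the cubesql types referenced in the diff.
struct CompilerCacheEntry {
    rewrite_rules_hash: u64,
}
struct SessionState {
    protocol: String,
}
#[derive(Debug)]
struct CompilationError(String);

trait QueryEngine {
    // Shape of the hook added by this commit: resolve (and, if needed,
    // refresh) the compiler cache entry for the current session.
    async fn get_cache_entry_and_refresh_cache_if_needed(
        &self,
        state: Arc<SessionState>,
    ) -> Result<Arc<CompilerCacheEntry>, CompilationError>;
}

struct MockEngine;

impl QueryEngine for MockEngine {
    async fn get_cache_entry_and_refresh_cache_if_needed(
        &self,
        state: Arc<SessionState>,
    ) -> Result<Arc<CompilerCacheEntry>, CompilationError> {
        // A real engine would consult its compiler cache here; the mock just
        // derives a value from the session protocol.
        Ok(Arc::new(CompilerCacheEntry {
            rewrite_rules_hash: state.protocol.len() as u64,
        }))
    }
}

#[tokio::main]
async fn main() -> Result<(), CompilationError> {
    let state = Arc::new(SessionState {
        protocol: "postgres".to_string(),
    });
    let entry = MockEngine
        .get_cache_entry_and_refresh_cache_if_needed(state)
        .await?;
    println!("cache entry hash: {}", entry.rewrite_rules_hash);
    Ok(())
}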

rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs

Lines changed: 5 additions & 12 deletions
@@ -15,7 +15,7 @@ use crate::{
         CubeContext,
     },
     config::ConfigObj,
-    sql::AuthContextRef,
+    sql::{compiler_cache::CompilerCacheEntry, AuthContextRef},
     transport::{MetaContext, SpanId},
     CubeError,
 };
@@ -310,7 +310,7 @@ impl Rewriter {
 
     pub async fn run_rewrite_to_completion(
         &mut self,
-        auth_context: AuthContextRef,
+        cache_entry: Arc<CompilerCacheEntry>,
         qtrace: &mut Option<Qtrace>,
     ) -> Result<CubeEGraph, CubeError> {
         let cube_context = self.cube_context.clone();
@@ -323,11 +323,7 @@ impl Rewriter {
             .sessions
             .server
             .compiler_cache
-            .rewrite_rules(
-                auth_context.clone(),
-                cube_context.session_state.protocol.clone(),
-                false,
-            )
+            .rewrite_rules(cache_entry, false)
             .await?;
 
         let (plan, qtrace_egraph_iterations) = tokio::task::spawn_blocking(move || {
@@ -392,6 +388,7 @@ impl Rewriter {
     pub async fn find_best_plan(
         &mut self,
         root: Id,
+        cache_entry: Arc<CompilerCacheEntry>,
         auth_context: AuthContextRef,
         qtrace: &mut Option<Qtrace>,
         span_id: Option<Arc<SpanId>>,
@@ -407,11 +404,7 @@ impl Rewriter {
             .sessions
             .server
             .compiler_cache
-            .rewrite_rules(
-                auth_context.clone(),
-                cube_context.session_state.protocol.clone(),
-                true,
-            )
+            .rewrite_rules(cache_entry, true)
             .await?;
 
         let (plan, qtrace_egraph_iterations, qtrace_best_graph) =

rust/cubesql/cubesql/src/compile/router.rs

Lines changed: 3 additions & 45 deletions
@@ -3,7 +3,7 @@ use crate::compile::{
     StatusFlags,
 };
 use sqlparser::ast;
-use std::{collections::HashMap, sync::Arc, time::SystemTime};
+use std::{collections::HashMap, sync::Arc};
 
 use crate::{
     compile::{
@@ -61,50 +61,8 @@ impl QueryRouter {
         qtrace: &mut Option<Qtrace>,
         span_id: Option<Arc<SpanId>>,
     ) -> CompilationResult<QueryPlan> {
-        let planning_start = SystemTime::now();
-        if let Some(span_id) = span_id.as_ref() {
-            if let Some(auth_context) = self.state.auth_context() {
-                self.session_manager
-                    .server
-                    .transport
-                    .log_load_state(
-                        Some(span_id.clone()),
-                        auth_context,
-                        self.state.get_load_request_meta(),
-                        "SQL API Query Planning".to_string(),
-                        serde_json::json!({
-                            "query": span_id.query_key.clone(),
-                        }),
-                    )
-                    .await
-                    .map_err(|e| CompilationError::internal(e.to_string()))?;
-            }
-        }
-        let result = self
-            .create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
-            .await?;
-
-        if let Some(span_id) = span_id.as_ref() {
-            if let Some(auth_context) = self.state.auth_context() {
-                self.session_manager
-                    .server
-                    .transport
-                    .log_load_state(
-                        Some(span_id.clone()),
-                        auth_context,
-                        self.state.get_load_request_meta(),
-                        "SQL API Query Planning Success".to_string(),
-                        serde_json::json!({
-                            "query": span_id.query_key.clone(),
-                            "duration": planning_start.elapsed().unwrap().as_millis() as u64,
-                        }),
-                    )
-                    .await
-                    .map_err(|e| CompilationError::internal(e.to_string()))?;
-            }
-        }
-
-        return Ok(result);
+        self.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
+            .await
     }
 
     pub async fn plan(
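
With the router slimmed down, the "SQL API Query Planning" and "SQL API Query Planning Success" load-state events (and the measured planning duration) are emitted in exactly one place: the QueryEngine::plan default shown in the first file. A compact sketch of that wrap-the-work telemetry pattern, with a hypothetical stand-in for log_load_state and assuming the tokio and serde_json crates:

use std::time::SystemTime;

// Hypothetical stand-in for TransportService::log_load_state; it only prints.
async fn log_load_state(event: &str, payload: serde_json::Value) {
    println!("{event}: {payload}");
}

async fn plan(query: &str) -> Result<String, String> {
    let planning_start = SystemTime::now();
    log_load_state(
        "SQL API Query Planning",
        serde_json::json!({ "query": query }),
    )
    .await;

    // ... fetch the cache entry, build contexts, rewrite the egraph, pick a plan ...
    let best_plan = format!("logical plan for `{query}`");

    log_load_state(
        "SQL API Query Planning Success",
        serde_json::json!({
            "query": query,
            // The diff uses planning_start.elapsed().unwrap(); this sketch
            // falls back to zero if the clock went backwards.
            "duration": planning_start.elapsed().unwrap_or_default().as_millis() as u64,
        }),
    )
    .await;

    Ok(best_plan)
}

#[tokio::main]
async fn main() {
    println!("{:?}", plan("SELECT 1").await);
}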
