Skip to content

Commit 5118b3d

Browse files
committed
chore(cubesql): Do not call async Node functions while planning
1 parent 583e566 commit 5118b3d

File tree

5 files changed

+144
-66
lines changed

5 files changed

+144
-66
lines changed

rust/cubesql/cubesql/src/compile/query_engine.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use datafusion::{
4343
sql::{parser::Statement as DFStatement, planner::SqlToRel},
4444
variable::VarType,
4545
};
46+
use uuid::Uuid;
4647

4748
#[async_trait::async_trait]
4849
pub trait QueryEngine {
@@ -81,6 +82,7 @@ pub trait QueryEngine {
8182
span_id: Option<Arc<SpanId>>,
8283
meta: Arc<MetaContext>,
8384
state: Arc<SessionState>,
85+
compiler_id: Uuid,
8486
) -> CompilationResult<QueryPlan> {
8587
let ctx = self.create_session_ctx(state.clone())?;
8688
let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?;
@@ -140,7 +142,7 @@ pub trait QueryEngine {
140142
let mut finalized_graph = self
141143
.compiler_cache_ref()
142144
.rewrite(
143-
state.auth_context().unwrap(),
145+
compiler_id,
144146
cube_ctx.clone(),
145147
converter.take_egraph(),
146148
&query_params.unwrap(),
@@ -186,7 +188,13 @@ pub trait QueryEngine {
186188
let mut rewriter = Rewriter::new(finalized_graph, cube_ctx.clone());
187189

188190
let result = rewriter
189-
.find_best_plan(root, state.auth_context().unwrap(), qtrace, span_id.clone())
191+
.find_best_plan(
192+
root,
193+
compiler_id,
194+
state.auth_context().unwrap(),
195+
qtrace,
196+
span_id.clone(),
197+
)
190198
.await
191199
.map_err(|e| match e.cause {
192200
CubeErrorCauseType::Internal(_) => CompilationError::Internal(

rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use std::{
3131
sync::Arc,
3232
time::Duration,
3333
};
34+
use uuid::Uuid;
3435

3536
pub struct Rewriter {
3637
graph: EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
@@ -229,7 +230,7 @@ impl Rewriter {
229230

230231
pub async fn run_rewrite_to_completion(
231232
&mut self,
232-
auth_context: AuthContextRef,
233+
compiler_id: Uuid,
233234
qtrace: &mut Option<Qtrace>,
234235
) -> Result<EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, CubeError> {
235236
let cube_context = self.cube_context.clone();
@@ -243,7 +244,7 @@ impl Rewriter {
243244
.server
244245
.compiler_cache
245246
.rewrite_rules(
246-
auth_context.clone(),
247+
compiler_id,
247248
cube_context.session_state.protocol.clone(),
248249
false,
249250
)
@@ -311,6 +312,7 @@ impl Rewriter {
311312
pub async fn find_best_plan(
312313
&mut self,
313314
root: Id,
315+
compiler_id: Uuid,
314316
auth_context: AuthContextRef,
315317
qtrace: &mut Option<Qtrace>,
316318
span_id: Option<Arc<SpanId>>,
@@ -326,7 +328,7 @@ impl Rewriter {
326328
.server
327329
.compiler_cache
328330
.rewrite_rules(
329-
auth_context.clone(),
331+
compiler_id,
330332
cube_context.session_state.protocol.clone(),
331333
true,
332334
)

rust/cubesql/cubesql/src/compile/router.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion::{
3131
};
3232
use itertools::Itertools;
3333
use sqlparser::ast::{escape_single_quote_string, ObjectName};
34+
use uuid::Uuid;
3435

3536
#[derive(Clone)]
3637
pub struct QueryRouter {
@@ -52,6 +53,20 @@ impl QueryRouter {
5253
}
5354
}
5455

56+
async fn get_compiler_id_and_refresh_cache_if_needed(&self) -> CompilationResult<Uuid> {
57+
self.session_manager
58+
.server
59+
.compiler_cache
60+
.get_compiler_id_and_refresh_if_needed(
61+
self.state.auth_context().ok_or_else(|| {
62+
CompilationError::internal("Unable to get auth context".to_string())
63+
})?,
64+
self.state.protocol.clone(),
65+
)
66+
.await
67+
.map_err(|e| CompilationError::internal(e.to_string()))
68+
}
69+
5570
/// Common case for both planners: meta & olap
5671
/// This method tries to detect what planner to use as earlier as possible
5772
/// and forward context to correct planner
@@ -61,6 +76,9 @@ impl QueryRouter {
6176
qtrace: &mut Option<Qtrace>,
6277
span_id: Option<Arc<SpanId>>,
6378
) -> CompilationResult<QueryPlan> {
79+
self.reauthenticate_if_needed().await?;
80+
let compiler_id = self.get_compiler_id_and_refresh_cache_if_needed().await?;
81+
6482
let planning_start = SystemTime::now();
6583
if let Some(span_id) = span_id.as_ref() {
6684
if let Some(auth_context) = self.state.auth_context() {
@@ -81,7 +99,7 @@ impl QueryRouter {
8199
}
82100
}
83101
let result = self
84-
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
102+
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone(), compiler_id)
85103
.await?;
86104

87105
if let Some(span_id) = span_id.as_ref() {
@@ -230,6 +248,7 @@ impl QueryRouter {
230248
variable: &Vec<ast::Ident>,
231249
span_id: Option<Arc<SpanId>>,
232250
) -> CompilationResult<QueryPlan> {
251+
let compiler_id = self.get_compiler_id_and_refresh_cache_if_needed().await?;
233252
let name = variable.to_vec()[0].value.clone();
234253
if self.state.protocol == DatabaseProtocol::PostgreSQL {
235254
let full_variable = variable.iter().map(|v| v.value.to_lowercase()).join("_");
@@ -256,7 +275,7 @@ impl QueryRouter {
256275
)?
257276
};
258277

259-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
278+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), compiler_id)
260279
.await
261280
} else if name.eq_ignore_ascii_case("databases") || name.eq_ignore_ascii_case("schemas") {
262281
Ok(QueryPlan::MetaTabular(
@@ -289,7 +308,7 @@ impl QueryRouter {
289308
&mut None,
290309
)?;
291310

292-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
311+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), compiler_id)
293312
.await
294313
} else if name.eq_ignore_ascii_case("warnings") {
295314
Ok(QueryPlan::MetaTabular(
@@ -322,6 +341,7 @@ impl QueryRouter {
322341
},
323342
&mut None,
324343
span_id.clone(),
344+
compiler_id,
325345
)
326346
.await
327347
}
@@ -693,8 +713,8 @@ impl QueryRouter {
693713
stmt: ast::Statement,
694714
qtrace: &mut Option<Qtrace>,
695715
span_id: Option<Arc<SpanId>>,
716+
compiler_id: Uuid,
696717
) -> CompilationResult<QueryPlan> {
697-
self.reauthenticate_if_needed().await?;
698718
match &stmt {
699719
ast::Statement::Query(query) => match &query.body {
700720
ast::SetExpr::Select(select) if select.into.is_some() => {
@@ -709,7 +729,14 @@ impl QueryRouter {
709729

710730
let sql_query_engine = SqlQueryEngine::new(self.session_manager.clone());
711731
sql_query_engine
712-
.plan(stmt, qtrace, span_id, self.meta.clone(), self.state.clone())
732+
.plan(
733+
stmt,
734+
qtrace,
735+
span_id,
736+
self.meta.clone(),
737+
self.state.clone(),
738+
compiler_id,
739+
)
713740
.await
714741
}
715742
}

0 commit comments

Comments
 (0)