diff --git a/rust/cubesql/cubesql/src/compile/router.rs b/rust/cubesql/cubesql/src/compile/router.rs index 397f1239ee346..41e9e5c5213d9 100644 --- a/rust/cubesql/cubesql/src/compile/router.rs +++ b/rust/cubesql/cubesql/src/compile/router.rs @@ -3,7 +3,7 @@ use crate::compile::{ StatusFlags, }; use sqlparser::ast; -use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::SystemTime}; +use std::{collections::HashMap, sync::Arc, time::SystemTime}; use crate::{ compile::{ @@ -30,7 +30,7 @@ use datafusion::{ scalar::ScalarValue, }; use itertools::Itertools; -use sqlparser::ast::{escape_single_quote_string, ObjectName}; +use sqlparser::ast::escape_single_quote_string; #[derive(Clone)] pub struct QueryRouter { @@ -108,6 +108,23 @@ impl QueryRouter { } pub async fn plan( + &self, + stmt: ast::Statement, + qtrace: &mut Option, + span_id: Option>, + ) -> CompilationResult { + match stmt { + ast::Statement::Explain { + analyze, + statement, + verbose, + .. + } => self.explain_to_plan(statement, verbose, analyze).await, + other => self.plan_query(&other, qtrace, span_id).await, + } + } + + async fn plan_query( &self, stmt: &ast::Statement, qtrace: &mut Option, @@ -134,15 +151,6 @@ impl QueryRouter { (ast::Statement::ShowVariable { variable }, _) => { self.show_variable_to_plan(variable, span_id.clone()).await } - ( - ast::Statement::Explain { - statement, - verbose, - analyze, - .. - }, - _, - ) => self.explain_to_plan(&statement, *verbose, *analyze).await, (ast::Statement::StartTransaction { .. }, DatabaseProtocol::PostgreSQL) => { // TODO: Real support Ok(QueryPlan::MetaOk( @@ -260,68 +268,62 @@ impl QueryRouter { .await } - fn explain_to_plan( + async fn explain_to_plan( &self, - statement: &Box, + statement: Box, verbose: bool, analyze: bool, - ) -> Pin> + Send>> { - let self_cloned = self.clone(); - - let statement = statement.clone(); - // This Boxing construct here because of recursive call to self.plan() - Box::pin(async move { - // TODO span_id ? - let plan = self_cloned.plan(&statement, &mut None, None).await?; + ) -> Result { + // TODO span_id ? + let plan = self.plan_query(&statement, &mut None, None).await?; - match plan { - QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => Ok(QueryPlan::MetaTabular( - StatusFlags::empty(), - Box::new(dataframe::DataFrame::new( - vec![dataframe::Column::new( - "Execution Plan".to_string(), - ColumnType::String, - ColumnFlags::empty(), - )], - vec![dataframe::Row::new(vec![dataframe::TableValue::String( - "This query doesnt have a plan, because it already has values for response" - .to_string(), - )])], - )), + match plan { + QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => Ok(QueryPlan::MetaTabular( + StatusFlags::empty(), + Box::new(dataframe::DataFrame::new( + vec![dataframe::Column::new( + "Execution Plan".to_string(), + ColumnType::String, + ColumnFlags::empty(), + )], + vec![dataframe::Row::new(vec![dataframe::TableValue::String( + "This query doesnt have a plan, because it already has values for response" + .to_string(), + )])], )), - QueryPlan::DataFusionSelect(plan, context) - | QueryPlan::CreateTempTable(plan, context, _, _) => { - // EXPLAIN over CREATE TABLE AS shows the SELECT query plan - let plan = Arc::new(plan); - let schema = LogicalPlan::explain_schema(); - let schema = schema.to_dfschema_ref().map_err(|err| { - CompilationError::internal(format!( - "Unable to get DF schema for explain plan: {}", - err - )) - })?; - - let explain_plan = if analyze { - LogicalPlan::Analyze(Analyze { - verbose, - input: plan, - schema, - }) - } else { - let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)]; - - LogicalPlan::Explain(Explain { - verbose, - plan, - stringified_plans, - schema, - }) - }; + )), + QueryPlan::DataFusionSelect(plan, context) + | QueryPlan::CreateTempTable(plan, context, _, _) => { + // EXPLAIN over CREATE TABLE AS shows the SELECT query plan + let plan = Arc::new(plan); + let schema = LogicalPlan::explain_schema(); + let schema = schema.to_dfschema_ref().map_err(|err| { + CompilationError::internal(format!( + "Unable to get DF schema for explain plan: {}", + err + )) + })?; - Ok(QueryPlan::DataFusionSelect(explain_plan, context)) - } + let explain_plan = if analyze { + LogicalPlan::Analyze(Analyze { + verbose, + input: plan, + schema, + }) + } else { + let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)]; + + LogicalPlan::Explain(Explain { + verbose, + plan, + stringified_plans, + schema, + }) + }; + + Ok(QueryPlan::DataFusionSelect(explain_plan, context)) } - }) + } } fn set_role_to_plan( @@ -535,7 +537,7 @@ impl QueryRouter { )); }; - let ObjectName(ident_parts) = name; + let ast::ObjectName(ident_parts) = name; let Some(table_name) = ident_parts.last() else { return Err(CompilationError::internal( "table name contains no ident parts".to_string(), @@ -585,7 +587,7 @@ impl QueryRouter { "DROP TABLE supports dropping only one table at a time".to_string(), )); } - let ObjectName(ident_parts) = names.first().unwrap(); + let ast::ObjectName(ident_parts) = names.first().unwrap(); let Some(table_name) = ident_parts.last() else { return Err(CompilationError::internal( "table name contains no ident parts".to_string(), @@ -674,7 +676,7 @@ pub async fn convert_statement_to_cube_query( } let planner = QueryRouter::new(session.state.clone(), meta, session.session_manager.clone()); - planner.plan(&stmt, qtrace, span_id).await + planner.plan(stmt, qtrace, span_id).await } pub async fn convert_sql_to_cube_query(