Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 71 additions & 69 deletions rust/cubesql/cubesql/src/compile/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
StatusFlags,
};
use sqlparser::ast;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::SystemTime};
use std::{collections::HashMap, sync::Arc, time::SystemTime};

use crate::{
compile::{
Expand All @@ -30,7 +30,7 @@
scalar::ScalarValue,
};
use itertools::Itertools;
use sqlparser::ast::{escape_single_quote_string, ObjectName};
use sqlparser::ast::escape_single_quote_string;

#[derive(Clone)]
pub struct QueryRouter {
Expand Down Expand Up @@ -108,6 +108,23 @@
}

pub async fn plan(
&self,
stmt: ast::Statement,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
) -> CompilationResult<QueryPlan> {
match stmt {
ast::Statement::Explain {
analyze,
statement,
verbose,
..
} => self.explain_to_plan(statement, verbose, analyze).await,
other => self.plan_query(&other, qtrace, span_id).await,
}
}

async fn plan_query(
&self,
stmt: &ast::Statement,
qtrace: &mut Option<Qtrace>,
Expand All @@ -134,15 +151,6 @@
(ast::Statement::ShowVariable { variable }, _) => {
self.show_variable_to_plan(variable, span_id.clone()).await
}
(
ast::Statement::Explain {
statement,
verbose,
analyze,
..
},
_,
) => self.explain_to_plan(&statement, *verbose, *analyze).await,
(ast::Statement::StartTransaction { .. }, DatabaseProtocol::PostgreSQL) => {
// TODO: Real support
Ok(QueryPlan::MetaOk(
Expand Down Expand Up @@ -260,68 +268,62 @@
.await
}

fn explain_to_plan(
async fn explain_to_plan(
&self,
statement: &Box<ast::Statement>,
statement: Box<ast::Statement>,
verbose: bool,
analyze: bool,
) -> Pin<Box<dyn Future<Output = Result<QueryPlan, CompilationError>> + Send>> {
let self_cloned = self.clone();

let statement = statement.clone();
// This Boxing construct here because of recursive call to self.plan()
Box::pin(async move {
// TODO span_id ?
let plan = self_cloned.plan(&statement, &mut None, None).await?;
) -> Result<QueryPlan, CompilationError> {
// TODO span_id ?
let plan = self.plan_query(&statement, &mut None, None).await?;

match plan {
QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => Ok(QueryPlan::MetaTabular(
StatusFlags::empty(),
Box::new(dataframe::DataFrame::new(
vec![dataframe::Column::new(
"Execution Plan".to_string(),
ColumnType::String,
ColumnFlags::empty(),
)],
vec![dataframe::Row::new(vec![dataframe::TableValue::String(
"This query doesnt have a plan, because it already has values for response"
.to_string(),
)])],
)),
match plan {
QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => Ok(QueryPlan::MetaTabular(
StatusFlags::empty(),
Box::new(dataframe::DataFrame::new(
vec![dataframe::Column::new(
"Execution Plan".to_string(),
ColumnType::String,
ColumnFlags::empty(),
)],
vec![dataframe::Row::new(vec![dataframe::TableValue::String(
"This query doesnt have a plan, because it already has values for response"
.to_string(),
)])],

Check warning on line 292 in rust/cubesql/cubesql/src/compile/router.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/router.rs#L281-L292

Added lines #L281 - L292 were not covered by tests
)),
QueryPlan::DataFusionSelect(plan, context)
| QueryPlan::CreateTempTable(plan, context, _, _) => {
// EXPLAIN over CREATE TABLE AS shows the SELECT query plan
let plan = Arc::new(plan);
let schema = LogicalPlan::explain_schema();
let schema = schema.to_dfschema_ref().map_err(|err| {
CompilationError::internal(format!(
"Unable to get DF schema for explain plan: {}",
err
))
})?;

let explain_plan = if analyze {
LogicalPlan::Analyze(Analyze {
verbose,
input: plan,
schema,
})
} else {
let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];

LogicalPlan::Explain(Explain {
verbose,
plan,
stringified_plans,
schema,
})
};
)),

Check warning on line 294 in rust/cubesql/cubesql/src/compile/router.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/router.rs#L294

Added line #L294 was not covered by tests
QueryPlan::DataFusionSelect(plan, context)
| QueryPlan::CreateTempTable(plan, context, _, _) => {

Check warning on line 296 in rust/cubesql/cubesql/src/compile/router.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/router.rs#L296

Added line #L296 was not covered by tests
// EXPLAIN over CREATE TABLE AS shows the SELECT query plan
let plan = Arc::new(plan);
let schema = LogicalPlan::explain_schema();
let schema = schema.to_dfschema_ref().map_err(|err| {
CompilationError::internal(format!(
"Unable to get DF schema for explain plan: {}",
err
))

Check warning on line 304 in rust/cubesql/cubesql/src/compile/router.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/router.rs#L301-L304

Added lines #L301 - L304 were not covered by tests
})?;

Ok(QueryPlan::DataFusionSelect(explain_plan, context))
}
let explain_plan = if analyze {
LogicalPlan::Analyze(Analyze {
verbose,
input: plan,
schema,
})
} else {
let stringified_plans = vec![plan.to_stringified(PlanType::InitialLogicalPlan)];

LogicalPlan::Explain(Explain {
verbose,
plan,
stringified_plans,
schema,
})
};

Ok(QueryPlan::DataFusionSelect(explain_plan, context))
}
})
}
}

fn set_role_to_plan(
Expand Down Expand Up @@ -535,7 +537,7 @@
));
};

let ObjectName(ident_parts) = name;
let ast::ObjectName(ident_parts) = name;
let Some(table_name) = ident_parts.last() else {
return Err(CompilationError::internal(
"table name contains no ident parts".to_string(),
Expand Down Expand Up @@ -585,7 +587,7 @@
"DROP TABLE supports dropping only one table at a time".to_string(),
));
}
let ObjectName(ident_parts) = names.first().unwrap();
let ast::ObjectName(ident_parts) = names.first().unwrap();
let Some(table_name) = ident_parts.last() else {
return Err(CompilationError::internal(
"table name contains no ident parts".to_string(),
Expand Down Expand Up @@ -674,7 +676,7 @@
}

let planner = QueryRouter::new(session.state.clone(), meta, session.session_manager.clone());
planner.plan(&stmt, qtrace, span_id).await
planner.plan(stmt, qtrace, span_id).await
}

pub async fn convert_sql_to_cube_query(
Expand Down
Loading