From d81cf88485bf0e572384a5e9ac0dafd04461f2bc Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 16 Oct 2024 11:56:17 +0200 Subject: [PATCH 1/2] chore(cubesql): QueryEngine - support custom metadata for QueryPlan --- rust/cubesql/cubesql/src/compile/plan.rs | 30 ++++++++---------- .../cubesql/src/compile/query_engine.rs | 31 ++++++++++++------- rust/cubesql/cubesql/src/compile/router.rs | 16 +++++----- rust/cubesql/cubesql/src/compile/test/mod.rs | 8 +++-- .../cubesql/src/sql/postgres/extended.rs | 4 +-- rust/cubesql/cubesql/src/sql/postgres/shim.rs | 4 +-- 6 files changed, 50 insertions(+), 43 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/plan.rs b/rust/cubesql/cubesql/src/compile/plan.rs index 2451fad3105a5..a9699302f8e43 100644 --- a/rust/cubesql/cubesql/src/compile/plan.rs +++ b/rust/cubesql/cubesql/src/compile/plan.rs @@ -45,10 +45,9 @@ pub enum QueryPlan { MetaOk(StatusFlags, CommandCompletion), MetaTabular(StatusFlags, Box), // Query will be executed via Data Fusion - DataFusionSelect(StatusFlags, LogicalPlan, DFSessionContext), + DataFusionSelect(LogicalPlan, DFSessionContext), // Query will be executed via DataFusion and saved to session CreateTempTable( - StatusFlags, LogicalPlan, DFSessionContext, String, @@ -70,16 +69,13 @@ impl fmt::Debug for QueryPlan { flags )) }, - QueryPlan::DataFusionSelect(flags, _, _) => { - f.write_str(&format!( - "DataFusionSelect(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden)", - flags - )) + QueryPlan::DataFusionSelect(_, _) => { + f.write_str(&"DataFusionSelect(LogicalPlan: hidden, DFSessionContext: hidden)") }, - QueryPlan::CreateTempTable(flags, _, _, name, _) => { + QueryPlan::CreateTempTable(_, _, name, _) => { f.write_str(&format!( - "CreateTempTable(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden, Name: {:?}, SessionState: hidden", - flags, name + "CreateTempTable(LogicalPlan: hidden, DFSessionContext: hidden, Name: {}, SessionState: hidden", + name )) }, } @@ -89,8 +85,8 @@ impl fmt::Debug for QueryPlan { impl QueryPlan { pub fn as_logical_plan(&self) -> LogicalPlan { match self { - QueryPlan::DataFusionSelect(_, plan, _) - | QueryPlan::CreateTempTable(_, plan, _, _, _) => plan.clone(), + QueryPlan::DataFusionSelect(plan, _) + | QueryPlan::CreateTempTable(plan, _, _, _) => plan.clone(), QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => { panic!("This query doesnt have a plan, because it already has values for response") } @@ -99,8 +95,8 @@ impl QueryPlan { pub async fn as_physical_plan(&self) -> Result, CubeError> { match self { - QueryPlan::DataFusionSelect(_, plan, ctx) - | QueryPlan::CreateTempTable(_, plan, ctx, _, _) => { + QueryPlan::DataFusionSelect(plan, ctx) + | QueryPlan::CreateTempTable(plan, ctx, _, _) => { DataFrame::new(ctx.state.clone(), plan) .create_physical_plan() .await @@ -114,8 +110,8 @@ impl QueryPlan { pub fn print(&self, pretty: bool) -> Result { match self { - QueryPlan::DataFusionSelect(_, plan, _) - | QueryPlan::CreateTempTable(_, plan, _, _, _) => { + QueryPlan::DataFusionSelect(plan, _) + | QueryPlan::CreateTempTable(plan, _, _, _) => { if pretty { Ok(plan.display_indent().to_string()) } else { @@ -134,7 +130,7 @@ pub async fn get_df_batches( plan: &QueryPlan, ) -> Result>, CubeError> { match plan { - QueryPlan::DataFusionSelect(_, plan, ctx) => { + QueryPlan::DataFusionSelect(plan, ctx) => { let df = DataFrame::new(ctx.state.clone(), &plan); let safe_stream = async move { std::panic::AssertUnwindSafe(df.execute_stream()) diff --git a/rust/cubesql/cubesql/src/compile/query_engine.rs b/rust/cubesql/cubesql/src/compile/query_engine.rs index ac49e217b88fc..82a7aa40661da 100644 --- a/rust/cubesql/cubesql/src/compile/query_engine.rs +++ b/rust/cubesql/cubesql/src/compile/query_engine.rs @@ -17,7 +17,7 @@ use crate::{ analysis::LogicalPlanAnalysis, converter::{LogicalPlanToLanguageContext, LogicalPlanToLanguageConverter}, }, - CompilationError, CompilationResult, DatabaseProtocol, QueryPlan, Rewriter, StatusFlags, + CompilationError, CompilationResult, DatabaseProtocol, QueryPlan, Rewriter, }, config::ConfigObj, sql::{ @@ -46,8 +46,12 @@ use datafusion::{ #[async_trait::async_trait] pub trait QueryEngine { + /// Custom type for AST statement type, It allows to use any parsers for SQL type AstStatementType: std::fmt::Display + Send; + /// Additional metadata for results of plan method instead of extending query plan + type PlanMetadataType: std::fmt::Debug + Send; + fn compiler_cache_ref(&self) -> &Arc; fn transport_ref(&self) -> &Arc; @@ -70,7 +74,7 @@ pub trait QueryEngine { &self, cube_ctx: &CubeContext, stmt: &Self::AstStatementType, - ) -> Result; + ) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError>; fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType; @@ -81,11 +85,11 @@ pub trait QueryEngine { span_id: Option>, meta: Arc, state: Arc, - ) -> CompilationResult { + ) -> CompilationResult<(QueryPlan, Self::PlanMetadataType)> { let ctx = self.create_session_ctx(state.clone())?; let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?; - let plan = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| { + let (plan, metadata) = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| { let message = format!("Initial planning error: {}", err,); let meta = Some(HashMap::from([ ("query".to_string(), stmt.to_string()), @@ -249,10 +253,12 @@ pub trait QueryEngine { qtrace.set_best_plan_and_cube_scans(&rewrite_plan); } - Ok(QueryPlan::DataFusionSelect( - StatusFlags::empty(), - rewrite_plan, - ctx, + Ok(( + QueryPlan::DataFusionSelect( + rewrite_plan, + ctx, + ), + metadata )) } @@ -308,6 +314,8 @@ impl SqlQueryEngine { impl QueryEngine for SqlQueryEngine { type AstStatementType = sqlparser::ast::Statement; + type PlanMetadataType = (); + fn create_cube_ctx( &self, state: Arc, @@ -470,12 +478,11 @@ impl QueryEngine for SqlQueryEngine { &self, cube_ctx: &CubeContext, stmt: &Self::AstStatementType, - ) -> Result { + ) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError> { let df_query_planner = SqlToRel::new_with_options(cube_ctx, true); - let plan = - df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone()))); + let plan = df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())))?; - plan + Ok((plan, ())) } fn compiler_cache_ref(&self) -> &Arc { diff --git a/rust/cubesql/cubesql/src/compile/router.rs b/rust/cubesql/cubesql/src/compile/router.rs index 53b3bb0a50a6c..7e8506e4956bf 100644 --- a/rust/cubesql/cubesql/src/compile/router.rs +++ b/rust/cubesql/cubesql/src/compile/router.rs @@ -356,8 +356,8 @@ impl QueryRouter { )])], )), )), - QueryPlan::DataFusionSelect(flags, plan, context) - | QueryPlan::CreateTempTable(flags, plan, context, _, _) => { + QueryPlan::DataFusionSelect(plan, context) + | QueryPlan::CreateTempTable(plan, context, _, _) => { // EXPLAIN over CREATE TABLE AS shows the SELECT query plan let plan = Arc::new(plan); let schema = LogicalPlan::explain_schema(); @@ -385,7 +385,7 @@ impl QueryRouter { }) }; - Ok(QueryPlan::DataFusionSelect(flags, explain_plan, context)) + Ok(QueryPlan::DataFusionSelect(explain_plan, context)) } } }) @@ -596,7 +596,7 @@ impl QueryRouter { span_id: Option>, ) -> Result { let plan = self.select_to_plan(stmt, qtrace, span_id).await?; - let QueryPlan::DataFusionSelect(flags, plan, ctx) = plan else { + let QueryPlan::DataFusionSelect(plan, ctx) = plan else { return Err(CompilationError::internal( "unable to build DataFusion plan from Query".to_string(), )); @@ -608,8 +608,8 @@ impl QueryRouter { "table name contains no ident parts".to_string(), )); }; + Ok(QueryPlan::CreateTempTable( - flags, plan, ctx, table_name.value.to_string(), @@ -708,9 +708,11 @@ impl QueryRouter { } let sql_query_engine = SqlQueryEngine::new(self.session_manager.clone()); - sql_query_engine + let (plan, _) = sql_query_engine .plan(stmt, qtrace, span_id, self.meta.clone(), self.state.clone()) - .await + .await?; + + Ok(plan) } } diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index ae39eb9fbce38..2ea8d1df0e54d 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -933,19 +933,21 @@ impl TestContext { .await .map_err(|e| CubeError::internal(format!("Error during planning: {}", e)))?; match query { - QueryPlan::DataFusionSelect(flags, plan, ctx) => { + QueryPlan::DataFusionSelect(plan, ctx) => { let df = DFDataFrame::new(ctx.state, &plan); let batches = df.collect().await?; let frame = batches_to_dataframe(&df.schema().into(), batches)?; output.push(frame.print()); - output_flags = flags; } QueryPlan::MetaTabular(flags, frame) => { output.push(frame.print()); output_flags = flags; } - QueryPlan::MetaOk(flags, _) | QueryPlan::CreateTempTable(flags, _, _, _, _) => { + QueryPlan::CreateTempTable(_, _, _, _) => { + // nothing to do + } + QueryPlan::MetaOk(flags, _) => { output_flags = flags; } } diff --git a/rust/cubesql/cubesql/src/sql/postgres/extended.rs b/rust/cubesql/cubesql/src/sql/postgres/extended.rs index f039609e25faa..903dcc521bd91 100644 --- a/rust/cubesql/cubesql/src/sql/postgres/extended.rs +++ b/rust/cubesql/cubesql/src/sql/postgres/extended.rs @@ -492,7 +492,7 @@ impl Portal { return; } - QueryPlan::DataFusionSelect(_, plan, ctx) => { + QueryPlan::DataFusionSelect(plan, ctx) => { let df = DFDataFrame::new(ctx.state.clone(), &plan); let safe_stream = async move { std::panic::AssertUnwindSafe(df.execute_stream()) @@ -511,7 +511,7 @@ impl Portal { Err(err) => return yield Err(CubeError::panic(err).into()), } } - QueryPlan::CreateTempTable(_, plan, ctx, name, temp_tables) => { + QueryPlan::CreateTempTable(plan, ctx, name, temp_tables) => { let df = DFDataFrame::new(ctx.state.clone(), &plan); let record_batch = df.collect(); let row_count = match record_batch.await { diff --git a/rust/cubesql/cubesql/src/sql/postgres/shim.rs b/rust/cubesql/cubesql/src/sql/postgres/shim.rs index 8de9ac405c5ff..5a4b7a48c3de6 100644 --- a/rust/cubesql/cubesql/src/sql/postgres/shim.rs +++ b/rust/cubesql/cubesql/src/sql/postgres/shim.rs @@ -72,7 +72,7 @@ impl QueryPlanExt for QueryPlan { required_format: protocol::Format, ) -> Result, ConnectionError> { match &self { - QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _, _) => Ok(None), + QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _) => Ok(None), QueryPlan::MetaTabular(_, frame) => { let mut result = vec![]; @@ -86,7 +86,7 @@ impl QueryPlanExt for QueryPlan { Ok(Some(protocol::RowDescription::new(result))) } - QueryPlan::DataFusionSelect(_, logical_plan, _) => { + QueryPlan::DataFusionSelect(logical_plan, _) => { let mut result = vec![]; for field in logical_plan.schema().fields() { From 825a2a49e527cef3593357f72cb8038fd5baa7e3 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 16 Oct 2024 11:57:52 +0200 Subject: [PATCH 2/2] chore(cubesql): QueryEngine - support custom metadata for QueryPlan --- rust/cubesql/cubesql/src/compile/plan.rs | 15 +++++---------- rust/cubesql/cubesql/src/compile/query_engine.rs | 11 +++-------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/plan.rs b/rust/cubesql/cubesql/src/compile/plan.rs index a9699302f8e43..e9f474da1afbb 100644 --- a/rust/cubesql/cubesql/src/compile/plan.rs +++ b/rust/cubesql/cubesql/src/compile/plan.rs @@ -47,12 +47,7 @@ pub enum QueryPlan { // Query will be executed via Data Fusion DataFusionSelect(LogicalPlan, DFSessionContext), // Query will be executed via DataFusion and saved to session - CreateTempTable( - LogicalPlan, - DFSessionContext, - String, - Arc, - ), + CreateTempTable(LogicalPlan, DFSessionContext, String, Arc), } impl fmt::Debug for QueryPlan { @@ -85,8 +80,9 @@ impl fmt::Debug for QueryPlan { impl QueryPlan { pub fn as_logical_plan(&self) -> LogicalPlan { match self { - QueryPlan::DataFusionSelect(plan, _) - | QueryPlan::CreateTempTable(plan, _, _, _) => plan.clone(), + QueryPlan::DataFusionSelect(plan, _) | QueryPlan::CreateTempTable(plan, _, _, _) => { + plan.clone() + } QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => { panic!("This query doesnt have a plan, because it already has values for response") } @@ -110,8 +106,7 @@ impl QueryPlan { pub fn print(&self, pretty: bool) -> Result { match self { - QueryPlan::DataFusionSelect(plan, _) - | QueryPlan::CreateTempTable(plan, _, _, _) => { + QueryPlan::DataFusionSelect(plan, _) | QueryPlan::CreateTempTable(plan, _, _, _) => { if pretty { Ok(plan.display_indent().to_string()) } else { diff --git a/rust/cubesql/cubesql/src/compile/query_engine.rs b/rust/cubesql/cubesql/src/compile/query_engine.rs index 82a7aa40661da..c7ba608a36e3c 100644 --- a/rust/cubesql/cubesql/src/compile/query_engine.rs +++ b/rust/cubesql/cubesql/src/compile/query_engine.rs @@ -253,13 +253,7 @@ pub trait QueryEngine { qtrace.set_best_plan_and_cube_scans(&rewrite_plan); } - Ok(( - QueryPlan::DataFusionSelect( - rewrite_plan, - ctx, - ), - metadata - )) + Ok((QueryPlan::DataFusionSelect(rewrite_plan, ctx), metadata)) } fn evaluate_wrapped_sql( @@ -480,7 +474,8 @@ impl QueryEngine for SqlQueryEngine { stmt: &Self::AstStatementType, ) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError> { let df_query_planner = SqlToRel::new_with_options(cube_ctx, true); - let plan = df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())))?; + let plan = + df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())))?; Ok((plan, ())) }