Skip to content

Commit c65f4ed

Browse files
committed
chore(cubesql): QueryEngine - support custom metadata for QueryPlan
1 parent 70a36d8 commit c65f4ed

File tree

6 files changed

+48
-44
lines changed

6 files changed

+48
-44
lines changed

rust/cubesql/cubesql/src/compile/plan.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ pub enum QueryPlan {
4545
MetaOk(StatusFlags, CommandCompletion),
4646
MetaTabular(StatusFlags, Box<dataframe::DataFrame>),
4747
// Query will be executed via Data Fusion
48-
DataFusionSelect(StatusFlags, LogicalPlan, DFSessionContext),
48+
DataFusionSelect(LogicalPlan, DFSessionContext),
4949
// Query will be executed via DataFusion and saved to session
5050
CreateTempTable(
51-
StatusFlags,
5251
LogicalPlan,
5352
DFSessionContext,
5453
String,
@@ -70,17 +69,11 @@ impl fmt::Debug for QueryPlan {
7069
flags
7170
))
7271
},
73-
QueryPlan::DataFusionSelect(flags, _, _) => {
74-
f.write_str(&format!(
75-
"DataFusionSelect(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden)",
76-
flags
77-
))
72+
QueryPlan::DataFusionSelect(_, _) => {
73+
f.write_str(&"DataFusionSelect(LogicalPlan: hidden, DFSessionContext: hidden)")
7874
},
79-
QueryPlan::CreateTempTable(flags, _, _, name, _) => {
80-
f.write_str(&format!(
81-
"CreateTempTable(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden, Name: {:?}, SessionState: hidden",
82-
flags, name
83-
))
75+
QueryPlan::CreateTempTable(_, _, name, _) => {
76+
f.write_str(&"CreateTempTable(StatusFlags: {:?}, LogicalPlan: hidden, DFSessionContext: hidden, Name: {:?}, SessionState: hidden")
8477
},
8578
}
8679
}
@@ -89,8 +82,8 @@ impl fmt::Debug for QueryPlan {
8982
impl QueryPlan {
9083
pub fn as_logical_plan(&self) -> LogicalPlan {
9184
match self {
92-
QueryPlan::DataFusionSelect(_, plan, _)
93-
| QueryPlan::CreateTempTable(_, plan, _, _, _) => plan.clone(),
85+
QueryPlan::DataFusionSelect(plan, _)
86+
| QueryPlan::CreateTempTable(plan, _, _, _) => plan.clone(),
9487
QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => {
9588
panic!("This query doesnt have a plan, because it already has values for response")
9689
}
@@ -99,8 +92,8 @@ impl QueryPlan {
9992

10093
pub async fn as_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
10194
match self {
102-
QueryPlan::DataFusionSelect(_, plan, ctx)
103-
| QueryPlan::CreateTempTable(_, plan, ctx, _, _) => {
95+
QueryPlan::DataFusionSelect(plan, ctx)
96+
| QueryPlan::CreateTempTable(plan, ctx, _, _) => {
10497
DataFrame::new(ctx.state.clone(), plan)
10598
.create_physical_plan()
10699
.await
@@ -114,8 +107,8 @@ impl QueryPlan {
114107

115108
pub fn print(&self, pretty: bool) -> Result<String, CubeError> {
116109
match self {
117-
QueryPlan::DataFusionSelect(_, plan, _)
118-
| QueryPlan::CreateTempTable(_, plan, _, _, _) => {
110+
QueryPlan::DataFusionSelect(plan, _)
111+
| QueryPlan::CreateTempTable(plan, _, _, _) => {
119112
if pretty {
120113
Ok(plan.display_indent().to_string())
121114
} else {
@@ -134,7 +127,7 @@ pub async fn get_df_batches(
134127
plan: &QueryPlan,
135128
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>, CubeError> {
136129
match plan {
137-
QueryPlan::DataFusionSelect(_, plan, ctx) => {
130+
QueryPlan::DataFusionSelect(plan, ctx) => {
138131
let df = DataFrame::new(ctx.state.clone(), &plan);
139132
let safe_stream = async move {
140133
std::panic::AssertUnwindSafe(df.execute_stream())

rust/cubesql/cubesql/src/compile/query_engine.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,12 @@ use datafusion::{
4646

4747
#[async_trait::async_trait]
4848
pub trait QueryEngine {
49+
/// Custom type for AST statement type, It allows to use any parsers for SQL
4950
type AstStatementType: std::fmt::Display + Send;
5051

52+
/// Additional metadata for results of plan method instead of extending query plan
53+
type PlanMetadataType: std::fmt::Debug + Send;
54+
5155
fn compiler_cache_ref(&self) -> &Arc<dyn CompilerCache>;
5256

5357
fn transport_ref(&self) -> &Arc<dyn TransportService>;
@@ -70,7 +74,7 @@ pub trait QueryEngine {
7074
&self,
7175
cube_ctx: &CubeContext,
7276
stmt: &Self::AstStatementType,
73-
) -> Result<LogicalPlan, DataFusionError>;
77+
) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError>;
7478

7579
fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType;
7680

@@ -81,11 +85,11 @@ pub trait QueryEngine {
8185
span_id: Option<Arc<SpanId>>,
8286
meta: Arc<MetaContext>,
8387
state: Arc<SessionState>,
84-
) -> CompilationResult<QueryPlan> {
88+
) -> CompilationResult<(QueryPlan, Self::PlanMetadataType)> {
8589
let ctx = self.create_session_ctx(state.clone())?;
8690
let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?;
8791

88-
let plan = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| {
92+
let (plan, metadata) = self.create_logical_plan(&cube_ctx, &stmt).map_err(|err| {
8993
let message = format!("Initial planning error: {}", err,);
9094
let meta = Some(HashMap::from([
9195
("query".to_string(), stmt.to_string()),
@@ -249,10 +253,12 @@ pub trait QueryEngine {
249253
qtrace.set_best_plan_and_cube_scans(&rewrite_plan);
250254
}
251255

252-
Ok(QueryPlan::DataFusionSelect(
253-
StatusFlags::empty(),
254-
rewrite_plan,
255-
ctx,
256+
Ok((
257+
QueryPlan::DataFusionSelect(
258+
rewrite_plan,
259+
ctx,
260+
),
261+
metadata
256262
))
257263
}
258264

@@ -308,6 +314,8 @@ impl SqlQueryEngine {
308314
impl QueryEngine for SqlQueryEngine {
309315
type AstStatementType = sqlparser::ast::Statement;
310316

317+
type PlanMetadataType = ();
318+
311319
fn create_cube_ctx(
312320
&self,
313321
state: Arc<SessionState>,
@@ -470,12 +478,11 @@ impl QueryEngine for SqlQueryEngine {
470478
&self,
471479
cube_ctx: &CubeContext,
472480
stmt: &Self::AstStatementType,
473-
) -> Result<LogicalPlan, DataFusionError> {
481+
) -> Result<(LogicalPlan, Self::PlanMetadataType), DataFusionError> {
474482
let df_query_planner = SqlToRel::new_with_options(cube_ctx, true);
475-
let plan =
476-
df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())));
483+
let plan = df_query_planner.statement_to_plan(DFStatement::Statement(Box::new(stmt.clone())))?;
477484

478-
plan
485+
Ok((plan, ()))
479486
}
480487

481488
fn compiler_cache_ref(&self) -> &Arc<dyn CompilerCache> {

rust/cubesql/cubesql/src/compile/router.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,8 @@ impl QueryRouter {
356356
)])],
357357
)),
358358
)),
359-
QueryPlan::DataFusionSelect(flags, plan, context)
360-
| QueryPlan::CreateTempTable(flags, plan, context, _, _) => {
359+
QueryPlan::DataFusionSelect(plan, context)
360+
| QueryPlan::CreateTempTable(plan, context, _, _) => {
361361
// EXPLAIN over CREATE TABLE AS shows the SELECT query plan
362362
let plan = Arc::new(plan);
363363
let schema = LogicalPlan::explain_schema();
@@ -385,7 +385,7 @@ impl QueryRouter {
385385
})
386386
};
387387

388-
Ok(QueryPlan::DataFusionSelect(flags, explain_plan, context))
388+
Ok(QueryPlan::DataFusionSelect(explain_plan, context))
389389
}
390390
}
391391
})
@@ -596,7 +596,7 @@ impl QueryRouter {
596596
span_id: Option<Arc<SpanId>>,
597597
) -> Result<QueryPlan, CompilationError> {
598598
let plan = self.select_to_plan(stmt, qtrace, span_id).await?;
599-
let QueryPlan::DataFusionSelect(flags, plan, ctx) = plan else {
599+
let QueryPlan::DataFusionSelect(plan, ctx) = plan else {
600600
return Err(CompilationError::internal(
601601
"unable to build DataFusion plan from Query".to_string(),
602602
));
@@ -608,8 +608,8 @@ impl QueryRouter {
608608
"table name contains no ident parts".to_string(),
609609
));
610610
};
611+
611612
Ok(QueryPlan::CreateTempTable(
612-
flags,
613613
plan,
614614
ctx,
615615
table_name.value.to_string(),
@@ -708,9 +708,11 @@ impl QueryRouter {
708708
}
709709

710710
let sql_query_engine = SqlQueryEngine::new(self.session_manager.clone());
711-
sql_query_engine
711+
let (plan, _) = sql_query_engine
712712
.plan(stmt, qtrace, span_id, self.meta.clone(), self.state.clone())
713-
.await
713+
.await?;
714+
715+
Ok(plan)
714716
}
715717
}
716718

rust/cubesql/cubesql/src/compile/test/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -933,19 +933,21 @@ impl TestContext {
933933
.await
934934
.map_err(|e| CubeError::internal(format!("Error during planning: {}", e)))?;
935935
match query {
936-
QueryPlan::DataFusionSelect(flags, plan, ctx) => {
936+
QueryPlan::DataFusionSelect(plan, ctx) => {
937937
let df = DFDataFrame::new(ctx.state, &plan);
938938
let batches = df.collect().await?;
939939
let frame = batches_to_dataframe(&df.schema().into(), batches)?;
940940

941941
output.push(frame.print());
942-
output_flags = flags;
943942
}
944943
QueryPlan::MetaTabular(flags, frame) => {
945944
output.push(frame.print());
946945
output_flags = flags;
947946
}
948-
QueryPlan::MetaOk(flags, _) | QueryPlan::CreateTempTable(flags, _, _, _, _) => {
947+
QueryPlan::CreateTempTable(_, _, _, _) => {
948+
// nothing to do
949+
}
950+
QueryPlan::MetaOk(flags, _) => {
949951
output_flags = flags;
950952
}
951953
}

rust/cubesql/cubesql/src/sql/postgres/extended.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ impl Portal {
492492

493493
return;
494494
}
495-
QueryPlan::DataFusionSelect(_, plan, ctx) => {
495+
QueryPlan::DataFusionSelect(plan, ctx) => {
496496
let df = DFDataFrame::new(ctx.state.clone(), &plan);
497497
let safe_stream = async move {
498498
std::panic::AssertUnwindSafe(df.execute_stream())
@@ -511,7 +511,7 @@ impl Portal {
511511
Err(err) => return yield Err(CubeError::panic(err).into()),
512512
}
513513
}
514-
QueryPlan::CreateTempTable(_, plan, ctx, name, temp_tables) => {
514+
QueryPlan::CreateTempTable(plan, ctx, name, temp_tables) => {
515515
let df = DFDataFrame::new(ctx.state.clone(), &plan);
516516
let record_batch = df.collect();
517517
let row_count = match record_batch.await {

rust/cubesql/cubesql/src/sql/postgres/shim.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl QueryPlanExt for QueryPlan {
7272
required_format: protocol::Format,
7373
) -> Result<Option<protocol::RowDescription>, ConnectionError> {
7474
match &self {
75-
QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _, _) => Ok(None),
75+
QueryPlan::MetaOk(_, _) | QueryPlan::CreateTempTable(_, _, _, _) => Ok(None),
7676
QueryPlan::MetaTabular(_, frame) => {
7777
let mut result = vec![];
7878

@@ -86,7 +86,7 @@ impl QueryPlanExt for QueryPlan {
8686

8787
Ok(Some(protocol::RowDescription::new(result)))
8888
}
89-
QueryPlan::DataFusionSelect(_, logical_plan, _) => {
89+
QueryPlan::DataFusionSelect(logical_plan, _) => {
9090
let mut result = vec![];
9191

9292
for field in logical_plan.schema().fields() {

0 commit comments

Comments
 (0)