Skip to content

Commit 96c1549

Browse files
MazterQyoumwillsey
andcommitted
feat(cubesql): Flatten list expression rewrites to improve performance
Co-authored-by: Max Willsey <[email protected]>
1 parent 8b0a056 commit 96c1549

File tree

30 files changed

+1873
-715
lines changed

30 files changed

+1873
-715
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesql/cubesql/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ nanoid = "0.3.0"
4848
tokio-util = { version = "0.6.2", features=["compat"] }
4949
comfy-table = "7.1.0"
5050
bitflags = "1.3.2"
51-
egg = {rev = "7e60716cc757448bd672f2dc28ef9a0d074dce71", git = "https://github.com/egraphs-good/egg.git"}
51+
egg = { rev = "bdf05cee0a145a524fe8c6c33aa577ac50ace7c9", git = "https://github.com/cube-js/egg.git" }
5252
paste = "1.0.6"
5353
csv = "1.1.6"
5454
tracing = { version = "0.1.40", features = ["async-await"] }

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 66 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ impl QueryPlanner {
113113
stmt: &ast::Statement,
114114
qtrace: &mut Option<Qtrace>,
115115
span_id: Option<Arc<SpanId>>,
116+
flat_list: bool,
116117
) -> CompilationResult<QueryPlan> {
117118
let planning_start = SystemTime::now();
118119
if let Some(span_id) = span_id.as_ref() {
@@ -134,7 +135,7 @@ impl QueryPlanner {
134135
}
135136
}
136137
let result = self
137-
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
138+
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone(), flat_list)
138139
.await?;
139140

140141
if let Some(span_id) = span_id.as_ref() {
@@ -165,16 +166,20 @@ impl QueryPlanner {
165166
stmt: &ast::Statement,
166167
qtrace: &mut Option<Qtrace>,
167168
span_id: Option<Arc<SpanId>>,
169+
flat_list: bool,
168170
) -> CompilationResult<QueryPlan> {
169171
let plan = match (stmt, &self.state.protocol) {
170172
(ast::Statement::Query(q), _) => {
171173
if let ast::SetExpr::Select(select) = &q.body {
172174
if let Some(into) = &select.into {
173-
return self.select_into_to_plan(into, q, qtrace, span_id).await;
175+
return self
176+
.select_into_to_plan(into, q, qtrace, span_id, flat_list)
177+
.await;
174178
}
175179
}
176180

177-
self.select_to_plan(stmt, qtrace, span_id.clone()).await
181+
self.select_to_plan(stmt, qtrace, span_id.clone(), flat_list)
182+
.await
178183
}
179184
(ast::Statement::SetTransaction { .. }, _) => Ok(QueryPlan::MetaTabular(
180185
StatusFlags::empty(),
@@ -204,10 +209,12 @@ impl QueryPlanner {
204209
self.set_variable_to_plan(&key_values).await
205210
}
206211
(ast::Statement::ShowVariable { variable }, _) => {
207-
self.show_variable_to_plan(variable, span_id.clone()).await
212+
self.show_variable_to_plan(variable, span_id.clone(), flat_list)
213+
.await
208214
}
209215
(ast::Statement::ShowVariables { filter }, DatabaseProtocol::MySQL) => {
210-
self.show_variables_to_plan(&filter, span_id.clone()).await
216+
self.show_variables_to_plan(&filter, span_id.clone(), flat_list)
217+
.await
211218
}
212219
(ast::Statement::ShowCreate { obj_name, obj_type }, DatabaseProtocol::MySQL) => {
213220
self.show_create_to_plan(&obj_name, &obj_type)
@@ -221,8 +228,15 @@ impl QueryPlanner {
221228
},
222229
DatabaseProtocol::MySQL,
223230
) => {
224-
self.show_columns_to_plan(*extended, *full, &filter, &table_name, span_id.clone())
225-
.await
231+
self.show_columns_to_plan(
232+
*extended,
233+
*full,
234+
&filter,
235+
&table_name,
236+
span_id.clone(),
237+
flat_list,
238+
)
239+
.await
226240
}
227241
(
228242
ast::Statement::ShowTables {
@@ -233,14 +247,22 @@ impl QueryPlanner {
233247
},
234248
DatabaseProtocol::MySQL,
235249
) => {
236-
self.show_tables_to_plan(*extended, *full, &filter, &db_name, span_id.clone())
237-
.await
250+
self.show_tables_to_plan(
251+
*extended,
252+
*full,
253+
&filter,
254+
&db_name,
255+
span_id.clone(),
256+
flat_list,
257+
)
258+
.await
238259
}
239260
(ast::Statement::ShowCollation { filter }, DatabaseProtocol::MySQL) => {
240-
self.show_collation_to_plan(&filter, span_id.clone()).await
261+
self.show_collation_to_plan(&filter, span_id.clone(), flat_list)
262+
.await
241263
}
242264
(ast::Statement::ExplainTable { table_name, .. }, DatabaseProtocol::MySQL) => {
243-
self.explain_table_to_plan(&table_name, span_id.clone())
265+
self.explain_table_to_plan(&table_name, span_id.clone(), flat_list)
244266
.await
245267
}
246268
(
@@ -251,7 +273,10 @@ impl QueryPlanner {
251273
..
252274
},
253275
_,
254-
) => self.explain_to_plan(&statement, *verbose, *analyze).await,
276+
) => {
277+
self.explain_to_plan(&statement, *verbose, *analyze, flat_list)
278+
.await
279+
}
255280
(ast::Statement::Use { db_name }, DatabaseProtocol::MySQL) => {
256281
self.use_to_plan(&db_name)
257282
}
@@ -304,7 +329,7 @@ impl QueryPlanner {
304329
&& *temporary =>
305330
{
306331
let stmt = ast::Statement::Query(query.clone());
307-
self.create_table_to_plan(name, &stmt, qtrace, span_id.clone())
332+
self.create_table_to_plan(name, &stmt, qtrace, span_id.clone(), flat_list)
308333
.await
309334
}
310335
(
@@ -339,6 +364,7 @@ impl QueryPlanner {
339364
&self,
340365
variable: &Vec<ast::Ident>,
341366
span_id: Option<Arc<SpanId>>,
367+
flat_list: bool,
342368
) -> CompilationResult<QueryPlan> {
343369
let name = variable.to_vec()[0].value.clone();
344370
if self.state.protocol == DatabaseProtocol::PostgreSQL {
@@ -366,7 +392,7 @@ impl QueryPlanner {
366392
)?
367393
};
368394

369-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
395+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
370396
.await
371397
} else if name.eq_ignore_ascii_case("databases") || name.eq_ignore_ascii_case("schemas") {
372398
Ok(QueryPlan::MetaTabular(
@@ -399,7 +425,7 @@ impl QueryPlanner {
399425
&mut None,
400426
)?;
401427

402-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
428+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
403429
.await
404430
} else if name.eq_ignore_ascii_case("warnings") {
405431
Ok(QueryPlan::MetaTabular(
@@ -432,6 +458,7 @@ impl QueryPlanner {
432458
},
433459
&mut None,
434460
span_id.clone(),
461+
flat_list,
435462
)
436463
.await
437464
}
@@ -441,6 +468,7 @@ impl QueryPlanner {
441468
&self,
442469
filter: &Option<ast::ShowStatementFilter>,
443470
span_id: Option<Arc<SpanId>>,
471+
flat_list: bool,
444472
) -> Result<QueryPlan, CompilationError> {
445473
let filter = match filter {
446474
Some(stmt @ ast::ShowStatementFilter::Like(_)) => {
@@ -467,7 +495,7 @@ impl QueryPlanner {
467495
&mut None,
468496
)?;
469497

470-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
498+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
471499
.await
472500
}
473501

@@ -539,6 +567,7 @@ impl QueryPlanner {
539567
filter: &Option<ast::ShowStatementFilter>,
540568
table_name: &ast::ObjectName,
541569
span_id: Option<Arc<SpanId>>,
570+
flat_list: bool,
542571
) -> Result<QueryPlan, CompilationError> {
543572
let extended = match extended {
544573
false => "".to_string(),
@@ -600,7 +629,7 @@ impl QueryPlanner {
600629
&mut None,
601630
)?;
602631

603-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
632+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
604633
.await
605634
}
606635

@@ -612,6 +641,7 @@ impl QueryPlanner {
612641
filter: &Option<ast::ShowStatementFilter>,
613642
db_name: &Option<ast::Ident>,
614643
span_id: Option<Arc<SpanId>>,
644+
flat_list: bool,
615645
) -> Result<QueryPlan, CompilationError> {
616646
let db_name = match db_name {
617647
Some(db_name) => db_name.clone(),
@@ -660,14 +690,15 @@ WHERE `TABLE_SCHEMA` = '{}'",
660690
&mut None,
661691
)?;
662692

663-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
693+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
664694
.await
665695
}
666696

667697
async fn show_collation_to_plan(
668698
&self,
669699
filter: &Option<ast::ShowStatementFilter>,
670700
span_id: Option<Arc<SpanId>>,
701+
flat_list: bool,
671702
) -> Result<QueryPlan, CompilationError> {
672703
let filter = match filter {
673704
Some(stmt @ ast::ShowStatementFilter::Like(_)) => {
@@ -695,17 +726,18 @@ WHERE `TABLE_SCHEMA` = '{}'",
695726
&mut None,
696727
)?;
697728

698-
self.create_df_logical_plan(stmt, &mut None, span_id.clone())
729+
self.create_df_logical_plan(stmt, &mut None, span_id.clone(), flat_list)
699730
.await
700731
}
701732

702733
async fn explain_table_to_plan(
703734
&self,
704735
table_name: &ast::ObjectName,
705736
span_id: Option<Arc<SpanId>>,
737+
flat_list: bool,
706738
) -> Result<QueryPlan, CompilationError> {
707739
// EXPLAIN <table> matches the SHOW COLUMNS output exactly, reuse the plan
708-
self.show_columns_to_plan(false, false, &None, table_name, span_id)
740+
self.show_columns_to_plan(false, false, &None, table_name, span_id, flat_list)
709741
.await
710742
}
711743

@@ -714,14 +746,17 @@ WHERE `TABLE_SCHEMA` = '{}'",
714746
statement: &Box<ast::Statement>,
715747
verbose: bool,
716748
analyze: bool,
749+
flat_list: bool,
717750
) -> Pin<Box<dyn Future<Output = Result<QueryPlan, CompilationError>> + Send>> {
718751
let self_cloned = self.clone();
719752

720753
let statement = statement.clone();
721754
// This Boxing construct here because of recursive call to self.plan()
722755
Box::pin(async move {
723756
// TODO span_id ?
724-
let plan = self_cloned.plan(&statement, &mut None, None).await?;
757+
let plan = self_cloned
758+
.plan(&statement, &mut None, None, flat_list)
759+
.await?;
725760

726761
match plan {
727762
QueryPlan::MetaOk(_, _) | QueryPlan::MetaTabular(_, _) => Ok(QueryPlan::MetaTabular(
@@ -995,8 +1030,11 @@ WHERE `TABLE_SCHEMA` = '{}'",
9951030
stmt: &ast::Statement,
9961031
qtrace: &mut Option<Qtrace>,
9971032
span_id: Option<Arc<SpanId>>,
1033+
flat_list: bool,
9981034
) -> Result<QueryPlan, CompilationError> {
999-
let plan = self.select_to_plan(stmt, qtrace, span_id).await?;
1035+
let plan = self
1036+
.select_to_plan(stmt, qtrace, span_id, flat_list)
1037+
.await?;
10001038
let QueryPlan::DataFusionSelect(flags, plan, ctx) = plan else {
10011039
return Err(CompilationError::internal(
10021040
"unable to build DataFusion plan from Query".to_string(),
@@ -1024,6 +1062,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
10241062
query: &Box<ast::Query>,
10251063
qtrace: &mut Option<Qtrace>,
10261064
span_id: Option<Arc<SpanId>>,
1065+
flat_list: bool,
10271066
) -> Result<QueryPlan, CompilationError> {
10281067
if !into.temporary || !into.table {
10291068
return Err(CompilationError::unsupported(
@@ -1040,7 +1079,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
10401079
));
10411080
}
10421081
let new_stmt = ast::Statement::Query(new_query);
1043-
self.create_table_to_plan(&into.name, &new_stmt, qtrace, span_id)
1082+
self.create_table_to_plan(&into.name, &new_stmt, qtrace, span_id, flat_list)
10441083
.await
10451084
}
10461085

@@ -1240,6 +1279,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
12401279
stmt: ast::Statement,
12411280
qtrace: &mut Option<Qtrace>,
12421281
span_id: Option<Arc<SpanId>>,
1282+
flat_list: bool,
12431283
) -> CompilationResult<QueryPlan> {
12441284
self.reauthenticate_if_needed().await?;
12451285

@@ -1310,7 +1350,7 @@ WHERE `TABLE_SCHEMA` = '{}'",
13101350
log::debug!("Initial Plan: {:#?}", optimized_plan);
13111351

13121352
let cube_ctx = Arc::new(cube_ctx);
1313-
let mut converter = LogicalPlanToLanguageConverter::new(cube_ctx.clone());
1353+
let mut converter = LogicalPlanToLanguageConverter::new(cube_ctx.clone(), flat_list);
13141354
let mut query_params = Some(HashMap::new());
13151355
let root = converter
13161356
.add_logical_plan_replace_params(
@@ -1532,7 +1572,8 @@ pub async fn convert_statement_to_cube_query(
15321572
}
15331573

15341574
let planner = QueryPlanner::new(session.state.clone(), meta, session.session_manager.clone());
1535-
planner.plan(&stmt, qtrace, span_id).await
1575+
let flat_list = session.server.config_obj.push_down_pull_up_split();
1576+
planner.plan(&stmt, qtrace, span_id, flat_list).await
15361577
}
15371578

15381579
#[derive(Debug, PartialEq, Serialize)]

0 commit comments

Comments
 (0)