Skip to content

Commit 1b5c19f

Browse files
committed
feat(cubesql): Support LIMIT for SQL push down
1 parent 29080d7 commit 1b5c19f

File tree

6 files changed

+160
-15
lines changed

6 files changed

+160
-15
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2401,7 +2401,9 @@ class BaseQuery {
24012401
statements: {
24022402
select: 'SELECT {{ select_concat | map(attribute=\'aliased\') | join(\', \') }} \n' +
24032403
'FROM (\n {{ from }}\n) AS {{ from_alias }} \n' +
2404-
'{% if group_by %} GROUP BY {{ group_by | map(attribute=\'index\') | join(\', \') }}{% endif %}',
2404+
'{% if group_by %} GROUP BY {{ group_by | map(attribute=\'index\') | join(\', \') }}{% endif %}' +
2405+
'{% if limit %}\nLIMIT {{ limit }}{% endif %}' +
2406+
'{% if offset %}\nOFFSET {{ offset }}{% endif %}',
24052407
},
24062408
expressions: {
24072409
column_aliased: '{{expr}} {{quoted_alias}}',

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,8 @@ impl CubeScanWrapperNode {
264264
joins: _joins,
265265
filter_expr: _filter_expr,
266266
having_expr: _having_expr,
267-
limit: _limit,
268-
offset: _offset,
267+
limit,
268+
offset,
269269
order_expr: _order_expr,
270270
alias,
271271
}) = wrapped_select_node
@@ -306,6 +306,7 @@ impl CubeScanWrapperNode {
306306
generator.clone(),
307307
&column_remapping,
308308
&mut next_remapping,
309+
alias.clone(),
309310
can_rename_columns,
310311
)
311312
.await?;
@@ -317,6 +318,7 @@ impl CubeScanWrapperNode {
317318
generator.clone(),
318319
&column_remapping,
319320
&mut next_remapping,
321+
alias.clone(),
320322
can_rename_columns,
321323
)
322324
.await?;
@@ -328,6 +330,7 @@ impl CubeScanWrapperNode {
328330
generator.clone(),
329331
&column_remapping,
330332
&mut next_remapping,
333+
alias.clone(),
331334
can_rename_columns,
332335
)
333336
.await?;
@@ -343,6 +346,8 @@ impl CubeScanWrapperNode {
343346
None,
344347
None,
345348
Vec::new(),
349+
limit,
350+
offset,
346351
)
347352
.map_err(|e| {
348353
DataFusionError::Internal(format!(
@@ -393,14 +398,14 @@ impl CubeScanWrapperNode {
393398
generator: Arc<dyn SqlGenerator>,
394399
column_remapping: &Option<HashMap<Column, Column>>,
395400
next_remapping: &mut HashMap<Column, Column>,
401+
from_alias: Option<String>,
396402
can_rename_columns: bool,
397403
) -> result::Result<(Vec<AliasedColumn>, SqlQuery), CubeError> {
398404
let non_id_regex = Regex::new(r"[^a-zA-Z0-9_]")
399405
.map_err(|e| CubeError::internal(format!("Can't parse regex: {}", e)))?;
400406
let mut aliased_columns = Vec::new();
401-
for expr in exprs {
407+
for original_expr in exprs {
402408
let expr = if let Some(column_remapping) = column_remapping.as_ref() {
403-
let original_expr = expr;
404409
let mut expr = replace_col(
405410
original_expr.clone(),
406411
&column_remapping.iter().map(|(k, v)| (k, v)).collect(),
@@ -419,17 +424,17 @@ impl CubeScanWrapperNode {
419424
}
420425
expr
421426
} else {
422-
expr
427+
original_expr.clone()
423428
};
424429
let (expr_sql, new_sql_query) =
425430
Self::generate_sql_for_expr(plan.clone(), sql, generator.clone(), expr.clone())
426431
.await?;
427432
sql = new_sql_query;
428433

429-
let original_alias = expr_name(&expr, &schema)?;
434+
let original_alias = expr_name(&original_expr, &schema)?;
430435
let alias = if can_rename_columns {
431-
let mut truncated_alias =
432-
non_id_regex.replace_all(&original_alias, "_").to_string();
436+
let alias = expr_name(&expr, &schema)?;
437+
let mut truncated_alias = non_id_regex.replace_all(&alias, "_").to_string();
433438
truncated_alias.truncate(16);
434439
let mut alias = truncated_alias.clone();
435440
for i in 1..10000 {
@@ -448,6 +453,16 @@ impl CubeScanWrapperNode {
448453
Column::from_name(&original_alias),
449454
Column::from_name(&alias),
450455
);
456+
next_remapping.insert(
457+
Column {
458+
name: original_alias.clone(),
459+
relation: from_alias.clone(),
460+
},
461+
Column {
462+
name: alias.clone(),
463+
relation: from_alias.clone(),
464+
},
465+
);
451466
} else {
452467
return Err(CubeError::internal(format!(
453468
"Can't generate SQL for column expr: duplicate alias {}",

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17986,6 +17986,39 @@ ORDER BY \"COUNT(count)\" DESC"
1798617986
);
1798717987
}
1798817988

17989+
#[tokio::test]
17990+
async fn test_case_wrapper_with_limit() {
17991+
init_logger();
17992+
17993+
let query_plan = convert_select_to_query_plan(
17994+
"SELECT * FROM (SELECT CASE WHEN customer_gender = 'female' THEN 'f' ELSE 'm' END, MIN(avgPrice) mp FROM (SELECT avgPrice, customer_gender FROM KibanaSampleDataEcommerce LIMIT 1) a GROUP BY 1) q LIMIT 1123"
17995+
.to_string(),
17996+
DatabaseProtocol::PostgreSQL,
17997+
)
17998+
.await;
17999+
18000+
let logical_plan = query_plan.as_logical_plan();
18001+
assert!(logical_plan
18002+
.find_cube_scan_wrapper()
18003+
.wrapped_sql
18004+
.unwrap()
18005+
.sql
18006+
.contains("CASE WHEN"));
18007+
18008+
assert!(logical_plan
18009+
.find_cube_scan_wrapper()
18010+
.wrapped_sql
18011+
.unwrap()
18012+
.sql
18013+
.contains("LIMIT 1123"));
18014+
18015+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
18016+
println!(
18017+
"Physical plan: {}",
18018+
displayable(physical_plan.as_ref()).indent()
18019+
);
18020+
}
18021+
1798918022
#[tokio::test]
1799018023
async fn test_thoughtspot_pg_date_trunc_year() {
1799118024
init_logger();

rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper.rs

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
agg_fun_expr, aggregate, alias_expr,
66
analysis::LogicalPlanAnalysis,
77
binary_expr, case_expr_var_arg, column_expr, cube_scan, cube_scan_wrapper,
8-
fun_expr_var_arg, literal_expr, projection, rewrite,
8+
fun_expr_var_arg, limit, literal_expr, projection, rewrite,
99
rewriter::RewriteRules,
1010
rules::{replacer_pull_up_node, replacer_push_down_node},
1111
scalar_fun_expr_args, scalar_fun_expr_args_empty_tail, transforming_rewrite,
@@ -14,8 +14,9 @@ use crate::{
1414
wrapped_select_having_expr_empty_tail, wrapped_select_joins_empty_tail,
1515
wrapped_select_order_expr_empty_tail, wrapped_select_projection_expr_empty_tail,
1616
wrapper_pullup_replacer, wrapper_pushdown_replacer, AggregateFunctionExprDistinct,
17-
AggregateFunctionExprFun, CubeScanAliasToCube, LogicalPlanLanguage, ProjectionAlias,
18-
ScalarFunctionExprFun, WrappedSelectAlias, WrappedSelectSelectType, WrappedSelectType,
17+
AggregateFunctionExprFun, CubeScanAliasToCube, LimitFetch, LimitSkip,
18+
LogicalPlanLanguage, ProjectionAlias, ScalarFunctionExprFun, WrappedSelectAlias,
19+
WrappedSelectLimit, WrappedSelectOffset, WrappedSelectSelectType, WrappedSelectType,
1920
WrapperPullupReplacerAliasToCube,
2021
},
2122
},
@@ -263,6 +264,60 @@ impl RewriteRules for WrapperRules {
263264
),
264265
self.transform_projection("?projection_alias", "?select_alias"),
265266
),
267+
// Limit
268+
transforming_rewrite(
269+
"wrapper-push-down-limit-to-cube-scan",
270+
limit(
271+
"?offset",
272+
"?limit",
273+
cube_scan_wrapper(
274+
wrapper_pullup_replacer(
275+
wrapped_select(
276+
"?select_type",
277+
"?projection_expr",
278+
"?group_expr",
279+
"?aggr_expr",
280+
"?cube_scan_input",
281+
"?joins",
282+
"?filter_expr",
283+
"?having_expr",
284+
"WrappedSelectLimit:None",
285+
"WrappedSelectOffset:None",
286+
"?order_expr",
287+
"?select_alias",
288+
),
289+
"?alias_to_cube",
290+
),
291+
"CubeScanWrapperFinalized:false".to_string(),
292+
),
293+
),
294+
cube_scan_wrapper(
295+
wrapper_pullup_replacer(
296+
wrapped_select(
297+
"?select_type",
298+
"?projection_expr",
299+
"?group_expr",
300+
"?aggr_expr",
301+
"?cube_scan_input",
302+
"?joins",
303+
"?filter_expr",
304+
"?having_expr",
305+
"?wrapped_select_limit",
306+
"?wrapped_select_offset",
307+
"?order_expr",
308+
"?select_alias",
309+
),
310+
"?alias_to_cube",
311+
),
312+
"CubeScanWrapperFinalized:false",
313+
),
314+
self.transform_limit(
315+
"?limit",
316+
"?offset",
317+
"?wrapped_select_limit",
318+
"?wrapped_select_offset",
319+
),
320+
),
266321
// Aggregate function
267322
rewrite(
268323
"wrapper-push-down-aggregate-function",
@@ -547,6 +602,40 @@ impl WrapperRules {
547602
}
548603
}
549604

605+
fn transform_limit(
606+
&self,
607+
limit_var: &'static str,
608+
offset_var: &'static str,
609+
wrapped_select_limit_var: &'static str,
610+
wrapped_select_offset_var: &'static str,
611+
) -> impl Fn(&mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, &mut Subst) -> bool {
612+
let limit_var = var!(limit_var);
613+
let offset_var = var!(offset_var);
614+
let wrapped_select_limit_var = var!(wrapped_select_limit_var);
615+
let wrapped_select_offset_var = var!(wrapped_select_offset_var);
616+
move |egraph, subst| {
617+
for limit in var_iter!(egraph[subst[limit_var]], LimitFetch).cloned() {
618+
for offset in var_iter!(egraph[subst[offset_var]], LimitSkip).cloned() {
619+
subst.insert(
620+
wrapped_select_limit_var,
621+
egraph.add(LogicalPlanLanguage::WrappedSelectLimit(WrappedSelectLimit(
622+
limit,
623+
))),
624+
);
625+
626+
subst.insert(
627+
wrapped_select_offset_var,
628+
egraph.add(LogicalPlanLanguage::WrappedSelectOffset(
629+
WrappedSelectOffset(offset),
630+
)),
631+
);
632+
return true;
633+
}
634+
}
635+
false
636+
}
637+
}
638+
550639
fn transform_agg_fun_expr(
551640
&self,
552641
fun_var: &'static str,

rust/cubesql/cubesql/src/compile/test/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,10 @@ pub fn get_test_tenant_ctx() -> Arc<MetaContext> {
210210
(
211211
"statements/select".to_string(),
212212
r#"SELECT {{ select_concat | map(attribute='aliased') | join(', ') }}
213-
FROM ({{ from }}) AS {{ from_alias }}
214-
{% if group_by %} GROUP BY {{ group_by | map(attribute='index') | join(', ') }}{% endif %}"#.to_string(),
213+
FROM ({{ from }}) AS {{ from_alias }}
214+
{% if group_by %} GROUP BY {{ group_by | map(attribute='index') | join(', ') }}{% endif %}{% if limit %}
215+
LIMIT {{ limit }}{% endif %}{% if offset %}
216+
OFFSET {{ offset }}{% endif %}"#.to_string(),
215217
),
216218
(
217219
"expressions/column_aliased".to_string(),

rust/cubesql/cubesql/src/transport/service.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ impl SqlTemplates {
291291
_filter: Option<String>,
292292
_having: Option<String>,
293293
_order_by: Vec<AliasedColumn>,
294+
limit: Option<usize>,
295+
offset: Option<usize>,
294296
) -> Result<String, CubeError> {
295297
let group_by = self.to_template_columns(group_by)?;
296298
let aggregate = self.to_template_columns(aggregate)?;
@@ -309,7 +311,9 @@ impl SqlTemplates {
309311
group_by => group_by,
310312
aggregate => aggregate,
311313
projection => projection,
312-
from_alias => alias
314+
from_alias => alias,
315+
limit => limit,
316+
offset => offset,
313317
},
314318
)
315319
}

0 commit comments

Comments
 (0)