Skip to content

Commit 49ea3cf

Browse files
authored
feat(cubesql): ORDER BY SQL push down support (#7115)
1 parent f4890d7 commit 49ea3cf

File tree

6 files changed

+315
-31
lines changed

6 files changed

+315
-31
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,7 @@ class BaseQuery {
15421542
let index;
15431543

15441544
index = this.dimensionsForSelect().findIndex(
1545-
d => equalIgnoreCase(d.dimension, id)
1545+
d => equalIgnoreCase(d.dimension, id) || equalIgnoreCase(d.expressionName, id)
15461546
);
15471547

15481548
if (index > -1) {
@@ -2415,13 +2415,15 @@ class BaseQuery {
24152415
select: 'SELECT {{ select_concat | map(attribute=\'aliased\') | join(\', \') }} \n' +
24162416
'FROM (\n {{ from }}\n) AS {{ from_alias }} \n' +
24172417
'{% if group_by %} GROUP BY {{ group_by | map(attribute=\'index\') | join(\', \') }}{% endif %}' +
2418+
'{% if order_by %} ORDER BY {{ order_by | map(attribute=\'expr\') | join(\', \') }}{% endif %}' +
24182419
'{% if limit %}\nLIMIT {{ limit }}{% endif %}' +
24192420
'{% if offset %}\nOFFSET {{ offset }}{% endif %}',
24202421
},
24212422
expressions: {
24222423
column_aliased: '{{expr}} {{quoted_alias}}',
24232424
case: 'CASE {% if expr %}{{ expr }} {% endif %}{% for when, then in when_then %}WHEN {{ when }} THEN {{ then }}{% endfor %}{% if else_expr %} ELSE {{ else_expr }}{% endif %} END',
2424-
binary: '{{ left }} {{ op }} {{ right }}'
2425+
binary: '{{ left }} {{ op }} {{ right }}',
2426+
sort: '{{ expr }} {% if asc %}ASC{% else %}DESC{% endif %}{% if nulls_first %} NULLS FIRST{% endif %}',
24252427
},
24262428
quotes: {
24272429
identifiers: '"',

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ impl CubeScanWrapperNode {
131131
fn expr_name(e: &Expr, schema: &Arc<DFSchema>) -> Result<String> {
132132
match e {
133133
Expr::Column(col) => Ok(col.name.clone()),
134+
Expr::Sort { expr, .. } => expr_name(expr, schema),
134135
_ => e.name(schema),
135136
}
136137
}
@@ -276,7 +277,7 @@ impl CubeScanWrapperNode {
276277
having_expr: _having_expr,
277278
limit,
278279
offset,
279-
order_expr: _order_expr,
280+
order_expr,
280281
alias,
281282
ungrouped,
282283
}) = wrapped_select_node
@@ -368,7 +369,7 @@ impl CubeScanWrapperNode {
368369
let (projection, sql) = Self::generate_column_expr(
369370
plan.clone(),
370371
schema.clone(),
371-
projection_expr,
372+
projection_expr.clone(),
372373
sql,
373374
generator.clone(),
374375
&column_remapping,
@@ -381,7 +382,7 @@ impl CubeScanWrapperNode {
381382
let (group_by, sql) = Self::generate_column_expr(
382383
plan.clone(),
383384
schema.clone(),
384-
group_expr,
385+
group_expr.clone(),
385386
sql,
386387
generator.clone(),
387388
&column_remapping,
@@ -391,10 +392,23 @@ impl CubeScanWrapperNode {
391392
ungrouped_scan_node.clone(),
392393
)
393394
.await?;
394-
let (aggregate, mut sql) = Self::generate_column_expr(
395+
let (aggregate, sql) = Self::generate_column_expr(
395396
plan.clone(),
396397
schema.clone(),
397-
aggr_expr,
398+
aggr_expr.clone(),
399+
sql,
400+
generator.clone(),
401+
&column_remapping,
402+
&mut next_remapping,
403+
alias.clone(),
404+
can_rename_columns,
405+
ungrouped_scan_node.clone(),
406+
)
407+
.await?;
408+
let (order, mut sql) = Self::generate_column_expr(
409+
plan.clone(),
410+
schema.clone(),
411+
order_expr.clone(),
398412
sql,
399413
generator.clone(),
400414
&column_remapping,
@@ -437,6 +451,60 @@ impl CubeScanWrapperNode {
437451
})
438452
.collect::<Result<_>>()?,
439453
);
454+
if !order_expr.is_empty() {
455+
load_request.order = Some(
456+
order_expr
457+
.iter()
458+
.map(|o| -> Result<_> { match o {
459+
Expr::Sort {
460+
expr,
461+
asc,
462+
..
463+
} => {
464+
let col_name = expr_name(&expr, &schema)?;
465+
let aliased_column = aggr_expr
466+
.iter()
467+
.find_position(|e| {
468+
expr_name(e, &schema).map(|n| &n == &col_name).unwrap_or(false)
469+
})
470+
.map(|(i, _)| aggregate[i].clone()).or_else(|| {
471+
projection_expr
472+
.iter()
473+
.find_position(|e| {
474+
expr_name(e, &schema).map(|n| &n == &col_name).unwrap_or(false)
475+
})
476+
.map(|(i, _)| {
477+
projection[i].clone()
478+
})
479+
}).or_else(|| {
480+
group_expr
481+
.iter()
482+
.find_position(|e| {
483+
expr_name(e, &schema).map(|n| &n == &col_name).unwrap_or(false)
484+
})
485+
.map(|(i, _)| group_by[i].clone())
486+
}).ok_or_else(|| {
487+
DataFusionError::Execution(format!(
488+
"Can't find column {} in projection {:?} or aggregate {:?} or group {:?}",
489+
col_name,
490+
projection,
491+
aggregate,
492+
group_by
493+
))
494+
})?;
495+
Ok(vec![
496+
aliased_column.alias.clone(),
497+
if *asc { "asc".to_string() } else { "desc".to_string() },
498+
])
499+
}
500+
_ => Err(DataFusionError::Execution(format!(
501+
"Expected sort expression, found {:?}",
502+
o
503+
))),
504+
}})
505+
.collect::<Result<Vec<_>>>()?,
506+
);
507+
}
440508
load_request.ungrouped =
441509
if let WrappedSelectType::Projection = select_type {
442510
load_request.ungrouped.clone()
@@ -479,7 +547,7 @@ impl CubeScanWrapperNode {
479547
from_alias.unwrap_or("".to_string()),
480548
None,
481549
None,
482-
Vec::new(),
550+
order,
483551
limit,
484552
offset,
485553
)
@@ -841,7 +909,33 @@ impl CubeScanWrapperNode {
841909
}
842910
// Expr::Cast { .. } => {}
843911
// Expr::TryCast { .. } => {}
844-
// Expr::Sort { .. } => {}
912+
Expr::Sort {
913+
expr,
914+
asc,
915+
nulls_first,
916+
} => {
917+
let (expr, sql_query) = Self::generate_sql_for_expr(
918+
plan.clone(),
919+
sql_query,
920+
sql_generator.clone(),
921+
*expr,
922+
ungrouped_scan_node.clone(),
923+
)
924+
.await?;
925+
let resulting_sql = Self::escape_interpolation_quotes(
926+
sql_generator
927+
.get_sql_templates()
928+
.sort_expr(expr, asc, nulls_first)
929+
.map_err(|e| {
930+
DataFusionError::Internal(format!(
931+
"Can't generate SQL for sort expr: {}",
932+
e
933+
))
934+
})?,
935+
ungrouped_scan_node.is_some(),
936+
);
937+
Ok((resulting_sql, sql_query))
938+
}
845939
// Expr::ScalarUDF { .. } => {}
846940
// Expr::TableUDF { .. } => {}
847941
Expr::Literal(literal) => {

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18409,6 +18409,35 @@ ORDER BY \"COUNT(count)\" DESC"
1840918409
);
1841018410
}
1841118411

18412+
#[tokio::test]
18413+
async fn test_case_wrapper_ungrouped_sorted() {
18414+
if !Rewriter::sql_push_down_enabled() {
18415+
return;
18416+
}
18417+
init_logger();
18418+
18419+
let query_plan = convert_select_to_query_plan(
18420+
"SELECT CASE WHEN customer_gender = 'female' THEN 'f' ELSE 'm' END, AVG(avgPrice) mp FROM KibanaSampleDataEcommerce a GROUP BY 1 ORDER BY 1 DESC"
18421+
.to_string(),
18422+
DatabaseProtocol::PostgreSQL,
18423+
)
18424+
.await;
18425+
18426+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
18427+
println!(
18428+
"Physical plan: {}",
18429+
displayable(physical_plan.as_ref()).indent()
18430+
);
18431+
18432+
let logical_plan = query_plan.as_logical_plan();
18433+
assert!(logical_plan
18434+
.find_cube_scan_wrapper()
18435+
.wrapped_sql
18436+
.unwrap()
18437+
.sql
18438+
.contains("ORDER BY"));
18439+
}
18440+
1841218441
#[tokio::test]
1841318442
async fn test_case_wrapper_with_limit() {
1841418443
if !Rewriter::sql_push_down_enabled() {

0 commit comments

Comments
 (0)