Skip to content

Commit 1339f57

Browse files
authored
feat(cubesql): Tableau Standard Gregorian missing date groupings support through SQL push down and some other functions(#7172)
* feat(cubesql): Tableau Standard Gregorian missing date groupings support through SQL push down and some other functions * BigQuery fixes
1 parent 06a66c6 commit 1339f57

File tree

9 files changed

+312
-7
lines changed

9 files changed

+312
-7
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2432,6 +2432,14 @@ class BaseQuery {
24322432

24332433
COALESCE: 'COALESCE({{ args_concat }})',
24342434
CONCAT: 'CONCAT({{ args_concat }})',
2435+
FLOOR: 'FLOOR({{ args_concat }})',
2436+
CEIL: 'CEIL({{ args_concat }})',
2437+
TRUNC: 'TRUNC({{ args_concat }})',
2438+
LEAST: 'LEAST({{ args_concat }})',
2439+
LOWER: 'LOWER({{ args_concat }})',
2440+
UPPER: 'UPPER({{ args_concat }})',
2441+
LEFT: 'LEFT({{ args_concat }})',
2442+
RIGHT: 'RIGHT({{ args_concat }})',
24352443
},
24362444
statements: {
24372445
select: 'SELECT {{ select_concat | map(attribute=\'aliased\') | join(\', \') }} \n' +

packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ export class BigqueryQuery extends BaseQuery {
152152
templates.functions.DATETRUNC = 'DATETIME_TRUNC(CAST({{ args[1] }} AS DATETIME), {{ date_part }})';
153153
templates.expressions.binary = '{% if op == \'%\' %}MOD({{ left }}, {{ right }}){% else %}({{ left }} {{ op }} {{ right }}){% endif %}';
154154
templates.expressions.interval = 'INTERVAL {{ interval }}';
155-
templates.expressions.extract = 'EXTRACT({% if date_part == \'DOW\' %}DAYOFWEEK{% else %}{{ date_part }}{% endif %} FROM {{ expr }})';
155+
templates.expressions.extract = 'EXTRACT({% if date_part == \'DOW\' %}DAYOFWEEK{% elif date_part == \'DOY\' %}DAYOFYEAR{% else %}{{ date_part }}{% endif %} FROM {{ expr }})';
156156
return templates;
157157
}
158158
}

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use cubeclient::models::{V1LoadRequestQuery, V1LoadResult, V1LoadResultAnnotatio
1010
pub use datafusion::{
1111
arrow::{
1212
array::{
13-
ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int64Builder, StringBuilder,
13+
ArrayRef, BooleanBuilder, Date32Builder, Float64Builder, Int32Builder, Int64Builder,
14+
StringBuilder,
1415
},
1516
datatypes::{DataType, SchemaRef},
1617
error::{ArrowError, Result as ArrowResult},
@@ -896,6 +897,31 @@ pub fn transform_response<V: ValueObject>(
896897
}
897898
)
898899
}
900+
DataType::Int32 => {
901+
build_column!(
902+
DataType::Int32,
903+
Int32Builder,
904+
response,
905+
field_name,
906+
{
907+
(FieldValue::Number(number), builder) => builder.append_value(number.round() as i32)?,
908+
(FieldValue::String(s), builder) => match s.parse::<i32>() {
909+
Ok(v) => builder.append_value(v)?,
910+
Err(error) => {
911+
warn!(
912+
"Unable to parse value as i32: {}",
913+
error.to_string()
914+
);
915+
916+
builder.append_null()?
917+
}
918+
},
919+
},
920+
{
921+
(ScalarValue::Int32(v), builder) => builder.append_option(v.clone())?,
922+
}
923+
)
924+
}
899925
DataType::Int64 => {
900926
build_column!(
901927
DataType::Int64,

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ impl CubeScanWrapperNode {
10211021
);
10221022
Ok((resulting_sql, sql_query))
10231023
}
1024-
// Expr::ScalarUDF { .. } => {}
1024+
10251025
// Expr::TableUDF { .. } => {}
10261026
Expr::Literal(literal) => {
10271027
Ok(match literal {
@@ -1127,6 +1127,36 @@ impl CubeScanWrapperNode {
11271127
}
11281128
})
11291129
}
1130+
Expr::ScalarUDF { fun, args } => {
1131+
let mut sql_args = Vec::new();
1132+
for arg in args {
1133+
let (sql, query) = Self::generate_sql_for_expr(
1134+
plan.clone(),
1135+
sql_query,
1136+
sql_generator.clone(),
1137+
arg,
1138+
ungrouped_scan_node.clone(),
1139+
)
1140+
.await?;
1141+
sql_query = query;
1142+
sql_args.push(sql);
1143+
}
1144+
Ok((
1145+
Self::escape_interpolation_quotes(
1146+
sql_generator
1147+
.get_sql_templates()
1148+
.scalar_function(fun.name.to_string(), sql_args, None)
1149+
.map_err(|e| {
1150+
DataFusionError::Internal(format!(
1151+
"Can't generate SQL for scalar function: {}",
1152+
e
1153+
))
1154+
})?,
1155+
ungrouped_scan_node.is_some(),
1156+
),
1157+
sql_query,
1158+
))
1159+
}
11301160
Expr::ScalarFunction { fun, args } => {
11311161
if let BuiltinScalarFunction::DatePart = &fun {
11321162
if args.len() >= 2 {
@@ -1199,7 +1229,7 @@ impl CubeScanWrapperNode {
11991229
Self::escape_interpolation_quotes(
12001230
sql_generator
12011231
.get_sql_templates()
1202-
.scalar_function(fun, sql_args, date_part)
1232+
.scalar_function(fun.to_string(), sql_args, date_part)
12031233
.map_err(|e| {
12041234
DataFusionError::Internal(format!(
12051235
"Can't generate SQL for scalar function: {}",
@@ -1260,7 +1290,7 @@ impl CubeScanWrapperNode {
12601290
// Expr::QualifiedWildcard { .. } => {}
12611291
x => {
12621292
return Err(DataFusionError::Internal(format!(
1263-
"Can't generate SQL for expr: {:?}",
1293+
"SQL generation for expression is not supported: {:?}",
12641294
x
12651295
)))
12661296
}

rust/cubesql/cubesql/src/compile/mod.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18690,6 +18690,93 @@ ORDER BY \"COUNT(count)\" DESC"
1869018690
.contains("EXTRACT"));
1869118691
}
1869218692

18693+
#[tokio::test]
18694+
async fn test_wrapper_tableau_week_number() {
18695+
if !Rewriter::sql_push_down_enabled() {
18696+
return;
18697+
}
18698+
init_logger();
18699+
18700+
let query_plan = convert_select_to_query_plan(
18701+
"SELECT CAST(FLOOR((7 + EXTRACT(DOY FROM order_date) - 1 + EXTRACT(DOW FROM DATE_TRUNC('YEAR', order_date))) / 7) AS INT) AS \"wk:created_at:ok\", AVG(avgPrice) mp FROM KibanaSampleDataEcommerce a GROUP BY 1 ORDER BY 1 DESC"
18702+
.to_string(),
18703+
DatabaseProtocol::PostgreSQL,
18704+
)
18705+
.await;
18706+
18707+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
18708+
println!(
18709+
"Physical plan: {}",
18710+
displayable(physical_plan.as_ref()).indent()
18711+
);
18712+
18713+
let logical_plan = query_plan.as_logical_plan();
18714+
assert!(logical_plan
18715+
.find_cube_scan_wrapper()
18716+
.wrapped_sql
18717+
.unwrap()
18718+
.sql
18719+
.contains("EXTRACT"));
18720+
}
18721+
18722+
#[tokio::test]
18723+
async fn test_wrapper_tableau_week_mmmm_yyyy() {
18724+
if !Rewriter::sql_push_down_enabled() {
18725+
return;
18726+
}
18727+
init_logger();
18728+
18729+
let query_plan = convert_select_to_query_plan(
18730+
"SELECT ((CAST(TRUNC(EXTRACT(YEAR FROM order_date)) AS INT) * 100) + CAST(TRUNC(EXTRACT(MONTH FROM order_date)) AS INT)) AS \"my:created_at:ok\", AVG(avgPrice) mp FROM KibanaSampleDataEcommerce a GROUP BY 1 ORDER BY 1 DESC"
18731+
.to_string(),
18732+
DatabaseProtocol::PostgreSQL,
18733+
)
18734+
.await;
18735+
18736+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
18737+
println!(
18738+
"Physical plan: {}",
18739+
displayable(physical_plan.as_ref()).indent()
18740+
);
18741+
18742+
let logical_plan = query_plan.as_logical_plan();
18743+
assert!(logical_plan
18744+
.find_cube_scan_wrapper()
18745+
.wrapped_sql
18746+
.unwrap()
18747+
.sql
18748+
.contains("EXTRACT"));
18749+
}
18750+
18751+
#[tokio::test]
18752+
async fn test_wrapper_tableau_iso_quarter() {
18753+
if !Rewriter::sql_push_down_enabled() {
18754+
return;
18755+
}
18756+
init_logger();
18757+
18758+
let query_plan = convert_select_to_query_plan(
18759+
"SELECT (LEAST(CAST((EXTRACT(WEEK FROM order_date) - 1) AS BIGINT) / 13, 3) + 1) AS \"iqr:created_at:ok\", AVG(avgPrice) mp FROM KibanaSampleDataEcommerce a GROUP BY 1 ORDER BY 1 DESC"
18760+
.to_string(),
18761+
DatabaseProtocol::PostgreSQL,
18762+
)
18763+
.await;
18764+
18765+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
18766+
println!(
18767+
"Physical plan: {}",
18768+
displayable(physical_plan.as_ref()).indent()
18769+
);
18770+
18771+
let logical_plan = query_plan.as_logical_plan();
18772+
assert!(logical_plan
18773+
.find_cube_scan_wrapper()
18774+
.wrapped_sql
18775+
.unwrap()
18776+
.sql
18777+
.contains("EXTRACT"));
18778+
}
18779+
1869318780
#[tokio::test]
1869418781
async fn test_thoughtspot_pg_date_trunc_year() {
1869518782
init_logger();

rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod order;
1313
mod projection;
1414
mod scalar_function;
1515
mod sort_expr;
16+
mod udf_function;
1617
mod wrapper_pull_up;
1718

1819
use crate::compile::{
@@ -44,6 +45,7 @@ impl RewriteRules for WrapperRules {
4445
self.order_rules(&mut rules);
4546
self.aggregate_function_rules(&mut rules);
4647
self.scalar_function_rules(&mut rules);
48+
self.udf_function_rules(&mut rules);
4749
self.extract_rules(&mut rules);
4850
self.alias_rules(&mut rules);
4951
self.case_rules(&mut rules);
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use crate::{
2+
compile::rewrite::{
3+
analysis::LogicalPlanAnalysis, rewrite, rules::wrapper::WrapperRules, transforming_rewrite,
4+
udf_expr_var_arg, udf_fun_expr_args, udf_fun_expr_args_empty_tail, wrapper_pullup_replacer,
5+
wrapper_pushdown_replacer, LogicalPlanLanguage, ScalarUDFExprFun,
6+
WrapperPullupReplacerAliasToCube,
7+
},
8+
var, var_iter,
9+
};
10+
use egg::{EGraph, Rewrite, Subst};
11+
12+
impl WrapperRules {
13+
pub fn udf_function_rules(
14+
&self,
15+
rules: &mut Vec<Rewrite<LogicalPlanLanguage, LogicalPlanAnalysis>>,
16+
) {
17+
rules.extend(vec![
18+
rewrite(
19+
"wrapper-push-down-udf",
20+
wrapper_pushdown_replacer(
21+
udf_expr_var_arg("?fun", "?args"),
22+
"?alias_to_cube",
23+
"?ungrouped",
24+
"?cube_members",
25+
),
26+
udf_expr_var_arg(
27+
"?fun",
28+
wrapper_pushdown_replacer(
29+
"?args",
30+
"?alias_to_cube",
31+
"?ungrouped",
32+
"?cube_members",
33+
),
34+
),
35+
),
36+
transforming_rewrite(
37+
"wrapper-pull-up-udf",
38+
udf_expr_var_arg(
39+
"?fun",
40+
wrapper_pullup_replacer(
41+
"?args",
42+
"?alias_to_cube",
43+
"?ungrouped",
44+
"?cube_members",
45+
),
46+
),
47+
wrapper_pullup_replacer(
48+
udf_expr_var_arg("?fun", "?args"),
49+
"?alias_to_cube",
50+
"?ungrouped",
51+
"?cube_members",
52+
),
53+
self.transform_udf_expr("?fun", "?alias_to_cube"),
54+
),
55+
rewrite(
56+
"wrapper-push-down-udf-args",
57+
wrapper_pushdown_replacer(
58+
udf_fun_expr_args("?left", "?right"),
59+
"?alias_to_cube",
60+
"?ungrouped",
61+
"?cube_members",
62+
),
63+
udf_fun_expr_args(
64+
wrapper_pushdown_replacer(
65+
"?left",
66+
"?alias_to_cube",
67+
"?ungrouped",
68+
"?cube_members",
69+
),
70+
wrapper_pushdown_replacer(
71+
"?right",
72+
"?alias_to_cube",
73+
"?ungrouped",
74+
"?cube_members",
75+
),
76+
),
77+
),
78+
rewrite(
79+
"wrapper-pull-up-udf-args",
80+
udf_fun_expr_args(
81+
wrapper_pullup_replacer(
82+
"?left",
83+
"?alias_to_cube",
84+
"?ungrouped",
85+
"?cube_members",
86+
),
87+
wrapper_pullup_replacer(
88+
"?right",
89+
"?alias_to_cube",
90+
"?ungrouped",
91+
"?cube_members",
92+
),
93+
),
94+
wrapper_pullup_replacer(
95+
udf_fun_expr_args("?left", "?right"),
96+
"?alias_to_cube",
97+
"?ungrouped",
98+
"?cube_members",
99+
),
100+
),
101+
rewrite(
102+
"wrapper-push-down-udf-empty-tail",
103+
wrapper_pushdown_replacer(
104+
udf_fun_expr_args_empty_tail(),
105+
"?alias_to_cube",
106+
"?ungrouped",
107+
"?cube_members",
108+
),
109+
wrapper_pullup_replacer(
110+
udf_fun_expr_args_empty_tail(),
111+
"?alias_to_cube",
112+
"?ungrouped",
113+
"?cube_members",
114+
),
115+
),
116+
]);
117+
}
118+
119+
fn transform_udf_expr(
120+
&self,
121+
fun_var: &'static str,
122+
alias_to_cube_var: &'static str,
123+
) -> impl Fn(&mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, &mut Subst) -> bool {
124+
let fun_var = var!(fun_var);
125+
let alias_to_cube_var = var!(alias_to_cube_var);
126+
let meta = self.cube_context.meta.clone();
127+
move |egraph, subst| {
128+
for alias_to_cube in var_iter!(
129+
egraph[subst[alias_to_cube_var]],
130+
WrapperPullupReplacerAliasToCube
131+
)
132+
.cloned()
133+
{
134+
if let Some(sql_generator) = meta.sql_generator_by_alias_to_cube(&alias_to_cube) {
135+
for fun in var_iter!(egraph[subst[fun_var]], ScalarUDFExprFun).cloned() {
136+
if sql_generator
137+
.get_sql_templates()
138+
.templates
139+
.contains_key(&format!("functions/{}", fun.to_uppercase()))
140+
{
141+
return true;
142+
}
143+
}
144+
}
145+
}
146+
false
147+
}
148+
}
149+
}

rust/cubesql/cubesql/src/compile/test/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ pub fn get_test_tenant_ctx() -> Arc<MetaContext> {
209209
),
210210
("functions/DATETRUNC".to_string(), "DATE_TRUNC({{ args_concat }})".to_string()),
211211
("functions/DATEPART".to_string(), "DATE_PART({{ args_concat }})".to_string()),
212+
("functions/FLOOR".to_string(), "FLOOR({{ args_concat }})".to_string()),
213+
("functions/TRUNC".to_string(), "TRUNC({{ args_concat }})".to_string()),
214+
("functions/LEAST".to_string(), "LEAST({{ args_concat }})".to_string()),
212215
("expressions/extract".to_string(), "EXTRACT({{ date_part }} FROM {{ expr }})".to_string()),
213216
(
214217
"statements/select".to_string(),

0 commit comments

Comments
 (0)