Skip to content

Commit cb507c1

Browse files
authored
feat(tesseract): Make measure an entry point for multi-fact join for now and support template for params (#9053)
1 parent 59dc373 commit cb507c1

File tree

13 files changed

+190
-48
lines changed

13 files changed

+190
-48
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3326,7 +3326,6 @@ export class BaseQuery {
33263326
join_types: {
33273327
inner: 'INNER',
33283328
left: 'LEFT',
3329-
full: 'FULL',
33303329
},
33313330
window_frame_types: {
33323331
rows: 'ROWS',

packages/cubejs-schema-compiler/src/adapter/BigqueryQuery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ export class BigqueryQuery extends BaseQuery {
257257
templates.types.decimal = 'BIGDECIMAL({{ precision }},{{ scale }})';
258258
templates.types.binary = 'BYTES';
259259
templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM';
260+
templates.join_types.full = 'FULL';
260261
return templates;
261262
}
262263
}

packages/cubejs-schema-compiler/src/adapter/SnowflakeQuery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export class SnowflakeQuery extends BaseQuery {
116116
templates.expressions.interval = 'INTERVAL \'{{ interval }}\'';
117117
templates.expressions.timestamp_literal = '\'{{ value }}\'::timestamp_tz';
118118
templates.operators.is_not_distinct_from = 'IS NOT DISTINCT FROM';
119+
templates.join_types.full = 'FULL';
119120
delete templates.types.interval;
120121
return templates;
121122
}

rust/cubesqlplanner/cubesqlplanner/src/plan/builder/select.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
use crate::plan::{
2-
AliasedExpr, Cte, Expr, Filter, From, MemberExpression, OrderBy, Schema, SchemaColumn, Select,
3-
SingleAliasedSource, SingleSource,
2+
AliasedExpr, Cte, Expr, Filter, From, MemberExpression, OrderBy, QualifiedColumnName, Schema,
3+
SchemaColumn, Select, SingleAliasedSource, SingleSource,
44
};
55

6+
use crate::plan::expression::FunctionExpression;
67
use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory;
78
use crate::planner::{BaseMember, VisitorContext};
89
use std::collections::HashMap;
@@ -57,6 +58,36 @@ impl SelectBuilder {
5758
.add_column(SchemaColumn::new(alias.clone(), Some(member.full_name())));
5859
}
5960

61+
pub fn add_projection_coalesce_member(
62+
&mut self,
63+
member: &Rc<dyn BaseMember>,
64+
references: Vec<QualifiedColumnName>,
65+
alias: Option<String>,
66+
) {
67+
let alias = if let Some(alias) = alias {
68+
alias
69+
} else {
70+
member.alias_name()
71+
};
72+
73+
let expr = Expr::Function(FunctionExpression {
74+
function: "COALESCE".to_string(),
75+
arguments: references
76+
.into_iter()
77+
// TODO unwrap
78+
.map(|r| Expr::Reference(r))
79+
.collect(),
80+
});
81+
let aliased_expr = AliasedExpr {
82+
expr,
83+
alias: alias.clone(),
84+
};
85+
86+
self.projection_columns.push(aliased_expr);
87+
self.result_schema
88+
.add_column(SchemaColumn::new(alias.clone(), Some(member.full_name())));
89+
}
90+
6091
pub fn set_filter(&mut self, filter: Option<Filter>) {
6192
self.filter = filter;
6293
}

rust/cubesqlplanner/cubesqlplanner/src/plan/expression.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,17 @@ impl MemberExpression {
2323
}
2424
}
2525

26+
#[derive(Clone)]
27+
pub struct FunctionExpression {
28+
pub function: String,
29+
pub arguments: Vec<Expr>,
30+
}
31+
2632
#[derive(Clone)]
2733
pub enum Expr {
2834
Member(MemberExpression),
2935
Reference(QualifiedColumnName),
36+
Function(FunctionExpression),
3037
}
3138

3239
impl Expr {
@@ -46,6 +53,18 @@ impl Expr {
4653
Self::Reference(reference) => {
4754
templates.column_reference(reference.source(), &reference.name())
4855
}
56+
Expr::Function(FunctionExpression {
57+
function,
58+
arguments,
59+
}) => templates.scalar_function(
60+
function.to_string(),
61+
arguments
62+
.iter()
63+
.map(|e| e.to_sql(&templates, context.clone()))
64+
.collect::<Result<Vec<_>, _>>()?,
65+
None,
66+
None,
67+
),
4968
}
5069
}
5170
}

rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,13 @@ impl RollingWindowJoinCondition {
8888
}
8989

9090
pub struct DimensionJoinCondition {
91-
conditions: Vec<(Expr, Expr)>,
91+
// AND (... OR ...)
92+
conditions: Vec<Vec<(Expr, Expr)>>,
9293
null_check: bool,
9394
}
9495

9596
impl DimensionJoinCondition {
96-
pub fn new(conditions: Vec<(Expr, Expr)>, null_check: bool) -> Self {
97+
pub fn new(conditions: Vec<Vec<(Expr, Expr)>>, null_check: bool) -> Self {
9798
Self {
9899
conditions,
99100
null_check,
@@ -110,8 +111,17 @@ impl DimensionJoinCondition {
110111
} else {
111112
self.conditions
112113
.iter()
113-
.map(|(left, right)| -> Result<String, CubeError> {
114-
self.dimension_condition(templates, context.clone(), left, right)
114+
.map(|or_conditions| -> Result<_, CubeError> {
115+
Ok(format!(
116+
"({})",
117+
or_conditions
118+
.iter()
119+
.map(|(left, right)| -> Result<String, CubeError> {
120+
self.dimension_condition(templates, context.clone(), left, right)
121+
})
122+
.collect::<Result<Vec<_>, _>>()?
123+
.join(" OR ")
124+
))
115125
})
116126
.collect::<Result<Vec<_>, _>>()?
117127
.join(" AND ")
@@ -139,7 +149,7 @@ pub enum JoinCondition {
139149
}
140150

141151
impl JoinCondition {
142-
pub fn new_dimension_join(conditions: Vec<(Expr, Expr)>, null_check: bool) -> Self {
152+
pub fn new_dimension_join(conditions: Vec<Vec<(Expr, Expr)>>, null_check: bool) -> Self {
143153
Self::DimensionJoinCondition(DimensionJoinCondition::new(conditions, null_check))
144154
}
145155

rust/cubesqlplanner/cubesqlplanner/src/planner/params_allocator.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::planner::sql_templates::PlanSqlTemplates;
12
use cubenativeutils::CubeError;
23
use lazy_static::lazy_static;
34
use regex::{Captures, Regex};
@@ -8,12 +9,16 @@ lazy_static! {
89
static ref PARAMS_MATCH_RE: Regex = Regex::new(r"\$_(\d+)_\$").unwrap();
910
}
1011
pub struct ParamsAllocator {
12+
sql_templates: PlanSqlTemplates,
1113
params: Vec<String>,
1214
}
1315

1416
impl ParamsAllocator {
15-
pub fn new() -> ParamsAllocator {
16-
ParamsAllocator { params: Vec::new() }
17+
pub fn new(sql_templates: PlanSqlTemplates) -> ParamsAllocator {
18+
ParamsAllocator {
19+
sql_templates,
20+
params: Vec::new(),
21+
}
1722
}
1823

1924
pub fn make_placeholder(&self, index: usize) -> String {
@@ -38,31 +43,51 @@ impl ParamsAllocator {
3843
let (sql, params) = self.add_native_allocated_params(sql, &native_allocated_params)?;
3944
let mut params_in_sql_order = Vec::new();
4045
let mut param_index_map: HashMap<usize, usize> = HashMap::new();
46+
let mut error = None;
4147
let result_sql = if should_reuse_params {
4248
PARAMS_MATCH_RE
4349
.replace_all(&sql, |caps: &Captures| {
4450
let ind: usize = caps[1].to_string().parse().unwrap();
4551
let new_index = if let Some(index) = param_index_map.get(&ind) {
4652
index.clone()
4753
} else {
48-
params_in_sql_order.push(params[ind].clone());
4954
let index = params_in_sql_order.len();
55+
params_in_sql_order.push(params[ind].clone());
5056
param_index_map.insert(ind, index);
5157
index
5258
};
53-
format!("${}", new_index) //TODO get placeholder from js part
59+
match self.sql_templates.param(new_index) {
60+
Ok(res) => res,
61+
Err(e) => {
62+
if error.is_none() {
63+
error = Some(e);
64+
}
65+
"$error$".to_string()
66+
}
67+
}
5468
})
5569
.to_string()
5670
} else {
5771
PARAMS_MATCH_RE
5872
.replace_all(&sql, |caps: &Captures| {
5973
let ind: usize = caps[1].to_string().parse().unwrap();
60-
params_in_sql_order.push(params[ind].clone());
6174
let index = params_in_sql_order.len();
62-
format!("${}", index) //TODO get placeholder from js part
75+
params_in_sql_order.push(params[ind].clone());
76+
match self.sql_templates.param(index) {
77+
Ok(res) => res,
78+
Err(e) => {
79+
if error.is_none() {
80+
error = Some(e);
81+
}
82+
"$error$".to_string()
83+
}
84+
}
6385
})
6486
.to_string()
6587
};
88+
if let Some(error) = error {
89+
return Err(error);
90+
}
6691
Ok((result_sql, params_in_sql_order))
6792
}
6893

rust/cubesqlplanner/cubesqlplanner/src/planner/planners/full_key_query_aggregate_planner.rs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,33 +60,38 @@ impl FullKeyAggregateQueryPlanner {
6060
let mut join_builder = JoinBuilder::new_from_subselect(joins[0].clone(), format!("q_0"));
6161
let dimensions_to_select = self.query_properties.dimensions_for_select();
6262
for (i, join) in joins.iter().enumerate().skip(1) {
63-
let left_alias = format!("q_{}", i - 1);
6463
let right_alias = format!("q_{}", i);
6564
let left_schema = joins[i - 1].schema();
6665
let right_schema = joins[i].schema();
6766
// TODO every next join should join to all previous dimensions through OR: q_0.a = q_1.a, q_0.a = q_2.a OR q_1.a = q_2.a, ...
6867
let conditions = dimensions_to_select
6968
.iter()
7069
.map(|dim| {
71-
let alias_in_left_query = left_schema.resolve_member_alias(dim);
72-
let left_ref = Expr::Reference(QualifiedColumnName::new(
73-
Some(left_alias.clone()),
74-
alias_in_left_query,
75-
));
76-
let alias_in_right_query = right_schema.resolve_member_alias(dim);
77-
let right_ref = Expr::Reference(QualifiedColumnName::new(
78-
Some(right_alias.clone()),
79-
alias_in_right_query,
80-
));
81-
(left_ref, right_ref)
70+
(0..i)
71+
.map(|left_i| {
72+
let left_alias = format!("q_{}", left_i);
73+
let alias_in_left_query = left_schema.resolve_member_alias(dim);
74+
let left_ref = Expr::Reference(QualifiedColumnName::new(
75+
Some(left_alias.clone()),
76+
alias_in_left_query,
77+
));
78+
let alias_in_right_query = right_schema.resolve_member_alias(dim);
79+
let right_ref = Expr::Reference(QualifiedColumnName::new(
80+
Some(right_alias.clone()),
81+
alias_in_right_query,
82+
));
83+
(left_ref, right_ref)
84+
})
85+
.collect::<Vec<_>>()
8286
})
8387
.collect_vec();
8488
let on = JoinCondition::new_dimension_join(conditions, true);
8589
let next_alias = format!("q_{}", i);
86-
if self.plan_sql_templates.supports_is_not_distinct_from() {
87-
join_builder.inner_join_subselect(join.clone(), next_alias, on);
88-
} else {
90+
if self.plan_sql_templates.supports_full_join() {
8991
join_builder.full_join_subselect(join.clone(), next_alias, on);
92+
} else {
93+
// TODO in case of full join is not supported there should be correct blending query that keeps NULL values
94+
join_builder.inner_join_subselect(join.clone(), next_alias, on);
9095
}
9196
}
9297

@@ -103,9 +108,26 @@ impl FullKeyAggregateQueryPlanner {
103108
&dimensions_source,
104109
&mut render_references,
105110
)?;
111+
let references = (0..joins.len())
112+
.map(|i| {
113+
let alias = format!("q_{}", i);
114+
references_builder
115+
.find_reference_for_member(
116+
&member.member_evaluator().full_name(),
117+
&Some(alias.clone()),
118+
)
119+
.ok_or_else(|| {
120+
CubeError::internal(format!(
121+
"Reference for join not found for {} in {}",
122+
member.member_evaluator().full_name(),
123+
alias
124+
))
125+
})
126+
})
127+
.collect::<Result<Vec<_>, _>>()?;
106128
let alias = references_builder
107129
.resolve_alias_for_member(&member.full_name(), &dimensions_source);
108-
select_builder.add_projection_member(member, alias);
130+
select_builder.add_projection_coalesce_member(member, references, alias);
109131
}
110132

111133
for member in BaseMemberHelper::iter_as_base_member(&outer_measures) {

rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -267,24 +267,28 @@ impl MultiStageMemberQueryPlanner {
267267
Some(root_alias.clone()),
268268
);
269269
for (i, input) in inputs.iter().enumerate().skip(1) {
270-
let left_alias = format!("q_{}", i - 1);
271270
let right_alias = format!("q_{}", i);
272271
let left_schema = cte_schemas.get(&inputs[i - 1]).unwrap().clone();
273272
let cte_schema = cte_schemas.get(input).unwrap().clone();
274273
let conditions = dimensions
275274
.iter()
276275
.map(|dim| {
277-
let alias_in_left_query = left_schema.resolve_member_alias(dim);
278-
let left_ref = Expr::Reference(QualifiedColumnName::new(
279-
Some(left_alias.clone()),
280-
alias_in_left_query,
281-
));
282-
let alias_in_right_query = cte_schema.resolve_member_alias(dim);
283-
let right_ref = Expr::Reference(QualifiedColumnName::new(
284-
Some(right_alias.clone()),
285-
alias_in_right_query,
286-
));
287-
(left_ref, right_ref)
276+
(0..i)
277+
.map(|left_alias| {
278+
let left_alias = format!("q_{}", left_alias);
279+
let alias_in_left_query = left_schema.resolve_member_alias(dim);
280+
let left_ref = Expr::Reference(QualifiedColumnName::new(
281+
Some(left_alias.clone()),
282+
alias_in_left_query,
283+
));
284+
let alias_in_right_query = cte_schema.resolve_member_alias(dim);
285+
let right_ref = Expr::Reference(QualifiedColumnName::new(
286+
Some(right_alias.clone()),
287+
alias_in_right_query,
288+
));
289+
(left_ref, right_ref)
290+
})
291+
.collect()
288292
})
289293
.collect_vec();
290294
let on = JoinCondition::new_dimension_join(conditions, true);

rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl MultipliedMeasuresQueryPlanner {
134134
Some(pk_cube_alias.clone()),
135135
alias_in_subquery,
136136
));
137-
(keys_query_ref, subquery_ref)
137+
vec![(keys_query_ref, subquery_ref)]
138138
})
139139
.collect_vec();
140140

@@ -165,7 +165,7 @@ impl MultipliedMeasuresQueryPlanner {
165165
alias_in_keys_query,
166166
));
167167
let pk_cube_expr = Expr::Member(MemberExpression::new(dim.clone()));
168-
(keys_query_ref, pk_cube_expr)
168+
vec![(keys_query_ref, pk_cube_expr)]
169169
})
170170
.collect_vec();
171171
join_builder.left_join_cube(

0 commit comments

Comments
 (0)