Skip to content

Commit a462b41

Browse files
authored
feat(tesseract): Custom granularities in pre-aggregations (#10127)
* wip: todos * add time_dimension_granularity_hierarchy() * implement min_granularity_for_time_dimensions * fix add_interval() for time-only intervals * fix rollup_granularity() in time dimension symbol * implement is_aligned_with_date_range() in Granularity * fix symbols factory allowing long_path member names * support custom granularities in dimension matcher * use time dimensions instead of dimensions for PreAggregation time_dimensions * fix matcher * fix make_pre_aggregation_union_source() * cargo fmt/clippy * run drivers tests for changes in tesseract * fix DimensionSymbolFactory try_new for short/long paths
1 parent 1e757cd commit a462b41

File tree

13 files changed

+306
-56
lines changed

13 files changed

+306
-56
lines changed

.github/workflows/drivers-tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ on:
2727
- 'packages/cubejs-snowflake-driver/**'
2828
- 'packages/cubejs-vertica-driver/**'
2929

30-
# To test SQL API Push down
3130
- 'packages/cubejs-backend-native/**'
3231
- 'rust/cubesql/**'
32+
- 'rust/cubesqlplanner/**'
3333
pull_request:
3434
paths:
3535
- '.github/workflows/drivers-tests.yml'
@@ -54,9 +54,9 @@ on:
5454
- 'packages/cubejs-snowflake-driver/**'
5555
- 'packages/cubejs-vertica-driver/**'
5656

57-
# To test SQL API Push down
5857
- 'packages/cubejs-backend-native/**'
5958
- 'rust/cubesql/**'
59+
- 'rust/cubesqlplanner/**'
6060
workflow_dispatch:
6161
inputs:
6262
use_tesseract_sql_planner:

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub struct CompiledPreAggregation {
4646
pub external: Option<bool>,
4747
pub measures: Vec<Rc<MemberSymbol>>,
4848
pub dimensions: Vec<Rc<MemberSymbol>>,
49-
pub time_dimensions: Vec<(Rc<MemberSymbol>, Option<String>)>,
49+
pub time_dimensions: Vec<Rc<MemberSymbol>>,
5050
pub allow_non_strict_date_range_match: bool,
5151
}
5252

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/dimension_matcher.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub struct DimensionMatcher<'a> {
3434
query_tools: Rc<QueryTools>,
3535
pre_aggregation: &'a CompiledPreAggregation,
3636
pre_aggregation_dimensions: HashMap<String, bool>,
37-
pre_aggregation_time_dimensions: HashMap<String, (Option<String>, bool)>,
37+
pre_aggregation_time_dimensions: HashMap<String, (Option<Rc<TimeDimensionSymbol>>, bool)>,
3838
result: MatchState,
3939
}
4040

@@ -48,7 +48,13 @@ impl<'a> DimensionMatcher<'a> {
4848
let pre_aggregation_time_dimensions = pre_aggregation
4949
.time_dimensions
5050
.iter()
51-
.map(|(dim, granularity)| (dim.full_name(), (granularity.clone(), false)))
51+
.map(|dim| {
52+
if let Ok(td) = dim.as_time_dimension() {
53+
(td.base_symbol().full_name(), (Some(td), false))
54+
} else {
55+
(dim.full_name(), (None, false))
56+
}
57+
})
5258
.collect::<HashMap<_, _>>();
5359
Self {
5460
query_tools,
@@ -194,30 +200,39 @@ impl<'a> DimensionMatcher<'a> {
194200
time_dimension.rollup_granularity(self.query_tools.clone())?
195201
};
196202
let base_symbol_name = time_dimension.base_symbol().full_name();
203+
197204
if let Some(found) = self
198205
.pre_aggregation_time_dimensions
199206
.get_mut(&base_symbol_name)
200207
{
201208
if add_to_matched_dimension {
202209
found.1 = true;
203210
}
204-
let pre_aggr_granularity = &found.0;
205-
if granularity.is_none() || pre_aggr_granularity == &granularity {
211+
212+
let pre_agg_td = &found.0;
213+
let pre_aggr_granularity = if let Some(pre_agg_td) = pre_agg_td {
214+
pre_agg_td.granularity().clone()
215+
} else {
216+
None
217+
};
218+
219+
if granularity.is_none() || pre_aggr_granularity == granularity {
206220
Ok(MatchState::Full)
207-
} else if pre_aggr_granularity.is_none()
208-
|| GranularityHelper::is_predefined_granularity(
209-
pre_aggr_granularity.as_ref().unwrap(),
210-
)
211-
{
212-
let min_granularity =
213-
GranularityHelper::min_granularity(&granularity, &pre_aggr_granularity)?;
214-
if &min_granularity == pre_aggr_granularity {
221+
} else if pre_aggr_granularity.is_none() {
222+
Ok(MatchState::NotMatched)
223+
} else if let Some(pre_agg_td) = pre_agg_td {
224+
let min_granularity = GranularityHelper::min_granularity_for_time_dimensions(
225+
(&granularity, time_dimension),
226+
(&pre_aggr_granularity, &pre_agg_td),
227+
)?;
228+
229+
if min_granularity == pre_aggr_granularity {
215230
Ok(MatchState::Partial)
216231
} else {
217232
Ok(MatchState::NotMatched)
218233
}
219234
} else {
220-
Ok(MatchState::NotMatched) //TODO Custom granularities!!!
235+
Ok(MatchState::NotMatched)
221236
}
222237
} else {
223238
if time_dimension.owned_by_cube() {

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -387,12 +387,7 @@ impl PreAggregationOptimizer {
387387
.dimensions
388388
.iter()
389389
.cloned()
390-
.chain(
391-
pre_aggregation
392-
.time_dimensions
393-
.iter()
394-
.map(|(d, _)| d.clone()),
395-
)
390+
.chain(pre_aggregation.time_dimensions.iter().map(|d| d.clone()))
396391
.collect(),
397392
measures: pre_aggregation.measures.to_vec(),
398393
multiplied_measures: HashSet::new(),

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use crate::planner::planners::ResolvedJoinItem;
1212
use crate::planner::query_tools::QueryTools;
1313
use crate::planner::sql_evaluator::collectors::collect_cube_names_from_symbols;
1414
use crate::planner::sql_evaluator::MemberSymbol;
15+
use crate::planner::sql_evaluator::TimeDimensionSymbol;
16+
use crate::planner::GranularityHelper;
1517
use cubenativeutils::CubeError;
1618
use itertools::Itertools;
1719
use std::collections::HashMap;
@@ -136,7 +138,30 @@ impl PreAggregationsCompiler {
136138
refs,
137139
Self::check_is_time_dimension,
138140
)?;
139-
vec![(dims[0].clone(), static_data.granularity.clone())]
141+
142+
if static_data.granularity.is_some() {
143+
let evaluator_compiler_cell = self.query_tools.evaluator_compiler().clone();
144+
let mut evaluator_compiler = evaluator_compiler_cell.borrow_mut();
145+
let base_symbol = dims[0].clone();
146+
147+
let granularity_obj = GranularityHelper::make_granularity_obj(
148+
self.query_tools.cube_evaluator().clone(),
149+
&mut evaluator_compiler,
150+
&base_symbol.cube_name(),
151+
&base_symbol.name(),
152+
static_data.granularity.clone(),
153+
)?;
154+
let symbol = MemberSymbol::new_time_dimension(TimeDimensionSymbol::new(
155+
base_symbol,
156+
static_data.granularity.clone(),
157+
granularity_obj,
158+
None,
159+
));
160+
161+
vec![symbol]
162+
} else {
163+
vec![dims[0].clone()]
164+
}
140165
} else {
141166
Vec::new()
142167
};
@@ -276,14 +301,16 @@ impl PreAggregationsCompiler {
276301
}
277302

278303
fn match_time_dimensions(
279-
a: &Vec<(Rc<MemberSymbol>, Option<String>)>,
280-
b: &Vec<(Rc<MemberSymbol>, Option<String>)>,
304+
a: &Vec<Rc<MemberSymbol>>,
305+
b: &Vec<Rc<MemberSymbol>>,
281306
) -> Result<(), CubeError> {
282-
if !a
283-
.iter()
284-
.zip(b.iter())
285-
.all(|(a, b)| a.0.name() == b.0.name() && a.1 == b.1)
286-
{
307+
if !a.iter().zip(b.iter()).all(|(a, b)| {
308+
if let (Ok(td_a), Ok(td_b)) = (a.as_time_dimension(), b.as_time_dimension()) {
309+
td_a.name() == td_a.name() && td_a.granularity() == td_b.granularity()
310+
} else {
311+
false
312+
}
313+
}) {
287314
return Err(CubeError::user(format!(
288315
"Names for pre-aggregation symbols in lambda pre-aggragation don't match"
289316
)));

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub struct PreAggregation {
1414
#[builder(default)]
1515
dimensions: Vec<Rc<MemberSymbol>>,
1616
#[builder(default)]
17-
time_dimensions: Vec<(Rc<MemberSymbol>, Option<String>)>,
17+
time_dimensions: Vec<Rc<MemberSymbol>>,
1818
external: bool,
1919
#[builder(default)]
2020
granularity: Option<String>,
@@ -39,7 +39,7 @@ impl PreAggregation {
3939
&self.dimensions
4040
}
4141

42-
pub fn time_dimensions(&self) -> &Vec<(Rc<MemberSymbol>, Option<String>)> {
42+
pub fn time_dimensions(&self) -> &Vec<Rc<MemberSymbol>> {
4343
&self.time_dimensions
4444
}
4545

@@ -97,11 +97,11 @@ impl PreAggregation {
9797
QualifiedColumnName::new(None, alias.clone()),
9898
);
9999
}
100-
for (dim, granularity) in self.time_dimensions().iter() {
101-
let base_symbol = if let Ok(td) = dim.as_time_dimension() {
102-
td.base_symbol().clone()
100+
for dim in self.time_dimensions().iter() {
101+
let (base_symbol, granularity) = if let Ok(td) = dim.as_time_dimension() {
102+
(td.base_symbol().clone(), td.granularity().clone())
103103
} else {
104-
dim.clone()
104+
(dim.clone(), None)
105105
};
106106
let suffix = if let Some(granularity) = &granularity {
107107
format!("_{}", granularity.clone())
@@ -110,7 +110,7 @@ impl PreAggregation {
110110
};
111111
let alias = format!("{}{}", base_symbol.alias(), suffix);
112112
res.insert(
113-
dim.full_name(),
113+
base_symbol.full_name(),
114114
QualifiedColumnName::new(None, alias.clone()),
115115
);
116116
}
@@ -184,11 +184,7 @@ impl PrettyPrint for PreAggregation {
184184
&self
185185
.time_dimensions()
186186
.iter()
187-
.map(|(d, granularity)| format!(
188-
"({} {})",
189-
d.full_name(),
190-
granularity.clone().unwrap_or("None".to_string())
191-
))
187+
.map(|d| d.full_name())
192188
.join(", ")
193189
),
194190
&state,

rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,25 @@ impl PreAggregationProcessor<'_> {
9595
Some(alias),
9696
);
9797
}
98-
for (dim, granularity) in pre_aggregation.time_dimensions().iter() {
98+
for dim in pre_aggregation.time_dimensions().iter() {
99+
let (alias, granularity) = if let Ok(td) = dim.as_time_dimension() {
100+
(td.base_symbol().alias(), td.granularity().clone())
101+
} else {
102+
(dim.alias(), None)
103+
};
104+
99105
let name_in_table = PlanSqlTemplates::memeber_alias_name(
100106
&item.cube_alias,
101107
&dim.name(),
102-
granularity,
108+
&granularity,
103109
);
110+
104111
let suffix = if let Some(granularity) = granularity {
105112
format!("_{}", granularity.clone())
106113
} else {
107114
"_day".to_string()
108115
};
109-
let alias = format!("{}{}", dim.alias(), suffix);
116+
let alias = format!("{}{}", alias, suffix);
110117
select_builder.add_projection_reference_member(
111118
&dim,
112119
QualifiedColumnName::new(None, name_in_table.clone()),

rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/dimension_symbol.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -340,13 +340,32 @@ impl DimensionSymbolFactory {
340340
full_name: &String,
341341
cube_evaluator: Rc<dyn CubeEvaluator>,
342342
) -> Result<Self, CubeError> {
343-
let mut iter = cube_evaluator
344-
.parse_path("dimensions".to_string(), full_name.clone())?
345-
.into_iter();
343+
let parts: Vec<&str> = full_name.split('.').collect();
344+
let mut iter;
345+
let member_short_path;
346+
347+
// try_new might be invoked with next full_name variants:
348+
// 1. "cube.member"
349+
// 2. "cube.member.granularity" might come from multistage things
350+
// 3. "cube.cube.cube...cube.member" might come from pre-agg references (as it include full join paths)
351+
// And we can not distinguish between "cube.member.granularity" and "cube.cube.member" here,
352+
// so we have to try-catch 2 variants of evaluation.
353+
if let Ok(iter_by_start) =
354+
cube_evaluator.parse_path("dimensions".to_string(), full_name.clone())
355+
{
356+
member_short_path = full_name.clone();
357+
iter = iter_by_start.into_iter();
358+
} else {
359+
member_short_path = format!("{}.{}", parts[parts.len() - 2], parts[parts.len() - 1]);
360+
iter = cube_evaluator
361+
.parse_path("dimensions".to_string(), member_short_path.clone())?
362+
.into_iter();
363+
}
364+
346365
let cube_name = iter.next().unwrap();
347366
let name = iter.next().unwrap();
348367
let granularity = iter.next();
349-
let definition = cube_evaluator.dimension_by_path(full_name.clone())?;
368+
let definition = cube_evaluator.dimension_by_path(member_short_path)?;
350369
Ok(Self {
351370
cube_name,
352371
name,

rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,12 +511,19 @@ impl MeasureSymbolFactory {
511511
full_name: &String,
512512
cube_evaluator: Rc<dyn CubeEvaluator>,
513513
) -> Result<Self, CubeError> {
514+
let parts: Vec<&str> = full_name.split('.').collect();
515+
let member_short_path = if parts.len() > 2 {
516+
format!("{}.{}", parts[parts.len() - 2], parts[parts.len() - 1])
517+
} else {
518+
full_name.clone()
519+
};
520+
514521
let mut iter = cube_evaluator
515-
.parse_path("measures".to_string(), full_name.clone())?
522+
.parse_path("measures".to_string(), member_short_path.clone())?
516523
.into_iter();
517524
let cube_name = iter.next().unwrap();
518525
let name = iter.next().unwrap();
519-
let definition = cube_evaluator.measure_by_path(full_name.clone())?;
526+
let definition = cube_evaluator.measure_by_path(member_short_path)?;
520527
let sql = definition.sql()?;
521528
Ok(Self {
522529
cube_name,

rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/time_dimension_symbol.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,13 +238,38 @@ impl TimeDimensionSymbol {
238238
query_tools: Rc<QueryTools>,
239239
) -> Result<Option<String>, CubeError> {
240240
if let Some(granularity_obj) = &self.granularity_obj {
241-
let date_range_granularity = self.date_range_granularity(query_tools.clone())?;
242-
let self_granularity = granularity_obj.min_granularity()?;
243-
244-
GranularityHelper::min_granularity(&date_range_granularity, &self_granularity)
241+
if let Some(date_range) = &self.date_range {
242+
let date_range_granularity = self.date_range_granularity(query_tools.clone())?;
243+
244+
// For predefined granularities or custom granularities not aligned with date range,
245+
// we need to return the minimum granularity
246+
if granularity_obj.is_predefined_granularity() {
247+
let self_granularity = granularity_obj.min_granularity()?;
248+
GranularityHelper::min_granularity(&date_range_granularity, &self_granularity)
249+
} else {
250+
let from_date_str = QueryDateTimeHelper::format_from_date(&date_range.0, 3)?;
251+
let to_date_str = QueryDateTimeHelper::format_to_date(&date_range.1, 3)?;
252+
let is_aligned = granularity_obj.is_aligned_with_date_range(
253+
&from_date_str,
254+
&to_date_str,
255+
query_tools.timezone(),
256+
)?;
257+
258+
if is_aligned {
259+
Ok(self.granularity.clone())
260+
} else {
261+
let self_granularity = granularity_obj.min_granularity()?;
262+
GranularityHelper::min_granularity(
263+
&date_range_granularity,
264+
&self_granularity,
265+
)
266+
}
267+
}
268+
} else {
269+
Ok(self.granularity.clone())
270+
}
245271
} else {
246272
let date_range_granularity = self.date_range_granularity(query_tools.clone())?;
247-
248273
Ok(date_range_granularity)
249274
}
250275
}

0 commit comments

Comments
 (0)