Skip to content

Commit eb56169

Browse files
authored
feat(tesseract): Lambda rollup support (#9806)
1 parent e3a204c commit eb56169

File tree

11 files changed

+417
-29
lines changed

11 files changed

+417
-29
lines changed

packages/cubejs-backend-shared/src/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ const variables: Record<string, (...args: any) => any> = {
224224
.default('1')
225225
.asInt(),
226226
nativeSqlPlanner: () => get('CUBEJS_TESSERACT_SQL_PLANNER').default('false').asBool(),
227+
nativeSqlPlannerPreAggregations: () => get('CUBEJS_TESSERACT_PRE_AGGREGATIONS').default('false').asBool(),
227228
nativeOrchestrator: () => get('CUBEJS_TESSERACT_ORCHESTRATOR')
228229
.default('true')
229230
.asBoolStrict(),

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,8 @@ export class BaseQuery {
336336
*/
337337
this.customSubQueryJoins = this.options.subqueryJoins ?? [];
338338
this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner');
339-
this.canUseNativeSqlPlannerPreAggregation = false;
340-
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
339+
this.canUseNativeSqlPlannerPreAggregation = getEnv('nativeSqlPlannerPreAggregations');
340+
if (this.useNativeSqlPlanner && !this.canUseNativeSqlPlannerPreAggregation && !this.neverUseSqlPlannerPreaggregation()) {
341341
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });
342342

343343
this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0;

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,50 @@ describe('PreAggregations', () => {
278278
}
279279
})
280280
281+
cube('visitor_checkins2', {
282+
sql: \`
283+
select * from visitor_checkins
284+
\`,
285+
286+
sqlAlias: 'vc2',
287+
288+
measures: {
289+
count: {
290+
type: 'count'
291+
}
292+
},
293+
294+
dimensions: {
295+
id: {
296+
type: 'number',
297+
sql: 'id',
298+
primaryKey: true
299+
},
300+
visitor_id: {
301+
type: 'number',
302+
sql: 'visitor_id'
303+
},
304+
source: {
305+
type: 'string',
306+
sql: 'source'
307+
},
308+
created_at: {
309+
type: 'time',
310+
sql: 'created_at'
311+
}
312+
},
313+
preAggregations: {
314+
forLambdaS: {
315+
type: 'rollup',
316+
measureReferences: [count],
317+
dimensionReferences: [visitor_id],
318+
timeDimensionReference: created_at,
319+
partitionGranularity: 'day',
320+
granularity: 'day'
321+
},
322+
}
323+
})
324+
281325
282326
cube('visitor_checkins', {
283327
sql: \`
@@ -316,6 +360,10 @@ describe('PreAggregations', () => {
316360
main: {
317361
type: 'originalSql'
318362
},
363+
lambda: {
364+
type: 'rollupLambda',
365+
rollups: [visitor_checkins.forLambda, visitor_checkins2.forLambdaS],
366+
},
319367
forJoin: {
320368
type: 'rollup',
321369
measureReferences: [count],
@@ -327,6 +375,14 @@ describe('PreAggregations', () => {
327375
dimensionReferences: [visitors.source],
328376
rollupReferences: [visitor_checkins.forJoin, visitors.forJoin],
329377
},
378+
forLambda: {
379+
type: 'rollup',
380+
measureReferences: [count],
381+
dimensionReferences: [visitor_id],
382+
timeDimensionReference: created_at,
383+
partitionGranularity: 'day',
384+
granularity: 'day'
385+
},
330386
joinedPartitioned: {
331387
type: 'rollupJoin',
332388
measureReferences: [count],
@@ -1922,7 +1978,7 @@ describe('PreAggregations', () => {
19221978
}, {
19231979
id: 'visitors.source'
19241980
}],
1925-
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
1981+
cubestoreSupportMultistage: getEnv('nativeSqlPlanner')
19261982
});
19271983

19281984
const queryAndParams = query.buildSqlAndParams();
@@ -2000,7 +2056,7 @@ describe('PreAggregations', () => {
20002056
}, {
20012057
id: 'visitors.source'
20022058
}],
2003-
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
2059+
cubestoreSupportMultistage: getEnv('nativeSqlPlanner')
20042060
});
20052061

20062062
const queryAndParams = query.buildSqlAndParams();
@@ -2138,6 +2194,78 @@ describe('PreAggregations', () => {
21382194
});
21392195
});
21402196

2197+
if (getEnv('nativeSqlPlanner') && getEnv('nativeSqlPlannerPreAggregations')) {
2198+
it('rollup lambda', async () => {
2199+
await compiler.compile();
2200+
2201+
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
2202+
measures: [
2203+
'visitor_checkins.count',
2204+
],
2205+
dimensions: ['visitor_checkins.visitor_id'],
2206+
timeDimensions: [{
2207+
dimension: 'visitor_checkins.created_at',
2208+
granularity: 'day',
2209+
dateRange: ['2016-12-26', '2017-01-08']
2210+
}],
2211+
timezone: 'America/Los_Angeles',
2212+
preAggregationsSchema: '',
2213+
order: [{
2214+
id: 'visitor_checkins.visitor_id',
2215+
}],
2216+
});
2217+
2218+
const queryAndParams = query.buildSqlAndParams();
2219+
console.log(queryAndParams);
2220+
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
2221+
console.log(preAggregationsDescription);
2222+
2223+
console.log(query.preAggregations?.rollupMatchResultDescriptions());
2224+
2225+
const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription);
2226+
2227+
console.log(JSON.stringify(queries.concat(queryAndParams)));
2228+
2229+
return dbRunner.evaluateQueryWithPreAggregations(query).then(res => {
2230+
console.log(JSON.stringify(res));
2231+
expect(res).toEqual(
2232+
[
2233+
{
2234+
vc__visitor_id: 1,
2235+
vc__created_at_day: '2017-01-02T00:00:00.000Z',
2236+
vc__count: '2'
2237+
},
2238+
{
2239+
vc__visitor_id: 1,
2240+
vc__created_at_day: '2017-01-03T00:00:00.000Z',
2241+
vc__count: '2'
2242+
},
2243+
{
2244+
vc__visitor_id: 1,
2245+
vc__created_at_day: '2017-01-04T00:00:00.000Z',
2246+
vc__count: '2'
2247+
},
2248+
{
2249+
vc__visitor_id: 2,
2250+
vc__created_at_day: '2017-01-04T00:00:00.000Z',
2251+
vc__count: '4'
2252+
},
2253+
{
2254+
vc__visitor_id: 3,
2255+
vc__created_at_day: '2017-01-05T00:00:00.000Z',
2256+
vc__count: '2'
2257+
}
2258+
]
2259+
);
2260+
});
2261+
});
2262+
} else {
2263+
it.skip('rollup lambda: baseQuery generate wrong sql for not external pre-aggregations', async () => {
2264+
// This should be fixed in Tesseract.
2265+
2266+
});
2267+
}
2268+
21412269
it('rollup join', async () => {
21422270
await compiler.compile();
21432271

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,24 @@ use std::rc::Rc;
44

55
#[derive(Clone)]
66
pub struct PreAggregationJoinItem {
7-
pub from: PreAggregationTable,
8-
pub to: PreAggregationTable,
7+
pub from: Rc<PreAggregationSource>,
8+
pub to: Rc<PreAggregationSource>,
99
pub from_members: Vec<Rc<MemberSymbol>>,
1010
pub to_members: Vec<Rc<MemberSymbol>>,
1111
pub on_sql: Rc<SqlCall>,
1212
}
1313

1414
#[derive(Clone)]
1515
pub struct PreAggregationJoin {
16-
pub root: PreAggregationTable,
16+
pub root: Rc<PreAggregationSource>,
1717
pub items: Vec<PreAggregationJoinItem>,
1818
}
1919

20+
#[derive(Clone)]
21+
pub struct PreAggregationUnion {
22+
pub items: Vec<Rc<PreAggregationTable>>,
23+
}
24+
2025
#[derive(Clone)]
2126
pub struct PreAggregationTable {
2227
pub cube_name: String,
@@ -26,8 +31,9 @@ pub struct PreAggregationTable {
2631

2732
#[derive(Clone)]
2833
pub enum PreAggregationSource {
29-
Table(PreAggregationTable),
34+
Single(PreAggregationTable),
3035
Join(PreAggregationJoin),
36+
Union(PreAggregationUnion),
3137
}
3238

3339
#[derive(Clone)]

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs

Lines changed: 111 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::cube_bridge::pre_aggregation_description::PreAggregationDescription;
66
use crate::logical_plan::PreAggregationJoin;
77
use crate::logical_plan::PreAggregationJoinItem;
88
use crate::logical_plan::PreAggregationTable;
9+
use crate::logical_plan::PreAggregationUnion;
910
use crate::planner::planners::JoinPlanner;
1011
use crate::planner::planners::ResolvedJoinItem;
1112
use crate::planner::query_tools::QueryTools;
@@ -103,6 +104,11 @@ impl PreAggregationsCompiler {
103104
};
104105

105106
let static_data = description.static_data();
107+
108+
if static_data.pre_aggregation_type == "rollupLambda" {
109+
return self.build_lambda(name, &description);
110+
}
111+
106112
let measures = if let Some(refs) = description.measure_references()? {
107113
Self::symbols_from_ref(
108114
self.query_tools.clone(),
@@ -151,7 +157,7 @@ impl PreAggregationsCompiler {
151157
let source = if static_data.pre_aggregation_type == "rollupJoin" {
152158
PreAggregationSource::Join(self.build_join_source(&measures, &dimensions, &rollups)?)
153159
} else {
154-
PreAggregationSource::Table(PreAggregationTable {
160+
PreAggregationSource::Single(PreAggregationTable {
155161
cube_name: name.cube_name.clone(),
156162
name: name.name.clone(),
157163
alias: static_data.sql_alias.clone(),
@@ -173,6 +179,108 @@ impl PreAggregationsCompiler {
173179
Ok(res)
174180
}
175181

182+
fn build_lambda(
183+
&mut self,
184+
name: &PreAggregationFullName,
185+
description: &Rc<dyn PreAggregationDescription>,
186+
) -> Result<Rc<CompiledPreAggregation>, CubeError> {
187+
let rollups = if let Some(refs) = description.rollup_references()? {
188+
let r = self
189+
.query_tools
190+
.cube_evaluator()
191+
.evaluate_rollup_references(name.cube_name.clone(), refs)?;
192+
r
193+
} else {
194+
Vec::new()
195+
};
196+
if rollups.is_empty() {
197+
return Err(CubeError::user(format!(
198+
"rollupLambda '{}.{}' should reference at least one rollup",
199+
name.cube_name, name.name
200+
)));
201+
}
202+
203+
let pre_aggrs_for_lambda = rollups
204+
.iter()
205+
.map(|item| -> Result<_, CubeError> {
206+
self.compile_pre_aggregation(&PreAggregationFullName::from_string(item)?)
207+
})
208+
.collect::<Result<Vec<_>, _>>()?;
209+
210+
let mut sources = vec![];
211+
for (i, rollup) in pre_aggrs_for_lambda.clone().iter().enumerate() {
212+
match rollup.source.as_ref() {
213+
PreAggregationSource::Single(table) => {
214+
sources.push(Rc::new(table.clone()));
215+
}
216+
_ => {
217+
return Err(CubeError::user(format!("Rollup lambda can't be nested")));
218+
}
219+
}
220+
if i > 1 {
221+
Self::match_symbols(&rollup.measures, &pre_aggrs_for_lambda[0].measures)?;
222+
Self::match_symbols(&rollup.dimensions, &pre_aggrs_for_lambda[0].dimensions)?;
223+
Self::match_time_dimensions(
224+
&rollup.time_dimensions,
225+
&pre_aggrs_for_lambda[0].time_dimensions,
226+
)?;
227+
}
228+
}
229+
230+
let measures = pre_aggrs_for_lambda[0].measures.clone();
231+
let dimensions = pre_aggrs_for_lambda[0].dimensions.clone();
232+
let time_dimensions = pre_aggrs_for_lambda[0].time_dimensions.clone();
233+
let allow_non_strict_date_range_match = description
234+
.static_data()
235+
.allow_non_strict_date_range_match
236+
.unwrap_or(false);
237+
let granularity = pre_aggrs_for_lambda[0].granularity.clone();
238+
let source = PreAggregationSource::Union(PreAggregationUnion { items: sources });
239+
240+
let static_data = description.static_data();
241+
let res = Rc::new(CompiledPreAggregation {
242+
name: static_data.name.clone(),
243+
cube_name: name.cube_name.clone(),
244+
source: Rc::new(source),
245+
granularity,
246+
external: static_data.external,
247+
measures,
248+
dimensions,
249+
time_dimensions,
250+
allow_non_strict_date_range_match,
251+
});
252+
self.compiled_cache.insert(name.clone(), res.clone());
253+
Ok(res)
254+
}
255+
256+
fn match_symbols(
257+
a: &Vec<Rc<MemberSymbol>>,
258+
b: &Vec<Rc<MemberSymbol>>,
259+
) -> Result<(), CubeError> {
260+
if !a.iter().zip(b.iter()).all(|(a, b)| a.name() == b.name()) {
261+
return Err(CubeError::user(format!(
262+
"Names for pre-aggregation symbols in lambda pre-aggragation don't match"
263+
)));
264+
}
265+
Ok(())
266+
}
267+
268+
fn match_time_dimensions(
269+
a: &Vec<(Rc<MemberSymbol>, Option<String>)>,
270+
b: &Vec<(Rc<MemberSymbol>, Option<String>)>,
271+
) -> Result<(), CubeError> {
272+
if !a
273+
.iter()
274+
.zip(b.iter())
275+
.all(|(a, b)| a.0.name() == b.0.name() && a.1 == b.1)
276+
{
277+
return Err(CubeError::user(format!(
278+
"Names for pre-aggregation symbols in lambda pre-aggragation don't match"
279+
)));
280+
}
281+
Ok(())
282+
}
283+
176284
fn build_join_source(
177285
&mut self,
178286
measures: &Vec<Rc<MemberSymbol>>,
@@ -249,21 +357,9 @@ impl PreAggregationsCompiler {
249357
let to_pre_aggr =
250358
self.find_pre_aggregation_for_join(pre_aggrs_for_join, &join_item.to_members)?;
251359

252-
let from_table = match from_pre_aggr.source.as_ref() {
253-
PreAggregationSource::Table(t) => t.clone(),
254-
PreAggregationSource::Join(_) => {
255-
return Err(CubeError::user(format!("Rollup join can't be nested")));
256-
}
257-
};
258-
let to_table = match to_pre_aggr.source.as_ref() {
259-
PreAggregationSource::Table(t) => t.clone(),
260-
PreAggregationSource::Join(_) => {
261-
return Err(CubeError::user(format!("Rollup join can't be nested")));
262-
}
263-
};
264360
let res = PreAggregationJoinItem {
265-
from: from_table,
266-
to: to_table,
361+
from: from_pre_aggr.source.clone(),
362+
to: to_pre_aggr.source.clone(),
267363
from_members: join_item.from_members.clone(),
268364
to_members: join_item.to_members.clone(),
269365
on_sql: join_item.on_sql.clone(),

0 commit comments

Comments
 (0)