Skip to content

Commit 6d95a86

Browse files
committed
feat(tesseract): Lambda rollup support
1 parent d2383e6 commit 6d95a86

File tree

8 files changed

+367
-35
lines changed

8 files changed

+367
-35
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,12 +336,12 @@ export class BaseQuery {
336336
*/
337337
this.customSubQueryJoins = this.options.subqueryJoins ?? [];
338338
this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner');
339-
this.canUseNativeSqlPlannerPreAggregation = false;
340-
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
339+
this.canUseNativeSqlPlannerPreAggregation = true;
340+
/* if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
341341
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });
342342
343343
this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0;
344-
}
344+
} */
345345
this.queryLevelJoinHints = this.options.joinHints ?? [];
346346
this.prebuildJoin();
347347

@@ -830,6 +830,7 @@ export class BaseQuery {
830830
return this.newQueryWithoutNative().buildSqlAndParams(exportAnnotatedSql);
831831
}
832832
}
833+
console.log("!!!!! RRRRR");
833834

834835
return this.buildSqlAndParamsRust(exportAnnotatedSql);
835836
}
@@ -905,6 +906,8 @@ export class BaseQuery {
905906
if (preAggregation) {
906907
this.preAggregations.preAggregationForQuery = preAggregation;
907908
}
909+
console.log("!!! pags", preAggregation);
910+
console.log("!!! query", query);
908911
return [query, paramsArray];
909912
}
910913

packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1404,7 +1404,7 @@ export class PreAggregations {
14041404
if (tables.length === 1) {
14051405
return tables[0].tableName;
14061406
}
1407-
const union = tables.map(table => `SELECT ${table.columns.join(', ')} FROM ${table.tableName}`).join(' UNION ALL ');
1407+
const union = tables.map(table => `SELECT ${table.columns.join(', ')} FROM ${table.tableName} as ${table.tableName}_pre_agg`).join(' UNION ALL ');
14081408
return `(${union})`;
14091409
}
14101410

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,50 @@ describe('PreAggregations', () => {
278278
}
279279
})
280280
281+
cube('visitor_checkins2', {
282+
sql: \`
283+
select * from visitor_checkins
284+
\`,
285+
286+
sqlAlias: 'vc2',
287+
288+
measures: {
289+
count: {
290+
type: 'count'
291+
}
292+
},
293+
294+
dimensions: {
295+
id: {
296+
type: 'number',
297+
sql: 'id',
298+
primaryKey: true
299+
},
300+
visitor_id: {
301+
type: 'number',
302+
sql: 'visitor_id'
303+
},
304+
source: {
305+
type: 'string',
306+
sql: 'source'
307+
},
308+
created_at: {
309+
type: 'time',
310+
sql: 'created_at'
311+
}
312+
},
313+
preAggregations: {
314+
forLambdaS: {
315+
type: 'rollup',
316+
measureReferences: [count],
317+
dimensionReferences: [visitor_id],
318+
timeDimensionReference: created_at,
319+
partitionGranularity: 'day',
320+
granularity: 'day'
321+
},
322+
}
323+
})
324+
281325
282326
cube('visitor_checkins', {
283327
sql: \`
@@ -316,6 +360,10 @@ describe('PreAggregations', () => {
316360
main: {
317361
type: 'originalSql'
318362
},
363+
lambda: {
364+
type: 'rollupLambda',
365+
rollups: [visitor_checkins.forLambda, visitor_checkins2.forLambdaS],
366+
},
319367
forJoin: {
320368
type: 'rollup',
321369
measureReferences: [count],
@@ -327,6 +375,14 @@ describe('PreAggregations', () => {
327375
dimensionReferences: [visitors.source],
328376
rollupReferences: [visitor_checkins.forJoin, visitors.forJoin],
329377
},
378+
forLambda: {
379+
type: 'rollup',
380+
measureReferences: [count],
381+
dimensionReferences: [visitor_id],
382+
timeDimensionReference: created_at,
383+
partitionGranularity: 'day',
384+
granularity: 'day'
385+
},
330386
joinedPartitioned: {
331387
type: 'rollupJoin',
332388
measureReferences: [count],
@@ -2138,6 +2194,50 @@ describe('PreAggregations', () => {
21382194
});
21392195
});
21402196

2197+
it('rollup lambda', async () => {
2198+
await compiler.compile();
2199+
2200+
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
2201+
measures: [
2202+
'visitor_checkins.count',
2203+
],
2204+
dimensions: ['visitor_checkins.visitor_id'],
2205+
timeDimensions: [{
2206+
dimension: 'visitor_checkins.created_at',
2207+
granularity: 'day',
2208+
dateRange: ['2016-12-26', '2017-01-08']
2209+
}],
2210+
timezone: 'America/Los_Angeles',
2211+
preAggregationsSchema: '',
2212+
order: [{
2213+
id: 'visitor_checkins.visitor_id',
2214+
}],
2215+
});
2216+
2217+
const queryAndParams = query.buildSqlAndParams();
2218+
console.log(queryAndParams);
2219+
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
2220+
console.log(preAggregationsDescription);
2221+
2222+
console.log(query.preAggregations?.rollupMatchResultDescriptions());
2223+
2224+
const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription);
2225+
console.log("!!! ", preAggregationsDescription);
2226+
2227+
console.log(JSON.stringify(queries.concat(queryAndParams)));
2228+
2229+
return dbRunner.evaluateQueryWithPreAggregations(query).then(res => {
2230+
console.log(JSON.stringify(res));
2231+
expect(res).toEqual(
2232+
[
2233+
{ visitors__source: 'google', vc__count: '1' },
2234+
{ visitors__source: 'some', vc__count: '5' },
2235+
{ visitors__source: null, vc__count: null },
2236+
],
2237+
);
2238+
});
2239+
});
2240+
21412241
it('rollup join', async () => {
21422242
await compiler.compile();
21432243

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,24 @@ use std::rc::Rc;
44

55
#[derive(Clone)]
66
pub struct PreAggregationJoinItem {
7-
pub from: PreAggregationTable,
8-
pub to: PreAggregationTable,
7+
pub from: Rc<PreAggregationSource>,
8+
pub to: Rc<PreAggregationSource>,
99
pub from_members: Vec<Rc<MemberSymbol>>,
1010
pub to_members: Vec<Rc<MemberSymbol>>,
1111
pub on_sql: Rc<SqlCall>,
1212
}
1313

1414
#[derive(Clone)]
1515
pub struct PreAggregationJoin {
16-
pub root: PreAggregationTable,
16+
pub root: Rc<PreAggregationSource>,
1717
pub items: Vec<PreAggregationJoinItem>,
1818
}
1919

20+
#[derive(Clone)]
21+
pub struct PreAggregationUnion {
22+
pub items: Vec<Rc<PreAggregationTable>>,
23+
}
24+
2025
#[derive(Clone)]
2126
pub struct PreAggregationTable {
2227
pub cube_name: String,
@@ -26,8 +31,9 @@ pub struct PreAggregationTable {
2631

2732
#[derive(Clone)]
2833
pub enum PreAggregationSource {
29-
Table(PreAggregationTable),
34+
Single(PreAggregationTable),
3035
Join(PreAggregationJoin),
36+
Union(PreAggregationUnion),
3137
}
3238

3339
#[derive(Clone)]

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs

Lines changed: 111 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::cube_bridge::pre_aggregation_description::PreAggregationDescription;
66
use crate::logical_plan::PreAggregationJoin;
77
use crate::logical_plan::PreAggregationJoinItem;
88
use crate::logical_plan::PreAggregationTable;
9+
use crate::logical_plan::PreAggregationUnion;
910
use crate::planner::planners::JoinPlanner;
1011
use crate::planner::planners::ResolvedJoinItem;
1112
use crate::planner::query_tools::QueryTools;
@@ -103,6 +104,11 @@ impl PreAggregationsCompiler {
103104
};
104105

105106
let static_data = description.static_data();
107+
108+
if static_data.pre_aggregation_type == "rollupLambda" {
109+
return self.build_lambda(name, &description);
110+
}
111+
106112
let measures = if let Some(refs) = description.measure_references()? {
107113
Self::symbols_from_ref(
108114
self.query_tools.clone(),
@@ -151,7 +157,7 @@ impl PreAggregationsCompiler {
151157
let source = if static_data.pre_aggregation_type == "rollupJoin" {
152158
PreAggregationSource::Join(self.build_join_source(&measures, &dimensions, &rollups)?)
153159
} else {
154-
PreAggregationSource::Table(PreAggregationTable {
160+
PreAggregationSource::Single(PreAggregationTable {
155161
cube_name: name.cube_name.clone(),
156162
name: name.name.clone(),
157163
alias: static_data.sql_alias.clone(),
@@ -173,6 +179,108 @@ impl PreAggregationsCompiler {
173179
Ok(res)
174180
}
175181

182+
fn build_lambda(
183+
&mut self,
184+
name: &PreAggregationFullName,
185+
description: &Rc<dyn PreAggregationDescription>,
186+
) -> Result<Rc<CompiledPreAggregation>, CubeError> {
187+
let rollups = if let Some(refs) = description.rollup_references()? {
188+
let r = self
189+
.query_tools
190+
.cube_evaluator()
191+
.evaluate_rollup_references(name.cube_name.clone(), refs)?;
192+
r
193+
} else {
194+
Vec::new()
195+
};
196+
if rollups.len() == 0 {
197+
return Err(CubeError::user(format!(
198+
"rollupLambda '{}.{}' should reference at least one rollup",
199+
name.cube_name, name.name
200+
)));
201+
}
202+
203+
let pre_aggrs_for_lambda = rollups
204+
.iter()
205+
.map(|item| -> Result<_, CubeError> {
206+
self.compile_pre_aggregation(&PreAggregationFullName::from_string(item)?)
207+
})
208+
.collect::<Result<Vec<_>, _>>()?;
209+
210+
let mut sources = vec![];
211+
for (i, rollup) in pre_aggrs_for_lambda.clone().iter().enumerate() {
212+
match rollup.source.as_ref() {
213+
PreAggregationSource::Single(table) => {
214+
sources.push(Rc::new(table.clone()));
215+
}
216+
_ => {
217+
return Err(CubeError::user(format!("Rollup lambda can't be nested")));
218+
}
219+
}
220+
if i > 1 {
221+
Self::match_symbols(&rollup.measures, &pre_aggrs_for_lambda[0].measures)?;
222+
Self::match_symbols(&rollup.dimensions, &pre_aggrs_for_lambda[0].dimensions)?;
223+
Self::match_time_dimensions(
224+
&rollup.time_dimensions,
225+
&pre_aggrs_for_lambda[0].time_dimensions,
226+
)?;
227+
}
228+
}
229+
230+
let measures = pre_aggrs_for_lambda[0].measures.clone();
231+
let dimensions = pre_aggrs_for_lambda[0].dimensions.clone();
232+
let time_dimensions = pre_aggrs_for_lambda[0].time_dimensions.clone();
233+
let allow_non_strict_date_range_match = description
234+
.static_data()
235+
.allow_non_strict_date_range_match
236+
.unwrap_or(false);
237+
let granularity = pre_aggrs_for_lambda[0].granularity.clone();
238+
let source = PreAggregationSource::Union(PreAggregationUnion { items: sources });
239+
240+
let static_data = description.static_data();
241+
let res = Rc::new(CompiledPreAggregation {
242+
name: static_data.name.clone(),
243+
cube_name: name.cube_name.clone(),
244+
source: Rc::new(source),
245+
granularity,
246+
external: static_data.external,
247+
measures,
248+
dimensions,
249+
time_dimensions,
250+
allow_non_strict_date_range_match,
251+
});
252+
self.compiled_cache.insert(name.clone(), res.clone());
253+
Ok(res)
254+
}
255+
256+
fn match_symbols(
257+
a: &Vec<Rc<MemberSymbol>>,
258+
b: &Vec<Rc<MemberSymbol>>,
259+
) -> Result<(), CubeError> {
260+
if !a.iter().zip(b.iter()).all(|(a, b)| a.name() == b.name()) {
261+
return Err(CubeError::user(format!(
262+
"Names for pre-aggregation symbols in lambda pre-aggragation don't match"
263+
)));
264+
}
265+
Ok(())
266+
}
267+
268+
fn match_time_dimensions(
269+
a: &Vec<(Rc<MemberSymbol>, Option<String>)>,
270+
b: &Vec<(Rc<MemberSymbol>, Option<String>)>,
271+
) -> Result<(), CubeError> {
272+
if !a
273+
.iter()
274+
.zip(b.iter())
275+
.all(|(a, b)| a.0.name() == b.0.name() && a.1 == b.1)
276+
{
277+
return Err(CubeError::user(format!(
278+
"Names for pre-aggregation symbols in lambda pre-aggragation don't match"
279+
)));
280+
}
281+
Ok(())
282+
}
283+
176284
fn build_join_source(
177285
&mut self,
178286
measures: &Vec<Rc<MemberSymbol>>,
@@ -249,21 +357,9 @@ impl PreAggregationsCompiler {
249357
let to_pre_aggr =
250358
self.find_pre_aggregation_for_join(pre_aggrs_for_join, &join_item.to_members)?;
251359

252-
let from_table = match from_pre_aggr.source.as_ref() {
253-
PreAggregationSource::Table(t) => t.clone(),
254-
PreAggregationSource::Join(_) => {
255-
return Err(CubeError::user(format!("Rollup join can't be nested")));
256-
}
257-
};
258-
let to_table = match to_pre_aggr.source.as_ref() {
259-
PreAggregationSource::Table(t) => t.clone(),
260-
PreAggregationSource::Join(_) => {
261-
return Err(CubeError::user(format!("Rollup join can't be nested")));
262-
}
263-
};
264360
let res = PreAggregationJoinItem {
265-
from: from_table,
266-
to: to_table,
361+
from: from_pre_aggr.source.clone(),
362+
to: to_pre_aggr.source.clone(),
267363
from_members: join_item.from_members.clone(),
268364
to_members: join_item.to_members.clone(),
269365
on_sql: join_item.on_sql.clone(),

0 commit comments

Comments
 (0)