Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ const variables: Record<string, (...args: any) => any> = {
.default('1')
.asInt(),
nativeSqlPlanner: () => get('CUBEJS_TESSERACT_SQL_PLANNER').default('false').asBool(),
nativeSqlPlannerPreAggregations: () => get('CUBEJS_TESSERACT_PRE_AGGREGATIONS').default('false').asBool(),
nativeOrchestrator: () => get('CUBEJS_TESSERACT_ORCHESTRATOR')
.default('true')
.asBoolStrict(),
Expand Down
5 changes: 3 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,9 @@ export class BaseQuery {
*/
this.customSubQueryJoins = this.options.subqueryJoins ?? [];
this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner');
this.canUseNativeSqlPlannerPreAggregation = false;
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
this.canUseNativeSqlPlannerPreAggregation = getEnv('nativeSqlPlannerPreAggregations');
this.canUseNativeSqlPlannerPreAggregation = getEnv('nativeSqlPlannerPreAggregations');
if (this.useNativeSqlPlanner && !this.canUseNativeSqlPlannerPreAggregation && !this.neverUseSqlPlannerPreaggregation()) {
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });

this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,50 @@ describe('PreAggregations', () => {
}
})

// Second checkins cube reading the same underlying table as visitor_checkins.
// Its forLambdaS rollup serves as one half of the rollupLambda pre-aggregation
// declared on visitor_checkins (rollups: [forLambda, forLambdaS]).
cube('visitor_checkins2', {
  sql: \`
    select * from visitor_checkins
  \`,

  // Short alias used in generated table names / result columns.
  sqlAlias: 'vc2',

  measures: {
    count: {
      type: 'count'
    }
  },

  dimensions: {
    id: {
      type: 'number',
      sql: 'id',
      primaryKey: true
    },
    visitor_id: {
      type: 'number',
      sql: 'visitor_id'
    },
    source: {
      type: 'string',
      sql: 'source'
    },
    created_at: {
      type: 'time',
      sql: 'created_at'
    }
  },
  preAggregations: {
    // Day-partitioned rollup; shape (measures/dimensions/time dimension and
    // granularity) must match visitor_checkins.forLambda for the lambda union.
    forLambdaS: {
      type: 'rollup',
      measureReferences: [count],
      dimensionReferences: [visitor_id],
      timeDimensionReference: created_at,
      partitionGranularity: 'day',
      granularity: 'day'
    },
  }
})


cube('visitor_checkins', {
sql: \`
Expand Down Expand Up @@ -316,6 +360,10 @@ describe('PreAggregations', () => {
main: {
type: 'originalSql'
},
lambda: {
type: 'rollupLambda',
rollups: [visitor_checkins.forLambda, visitor_checkins2.forLambdaS],
},
forJoin: {
type: 'rollup',
measureReferences: [count],
Expand All @@ -327,6 +375,14 @@ describe('PreAggregations', () => {
dimensionReferences: [visitors.source],
rollupReferences: [visitor_checkins.forJoin, visitors.forJoin],
},
forLambda: {
type: 'rollup',
measureReferences: [count],
dimensionReferences: [visitor_id],
timeDimensionReference: created_at,
partitionGranularity: 'day',
granularity: 'day'
},
joinedPartitioned: {
type: 'rollupJoin',
measureReferences: [count],
Expand Down Expand Up @@ -1922,7 +1978,7 @@ describe('PreAggregations', () => {
}, {
id: 'visitors.source'
}],
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
cubestoreSupportMultistage: getEnv('nativeSqlPlanner')
});

const queryAndParams = query.buildSqlAndParams();
Expand Down Expand Up @@ -2000,7 +2056,7 @@ describe('PreAggregations', () => {
}, {
id: 'visitors.source'
}],
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
cubestoreSupportMultistage: getEnv('nativeSqlPlanner')
});

const queryAndParams = query.buildSqlAndParams();
Expand Down Expand Up @@ -2138,6 +2194,78 @@ describe('PreAggregations', () => {
});
});

// rollupLambda is only exercised when both the native (Tesseract) SQL planner
// and its pre-aggregation support are enabled; otherwise BaseQuery generates
// wrong SQL for non-external pre-aggregations (see the skipped case below).
if (getEnv('nativeSqlPlanner') && getEnv('nativeSqlPlannerPreAggregations')) {
  it('rollup lambda', async () => {
    await compiler.compile();

    // Query shaped to match the lambda rollup: count by visitor_id by day.
    const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
      measures: [
        'visitor_checkins.count',
      ],
      dimensions: ['visitor_checkins.visitor_id'],
      timeDimensions: [{
        dimension: 'visitor_checkins.created_at',
        granularity: 'day',
        dateRange: ['2016-12-26', '2017-01-08']
      }],
      timezone: 'America/Los_Angeles',
      preAggregationsSchema: '',
      order: [{
        id: 'visitor_checkins.visitor_id',
      }],
    });

    const queryAndParams = query.buildSqlAndParams();
    console.log(queryAndParams);
    const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
    console.log(preAggregationsDescription);

    console.log(query.preAggregations?.rollupMatchResultDescriptions());

    // Materialize the matched pre-aggregations as temp tables for the run.
    const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription);

    console.log(JSON.stringify(queries.concat(queryAndParams)));

    // Expected daily counts per visitor from the visitor_checkins fixture data,
    // doubled because the lambda unions two rollups over the same source table.
    return dbRunner.evaluateQueryWithPreAggregations(query).then(res => {
      console.log(JSON.stringify(res));
      expect(res).toEqual(
        [
          {
            vc__visitor_id: 1,
            vc__created_at_day: '2017-01-02T00:00:00.000Z',
            vc__count: '2'
          },
          {
            vc__visitor_id: 1,
            vc__created_at_day: '2017-01-03T00:00:00.000Z',
            vc__count: '2'
          },
          {
            vc__visitor_id: 1,
            vc__created_at_day: '2017-01-04T00:00:00.000Z',
            vc__count: '2'
          },
          {
            vc__visitor_id: 2,
            vc__created_at_day: '2017-01-04T00:00:00.000Z',
            vc__count: '4'
          },
          {
            vc__visitor_id: 3,
            vc__created_at_day: '2017-01-05T00:00:00.000Z',
            vc__count: '2'
          }
        ]
      );
    });
  });
} else {
  it.skip('rollup lambda: baseQuery generate wrong sql for not external pre-aggregations', async () => {
    // This should be fixed in Tesseract.

  });
}

it('rollup join', async () => {
await compiler.compile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ use std::rc::Rc;

#[derive(Clone)]
pub struct PreAggregationJoinItem {
pub from: PreAggregationTable,
pub to: PreAggregationTable,
pub from: Rc<PreAggregationSource>,
pub to: Rc<PreAggregationSource>,
pub from_members: Vec<Rc<MemberSymbol>>,
pub to_members: Vec<Rc<MemberSymbol>>,
pub on_sql: Rc<SqlCall>,
}

#[derive(Clone)]
pub struct PreAggregationJoin {
pub root: PreAggregationTable,
pub root: Rc<PreAggregationSource>,
pub items: Vec<PreAggregationJoinItem>,
}

/// A union of plain pre-aggregation tables; produced for `rollupLambda`
/// pre-aggregations, whose referenced rollups are read as one UNIONed source.
#[derive(Clone)]
pub struct PreAggregationUnion {
    // Member tables of the union, in rollup-reference order.
    pub items: Vec<Rc<PreAggregationTable>>,
}

#[derive(Clone)]
pub struct PreAggregationTable {
pub cube_name: String,
Expand All @@ -26,8 +31,9 @@ pub struct PreAggregationTable {

/// Where a compiled pre-aggregation reads its data from.
///
/// NOTE: the pasted diff retained the removed `Table(...)` variant next to its
/// rename `Single(...)`; the dead `Table` variant is dropped — the compiler
/// code visible in this change set constructs and matches only `Single`,
/// `Join`, and `Union`.
#[derive(Clone)]
pub enum PreAggregationSource {
    /// A single physical rollup table.
    Single(PreAggregationTable),
    /// A `rollupJoin`: several sources joined together.
    Join(PreAggregationJoin),
    /// A `rollupLambda`: several tables UNIONed together.
    Union(PreAggregationUnion),
}

#[derive(Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::cube_bridge::pre_aggregation_description::PreAggregationDescription;
use crate::logical_plan::PreAggregationJoin;
use crate::logical_plan::PreAggregationJoinItem;
use crate::logical_plan::PreAggregationTable;
use crate::logical_plan::PreAggregationUnion;
use crate::planner::planners::JoinPlanner;
use crate::planner::planners::ResolvedJoinItem;
use crate::planner::query_tools::QueryTools;
Expand Down Expand Up @@ -103,6 +104,11 @@ impl PreAggregationsCompiler {
};

let static_data = description.static_data();

if static_data.pre_aggregation_type == "rollupLambda" {
return self.build_lambda(name, &description);
}

let measures = if let Some(refs) = description.measure_references()? {
Self::symbols_from_ref(
self.query_tools.clone(),
Expand Down Expand Up @@ -151,7 +157,7 @@ impl PreAggregationsCompiler {
let source = if static_data.pre_aggregation_type == "rollupJoin" {
PreAggregationSource::Join(self.build_join_source(&measures, &dimensions, &rollups)?)
} else {
PreAggregationSource::Table(PreAggregationTable {
PreAggregationSource::Single(PreAggregationTable {
cube_name: name.cube_name.clone(),
name: name.name.clone(),
alias: static_data.sql_alias.clone(),
Expand All @@ -173,6 +179,108 @@ impl PreAggregationsCompiler {
Ok(res)
}

/// Compiles a `rollupLambda` pre-aggregation.
///
/// Resolves the referenced rollups, compiles each one, checks that they all
/// share the first rollup's shape (measures, dimensions, time dimensions),
/// and produces a `PreAggregationSource::Union` over their tables. The
/// result inherits measures/dimensions/granularity from the first rollup and
/// is inserted into `compiled_cache` under `name`.
///
/// Errors: no rollup references, a referenced rollup that is itself a
/// join/union (nesting is not supported), or a shape mismatch.
fn build_lambda(
    &mut self,
    name: &PreAggregationFullName,
    description: &Rc<dyn PreAggregationDescription>,
) -> Result<Rc<CompiledPreAggregation>, CubeError> {
    // Resolve the rollup references declared on the lambda.
    let rollups = if let Some(refs) = description.rollup_references()? {
        self.query_tools
            .cube_evaluator()
            .evaluate_rollup_references(name.cube_name.clone(), refs)?
    } else {
        Vec::new()
    };
    if rollups.is_empty() {
        return Err(CubeError::user(format!(
            "rollupLambda '{}.{}' should reference at least one rollup",
            name.cube_name, name.name
        )));
    }

    // Compile every referenced rollup; these become the union members.
    let pre_aggrs_for_lambda = rollups
        .iter()
        .map(|item| -> Result<_, CubeError> {
            self.compile_pre_aggregation(&PreAggregationFullName::from_string(item)?)
        })
        .collect::<Result<Vec<_>, _>>()?;

    let mut sources = vec![];
    // Iterate by reference — no need to clone the whole Vec as before.
    for (i, rollup) in pre_aggrs_for_lambda.iter().enumerate() {
        match rollup.source.as_ref() {
            PreAggregationSource::Single(table) => {
                sources.push(Rc::new(table.clone()));
            }
            _ => {
                return Err(CubeError::user(
                    "Rollup lambda can't be nested".to_string(),
                ));
            }
        }
        // Every rollup after the first must match the first one's shape.
        // Fixed off-by-one: the original `i > 1` never validated the second
        // rollup (index 1) against the first.
        if i > 0 {
            Self::match_symbols(&rollup.measures, &pre_aggrs_for_lambda[0].measures)?;
            Self::match_symbols(&rollup.dimensions, &pre_aggrs_for_lambda[0].dimensions)?;
            Self::match_time_dimensions(
                &rollup.time_dimensions,
                &pre_aggrs_for_lambda[0].time_dimensions,
            )?;
        }
    }

    // The lambda inherits its query shape from the first rollup.
    let measures = pre_aggrs_for_lambda[0].measures.clone();
    let dimensions = pre_aggrs_for_lambda[0].dimensions.clone();
    let time_dimensions = pre_aggrs_for_lambda[0].time_dimensions.clone();
    let allow_non_strict_date_range_match = description
        .static_data()
        .allow_non_strict_date_range_match
        .unwrap_or(false);
    let granularity = pre_aggrs_for_lambda[0].granularity.clone();
    let source = PreAggregationSource::Union(PreAggregationUnion { items: sources });

    let static_data = description.static_data();
    let res = Rc::new(CompiledPreAggregation {
        name: static_data.name.clone(),
        cube_name: name.cube_name.clone(),
        source: Rc::new(source),
        granularity,
        external: static_data.external,
        measures,
        dimensions,
        time_dimensions,
        allow_non_strict_date_range_match,
    });
    // Cache so repeated references resolve to the same compiled instance.
    self.compiled_cache.insert(name.clone(), res.clone());
    Ok(res)
}

/// Checks that two member lists match element-wise by symbol name.
///
/// Fixed: the original only zipped the two lists, so a length mismatch
/// (extra members on one side) was silently accepted; lengths are now
/// compared as well. Also fixes the "pre-aggragation" typo in the
/// user-facing error and drops the argument-less `format!`.
fn match_symbols(
    a: &[Rc<MemberSymbol>],
    b: &[Rc<MemberSymbol>],
) -> Result<(), CubeError> {
    let matches =
        a.len() == b.len() && a.iter().zip(b.iter()).all(|(a, b)| a.name() == b.name());
    if !matches {
        return Err(CubeError::user(
            "Names for pre-aggregation symbols in lambda pre-aggregation don't match"
                .to_string(),
        ));
    }
    Ok(())
}

/// Checks that two (time dimension, granularity) lists match element-wise:
/// same symbol name and same granularity at each position.
///
/// Fixed: the original only zipped the two lists, so a length mismatch was
/// silently accepted; lengths are now compared as well. Also fixes the
/// "pre-aggragation" typo in the user-facing error and drops the
/// argument-less `format!`.
fn match_time_dimensions(
    a: &[(Rc<MemberSymbol>, Option<String>)],
    b: &[(Rc<MemberSymbol>, Option<String>)],
) -> Result<(), CubeError> {
    let matches = a.len() == b.len()
        && a.iter()
            .zip(b.iter())
            .all(|(a, b)| a.0.name() == b.0.name() && a.1 == b.1);
    if !matches {
        return Err(CubeError::user(
            "Names for pre-aggregation symbols in lambda pre-aggregation don't match"
                .to_string(),
        ));
    }
    Ok(())
}

fn build_join_source(
&mut self,
measures: &Vec<Rc<MemberSymbol>>,
Expand Down Expand Up @@ -249,21 +357,9 @@ impl PreAggregationsCompiler {
let to_pre_aggr =
self.find_pre_aggregation_for_join(pre_aggrs_for_join, &join_item.to_members)?;

let from_table = match from_pre_aggr.source.as_ref() {
PreAggregationSource::Table(t) => t.clone(),
PreAggregationSource::Join(_) => {
return Err(CubeError::user(format!("Rollup join can't be nested")));
}
};
let to_table = match to_pre_aggr.source.as_ref() {
PreAggregationSource::Table(t) => t.clone(),
PreAggregationSource::Join(_) => {
return Err(CubeError::user(format!("Rollup join can't be nested")));
}
};
let res = PreAggregationJoinItem {
from: from_table,
to: to_table,
from: from_pre_aggr.source.clone(),
to: to_pre_aggr.source.clone(),
from_members: join_item.from_members.clone(),
to_members: join_item.to_members.clone(),
on_sql: join_item.on_sql.clone(),
Expand Down
Loading
Loading