Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,12 @@ export class BaseQuery {
*/
this.customSubQueryJoins = this.options.subqueryJoins ?? [];
this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner');
this.canUseNativeSqlPlannerPreAggregation = false;
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
this.canUseNativeSqlPlannerPreAggregation = true;
/* if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });

this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0;
}
} */
this.queryLevelJoinHints = this.options.joinHints ?? [];
this.prebuildJoin();

Expand Down Expand Up @@ -830,6 +830,7 @@ export class BaseQuery {
return this.newQueryWithoutNative().buildSqlAndParams(exportAnnotatedSql);
}
}
console.log("!!!!! RRRRR");

return this.buildSqlAndParamsRust(exportAnnotatedSql);
}
Expand Down Expand Up @@ -905,6 +906,8 @@ export class BaseQuery {
if (preAggregation) {
this.preAggregations.preAggregationForQuery = preAggregation;
}
console.log("!!! pags", preAggregation);
console.log("!!! query", query);
return [query, paramsArray];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ export class PreAggregations {
if (tables.length === 1) {
return tables[0].tableName;
}
const union = tables.map(table => `SELECT ${table.columns.join(', ')} FROM ${table.tableName}`).join(' UNION ALL ');
const union = tables.map(table => `SELECT ${table.columns.join(', ')} FROM ${table.tableName} as ${table.tableName}_pre_agg`).join(' UNION ALL ');
return `(${union})`;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,50 @@ describe('PreAggregations', () => {
}
})

cube('visitor_checkins2', {
sql: \`
select * from visitor_checkins
\`,

sqlAlias: 'vc2',

measures: {
count: {
type: 'count'
}
},

dimensions: {
id: {
type: 'number',
sql: 'id',
primaryKey: true
},
visitor_id: {
type: 'number',
sql: 'visitor_id'
},
source: {
type: 'string',
sql: 'source'
},
created_at: {
type: 'time',
sql: 'created_at'
}
},
preAggregations: {
forLambdaS: {
type: 'rollup',
measureReferences: [count],
dimensionReferences: [visitor_id],
timeDimensionReference: created_at,
partitionGranularity: 'day',
granularity: 'day'
},
}
})


cube('visitor_checkins', {
sql: \`
Expand Down Expand Up @@ -316,6 +360,10 @@ describe('PreAggregations', () => {
main: {
type: 'originalSql'
},
lambda: {
type: 'rollupLambda',
rollups: [visitor_checkins.forLambda, visitor_checkins2.forLambdaS],
},
forJoin: {
type: 'rollup',
measureReferences: [count],
Expand All @@ -327,6 +375,14 @@ describe('PreAggregations', () => {
dimensionReferences: [visitors.source],
rollupReferences: [visitor_checkins.forJoin, visitors.forJoin],
},
forLambda: {
type: 'rollup',
measureReferences: [count],
dimensionReferences: [visitor_id],
timeDimensionReference: created_at,
partitionGranularity: 'day',
granularity: 'day'
},
joinedPartitioned: {
type: 'rollupJoin',
measureReferences: [count],
Expand Down Expand Up @@ -2138,6 +2194,50 @@ describe('PreAggregations', () => {
});
});

it('rollup lambda', async () => {
await compiler.compile();

const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
measures: [
'visitor_checkins.count',
],
dimensions: ['visitor_checkins.visitor_id'],
timeDimensions: [{
dimension: 'visitor_checkins.created_at',
granularity: 'day',
dateRange: ['2016-12-26', '2017-01-08']
}],
timezone: 'America/Los_Angeles',
preAggregationsSchema: '',
order: [{
id: 'visitor_checkins.visitor_id',
}],
});

const queryAndParams = query.buildSqlAndParams();
console.log(queryAndParams);
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
console.log(preAggregationsDescription);

console.log(query.preAggregations?.rollupMatchResultDescriptions());

const queries = dbRunner.tempTablePreAggregations(preAggregationsDescription);
console.log("!!! ", preAggregationsDescription);

console.log(JSON.stringify(queries.concat(queryAndParams)));

return dbRunner.evaluateQueryWithPreAggregations(query).then(res => {
console.log(JSON.stringify(res));
expect(res).toEqual(
[
{ visitors__source: 'google', vc__count: '1' },
{ visitors__source: 'some', vc__count: '5' },
{ visitors__source: null, vc__count: null },
],
);
});
});

it('rollup join', async () => {
await compiler.compile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ use std::rc::Rc;

#[derive(Clone)]
pub struct PreAggregationJoinItem {
pub from: PreAggregationTable,
pub to: PreAggregationTable,
pub from: Rc<PreAggregationSource>,
pub to: Rc<PreAggregationSource>,
pub from_members: Vec<Rc<MemberSymbol>>,
pub to_members: Vec<Rc<MemberSymbol>>,
pub on_sql: Rc<SqlCall>,
}

#[derive(Clone)]
pub struct PreAggregationJoin {
pub root: PreAggregationTable,
pub root: Rc<PreAggregationSource>,
pub items: Vec<PreAggregationJoinItem>,
}

#[derive(Clone)]
pub struct PreAggregationUnion {
pub items: Vec<Rc<PreAggregationTable>>,
}

#[derive(Clone)]
pub struct PreAggregationTable {
pub cube_name: String,
Expand All @@ -26,8 +31,9 @@ pub struct PreAggregationTable {

#[derive(Clone)]
pub enum PreAggregationSource {
Table(PreAggregationTable),
Single(PreAggregationTable),
Join(PreAggregationJoin),
Union(PreAggregationUnion),
}

#[derive(Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::cube_bridge::pre_aggregation_description::PreAggregationDescription;
use crate::logical_plan::PreAggregationJoin;
use crate::logical_plan::PreAggregationJoinItem;
use crate::logical_plan::PreAggregationTable;
use crate::logical_plan::PreAggregationUnion;
use crate::planner::planners::JoinPlanner;
use crate::planner::planners::ResolvedJoinItem;
use crate::planner::query_tools::QueryTools;
Expand Down Expand Up @@ -103,6 +104,11 @@ impl PreAggregationsCompiler {
};

let static_data = description.static_data();

if static_data.pre_aggregation_type == "rollupLambda" {
return self.build_lambda(name, &description);
}

let measures = if let Some(refs) = description.measure_references()? {
Self::symbols_from_ref(
self.query_tools.clone(),
Expand Down Expand Up @@ -151,7 +157,7 @@ impl PreAggregationsCompiler {
let source = if static_data.pre_aggregation_type == "rollupJoin" {
PreAggregationSource::Join(self.build_join_source(&measures, &dimensions, &rollups)?)
} else {
PreAggregationSource::Table(PreAggregationTable {
PreAggregationSource::Single(PreAggregationTable {
cube_name: name.cube_name.clone(),
name: name.name.clone(),
alias: static_data.sql_alias.clone(),
Expand All @@ -173,6 +179,108 @@ impl PreAggregationsCompiler {
Ok(res)
}

fn build_lambda(
&mut self,
name: &PreAggregationFullName,
description: &Rc<dyn PreAggregationDescription>,
) -> Result<Rc<CompiledPreAggregation>, CubeError> {
let rollups = if let Some(refs) = description.rollup_references()? {
let r = self
.query_tools
.cube_evaluator()
.evaluate_rollup_references(name.cube_name.clone(), refs)?;
r
} else {
Vec::new()
};
if rollups.len() == 0 {
return Err(CubeError::user(format!(
"rollupLambda '{}.{}' should reference at least one rollup",
name.cube_name, name.name
)));
}

let pre_aggrs_for_lambda = rollups
.iter()
.map(|item| -> Result<_, CubeError> {
self.compile_pre_aggregation(&PreAggregationFullName::from_string(item)?)
})
.collect::<Result<Vec<_>, _>>()?;

let mut sources = vec![];
for (i, rollup) in pre_aggrs_for_lambda.clone().iter().enumerate() {
match rollup.source.as_ref() {
PreAggregationSource::Single(table) => {
sources.push(Rc::new(table.clone()));
}
_ => {
return Err(CubeError::user(format!("Rollup lambda can't be nested")));
}
}
if i > 1 {
Self::match_symbols(&rollup.measures, &pre_aggrs_for_lambda[0].measures)?;
Self::match_symbols(&rollup.dimensions, &pre_aggrs_for_lambda[0].dimensions)?;
Self::match_time_dimensions(
&rollup.time_dimensions,
&pre_aggrs_for_lambda[0].time_dimensions,
)?;
}
}

let measures = pre_aggrs_for_lambda[0].measures.clone();
let dimensions = pre_aggrs_for_lambda[0].dimensions.clone();
let time_dimensions = pre_aggrs_for_lambda[0].time_dimensions.clone();
let allow_non_strict_date_range_match = description
.static_data()
.allow_non_strict_date_range_match
.unwrap_or(false);
let granularity = pre_aggrs_for_lambda[0].granularity.clone();
let source = PreAggregationSource::Union(PreAggregationUnion { items: sources });

let static_data = description.static_data();
let res = Rc::new(CompiledPreAggregation {
name: static_data.name.clone(),
cube_name: name.cube_name.clone(),
source: Rc::new(source),
granularity,
external: static_data.external,
measures,
dimensions,
time_dimensions,
allow_non_strict_date_range_match,
});
self.compiled_cache.insert(name.clone(), res.clone());
Ok(res)
}

fn match_symbols(
a: &Vec<Rc<MemberSymbol>>,
b: &Vec<Rc<MemberSymbol>>,
) -> Result<(), CubeError> {
if !a.iter().zip(b.iter()).all(|(a, b)| a.name() == b.name()) {
return Err(CubeError::user(format!(
"Names for pre-aggregation symbols in lambda pre-aggragation don't match"
)));
}
Ok(())
}

fn match_time_dimensions(
a: &Vec<(Rc<MemberSymbol>, Option<String>)>,
b: &Vec<(Rc<MemberSymbol>, Option<String>)>,
) -> Result<(), CubeError> {
if !a
.iter()
.zip(b.iter())
.all(|(a, b)| a.0.name() == b.0.name() && a.1 == b.1)
{
return Err(CubeError::user(format!(
"Names for pre-aggregation symbols in lambda pre-aggragation don't match"
)));
}
Ok(())
}

fn build_join_source(
&mut self,
measures: &Vec<Rc<MemberSymbol>>,
Expand Down Expand Up @@ -249,21 +357,9 @@ impl PreAggregationsCompiler {
let to_pre_aggr =
self.find_pre_aggregation_for_join(pre_aggrs_for_join, &join_item.to_members)?;

let from_table = match from_pre_aggr.source.as_ref() {
PreAggregationSource::Table(t) => t.clone(),
PreAggregationSource::Join(_) => {
return Err(CubeError::user(format!("Rollup join can't be nested")));
}
};
let to_table = match to_pre_aggr.source.as_ref() {
PreAggregationSource::Table(t) => t.clone(),
PreAggregationSource::Join(_) => {
return Err(CubeError::user(format!("Rollup join can't be nested")));
}
};
let res = PreAggregationJoinItem {
from: from_table,
to: to_table,
from: from_pre_aggr.source.clone(),
to: to_pre_aggr.source.clone(),
from_members: join_item.from_members.clone(),
to_members: join_item.to_members.clone(),
on_sql: join_item.on_sql.clone(),
Expand Down
Loading
Loading