Skip to content

Commit 5101e78

Browse files
committed
in work
1 parent ba827d0 commit 5101e78

File tree

30 files changed

+970
-45
lines changed

30 files changed

+970
-45
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,9 @@ export class BaseQuery {
715715
const res = buildResult.result;
716716
// FIXME
717717
res[1] = [...res[1]];
718+
if (res[2]) {
719+
this.preAggregations.preAggregationForQuery = res[2];
720+
}
718721
return res;
719722
}
720723

@@ -738,6 +741,10 @@ export class BaseQuery {
738741
return timeSeriesFromCustomInterval(granularityInterval, dateRange, moment(origin), { timestampPrecision: 3 });
739742
}
740743

744+
getPreAggregationByName(cube, preAggregationName) {
745+
return this.preAggregations.getRollupPreAggregationByName(cube, preAggregationName);
746+
}
747+
741748
get shouldReuseParams() {
742749
return false;
743750
}

packages/cubejs-schema-compiler/src/adapter/PreAggregations.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,29 @@ export class PreAggregations {
819819
)(preAggregations);
820820
}
821821

822+
getRollupPreAggregationByName(cube, preAggregationName) {
823+
const canUsePreAggregation = () => true;
824+
const preAggregation = R.pipe(
825+
R.toPairs,
826+
// eslint-disable-next-line no-unused-vars
827+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
828+
R.filter(([k, a]) => a.type === 'rollup' || a.type === 'rollupJoin' || a.type === 'rollupLambda'),
829+
// eslint-disable-next-line no-unused-vars
830+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
831+
R.find(([k, a]) => k === preAggregationName)
832+
)(this.query.cubeEvaluator.preAggregationsForCube(cube));
833+
if (preAggregation) {
834+
const tableName = this.preAggregationTableName(cube, preAggregation[0], preAggregation[1]);
835+
const preAggObj = preAggregation ? this.evaluatedPreAggregationObj(cube, preAggregation[0], preAggregation[1], canUsePreAggregation) : {};
836+
return {
837+
tableName,
838+
...preAggObj
839+
};
840+
} else {
841+
return {};
842+
}
843+
}
844+
822845
// TODO check multiplication factor didn't change
823846
buildRollupJoin(preAggObj, preAggObjsToJoin) {
824847
return this.query.cacheValue(

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ describe('PreAggregations', () => {
231231
granularity: 'hour',
232232
partitionGranularity: 'month'
233233
},
234-
/* countCustomGranularity: {
234+
countCustomGranularity: {
235235
measures: [count],
236236
timeDimension: createdAt,
237237
granularity: 'hourTenMinOffset',
@@ -517,7 +517,6 @@ describe('PreAggregations', () => {
517517
console.log(queryAndParams);
518518
console.log("!!!! pre aggrs", query.preAggregations?.preAggregationForQuery);
519519
console.log("!!!! pre aggrs fun", query.preAggregations?.preAggregationForQuery.preAggregation);
520-
console.log("!!!! pre aggrs fun 2", query.preAggregations?.preAggregationForQuery.preAggregation.dimensionReferences.toString());
521520
expect(query.preAggregations?.preAggregationForQuery?.canUsePreAggregation).toEqual(true);
522521

523522
return dbRunner.evaluateQueryWithPreAggregations(query).then(res => {
@@ -887,7 +886,7 @@ describe('PreAggregations', () => {
887886
});
888887
}));
889888

890-
it('multiplied measure match', () => compiler.compile().then(() => {
889+
it('multiplied measure match 1', () => compiler.compile().then(() => {
891890
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
892891
measures: [
893892
'visitors.count'

packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,6 @@ export class BaseDbRunner {
171171

172172
public async evaluateQueryWithPreAggregations(query, seed = this.nextSeed++) {
173173
const preAggregationsDescription = query.preAggregations?.preAggregationsDescription();
174-
// console.log(preAggregationsDescription);
175-
176174
await Promise.all(preAggregationsDescription.map(
177175
async desc => {
178176
if (desc.partitionGranularity) {

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use super::base_query_options::FilterItem;
22
use super::filter_group::{FilterGroup, NativeFilterGroup};
33
use super::filter_params::{FilterParams, NativeFilterParams};
4+
use cubenativeutils::wrappers::NativeArray;
45
use super::security_context::{NativeSecurityContext, SecurityContext};
56
use super::sql_templates_render::{NativeSqlTemplatesRender, SqlTemplatesRender};
7+
use super::pre_aggregation_obj::{NativePreAggregationObj, PreAggregationObj};
68
use super::sql_utils::{NativeSqlUtils, SqlUtils};
79
use cubenativeutils::wrappers::serializer::{
810
NativeDeserialize, NativeDeserializer, NativeSerialize,
@@ -58,4 +60,6 @@ pub trait BaseTools {
5860
source: String,
5961
origin: String,
6062
) -> Result<String, CubeError>;
63+
64+
fn get_pre_aggregation_by_name(&self, cube_name: String, name: String) -> Result<Rc<dyn PreAggregationObj>, CubeError>;
6165
}

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ pub mod segment_definition;
2727
pub mod sql_templates_render;
2828
pub mod sql_utils;
2929
pub mod struct_with_sql_member;
30+
pub mod pre_aggregation_obj;
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use super::member_sql::{MemberSql, NativeMemberSql};
2+
use cubenativeutils::wrappers::serializer::{
3+
NativeDeserialize, NativeDeserializer, NativeSerialize,
4+
};
5+
use cubenativeutils::wrappers::NativeArray;
6+
use cubenativeutils::wrappers::NativeContextHolder;
7+
use cubenativeutils::wrappers::NativeObjectHandle;
8+
use cubenativeutils::CubeError;
9+
use serde::{Deserialize, Serialize};
10+
use std::any::Any;
11+
use std::rc::Rc;
12+
13+
#[derive(Serialize, Deserialize, Debug)]
14+
pub struct PreAggregationObjStatic {
15+
#[serde(rename = "tableName")]
16+
pub table_name: Option<String>,
17+
#[serde(rename = "preAggregationName")]
18+
pub pre_aggregation_name: Option<String>,
19+
pub cube: Option<String>,
20+
#[serde(rename = "preAggregationId")]
21+
pub pre_aggregation_id: Option<String>,
22+
}
23+
24+
#[nativebridge::native_bridge(PreAggregationObjStatic)]
25+
pub trait PreAggregationObj {
26+
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate_query.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::*;
2-
use crate::planner::query_properties::OrderByItem;
32
use std::rc::Rc;
3+
use crate::planner::query_properties::OrderByItem;
44

55
pub struct FullKeyAggregateQuery {
66
pub multistage_members: Vec<Rc<LogicalMultiStageMember>>,
@@ -14,7 +14,7 @@ pub struct FullKeyAggregateQuery {
1414
}
1515

1616
impl PrettyPrint for FullKeyAggregateQuery {
17-
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
17+
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
1818
result.println("FullKeyAggregateQuery: ", state);
1919
let state = state.new_level();
2020
let details_state = state.new_level();
@@ -39,17 +39,10 @@ impl PrettyPrint for FullKeyAggregateQuery {
3939
if !self.order_by.is_empty() {
4040
result.println("order_by:", &state);
4141
for order_by in self.order_by.iter() {
42-
result.println(
43-
&format!(
44-
"{} {}",
45-
order_by.name(),
46-
if order_by.desc() { "desc" } else { "asc" }
47-
),
48-
&details_state,
49-
);
42+
result.println(&format!("{} {}", order_by.name(), if order_by.desc() { "desc" } else { "asc" }), &details_state);
5043
}
5144
}
5245
result.println("source:", &state);
5346
self.source.pretty_print(result, &details_state);
5447
}
55-
}
48+
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ mod join;
88
mod keys_subquery;
99
mod measure_subquery;
1010
mod multistage;
11-
mod pretty_print;
11+
pub mod pretty_print;
1212
mod query;
1313
mod regular_measures_query;
1414
mod resolve_multiplied_measures;
1515
mod schema;
1616
mod simple_query;
17+
mod pre_aggregation;
18+
pub mod optimizers;
1719

1820
pub use aggregate_multiplied_subquery::*;
1921
pub use cube::*;
@@ -31,3 +33,5 @@ pub use regular_measures_query::*;
3133
pub use resolve_multiplied_measures::*;
3234
pub use schema::*;
3335
pub use simple_query::*;
36+
pub use optimizers::*;
37+
pub use pre_aggregation::*;
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
use crate::logical_plan::*;
2+
use cubenativeutils::CubeError;
3+
use itertools::Itertools;
4+
use std::collections::HashSet;
5+
use std::rc::Rc;
6+
7+
pub struct CubeNamesCollector {
8+
cube_names: HashSet<String>,
9+
}
10+
11+
impl CubeNamesCollector {
12+
pub fn new() -> Self {
13+
Self {
14+
cube_names: HashSet::new(),
15+
}
16+
}
17+
18+
pub fn collect(&mut self, query: &Query) -> Result<(), CubeError> {
19+
match query {
20+
Query::SimpleQuery(query) => self.collect_from_simple_query(query),
21+
Query::FullKeyAggregateQuery(query) => {
22+
self.collect_from_full_key_aggregate_query(query)
23+
}
24+
}
25+
}
26+
27+
pub fn result(self) -> Vec<String> {
28+
self.cube_names.into_iter().collect_vec()
29+
}
30+
31+
fn collect_from_simple_query(&mut self, query: &SimpleQuery) -> Result<(), CubeError> {
32+
self.collect_from_simple_query_source(&query.source)?;
33+
self.collect_from_dimension_subqueries(&query.dimension_subqueries)?;
34+
Ok(())
35+
}
36+
37+
fn collect_from_full_key_aggregate_query(
38+
&mut self,
39+
query: &FullKeyAggregateQuery,
40+
) -> Result<(), CubeError> {
41+
self.collect_from_full_key_aggregate(&query.source)?;
42+
Ok(())
43+
}
44+
45+
fn collect_from_measure_subquery(
46+
&mut self,
47+
subquery: &Rc<MeasureSubquery>,
48+
) -> Result<(), CubeError> {
49+
self.collect_from_logical_join(&subquery.source)?;
50+
self.collect_from_dimension_subqueries(&subquery.dimension_subqueries)?;
51+
Ok(())
52+
}
53+
54+
fn collect_from_full_key_aggregate(
55+
&mut self,
56+
full_key_aggregate: &Rc<FullKeyAggregate>,
57+
) -> Result<(), CubeError> {
58+
for source in full_key_aggregate.sources.iter() {
59+
self.collect_from_full_key_aggregate_source(source)?;
60+
}
61+
Ok(())
62+
}
63+
fn collect_from_full_key_aggregate_source(
64+
&mut self,
65+
source: &FullKeyAggregateSource,
66+
) -> Result<(), CubeError> {
67+
match source {
68+
FullKeyAggregateSource::ResolveMultipliedMeasures(resolve_multiplied_measures) => {
69+
self.collect_from_multiplied_measures_resolver(resolve_multiplied_measures)
70+
}
71+
FullKeyAggregateSource::MultiStageSubqueryRef(multi_stage_subquery_ref) => Ok(()),
72+
}
73+
}
74+
75+
fn collect_from_multiplied_measures_resolver(
76+
&mut self,
77+
resolver: &ResolveMultipliedMeasures,
78+
) -> Result<(), CubeError> {
79+
for regular_subquery in resolver.regular_measure_subqueries.iter() {
80+
self.collect_from_simple_query(&regular_subquery)?;
81+
}
82+
for aggregate_multiplied_subquery in resolver.aggregate_multiplied_subqueries.iter() {
83+
self.collect_from_aggregate_multiplied_subquery(&aggregate_multiplied_subquery)?;
84+
}
85+
Ok(())
86+
}
87+
88+
fn collect_from_aggregate_multiplied_subquery(
89+
&mut self,
90+
subquery: &Rc<AggregateMultipliedSubquery>,
91+
) -> Result<(), CubeError> {
92+
self.collect_from_logical_join(&subquery.keys_subquery.source)?;
93+
match subquery.source.as_ref() {
94+
AggregateMultipliedSubquerySouce::Cube => {
95+
self.cube_names.insert(subquery.pk_cube.name().clone());
96+
}
97+
AggregateMultipliedSubquerySouce::MeasureSubquery(measure_subquery) => {
98+
self.collect_from_measure_subquery(&measure_subquery)?;
99+
}
100+
}
101+
Ok(())
102+
}
103+
104+
fn collect_from_simple_query_source(
105+
&mut self,
106+
source: &SimpleQuerySource,
107+
) -> Result<(), CubeError> {
108+
match source {
109+
SimpleQuerySource::LogicalJoin(join) => self.collect_from_logical_join(join),
110+
SimpleQuerySource::PreAggregation(_) => Ok(()),
111+
}
112+
}
113+
114+
fn collect_from_logical_join(&mut self, join: &Rc<LogicalJoin>) -> Result<(), CubeError> {
115+
self.cube_names.insert(join.root.name.clone());
116+
for join_item in join.joins.iter() {
117+
match join_item {
118+
LogicalJoinItem::CubeJoinItem(cube_join_item) => {
119+
self.cube_names.insert(cube_join_item.cube.name.clone());
120+
}
121+
}
122+
}
123+
Ok(())
124+
}
125+
126+
fn collect_from_dimension_subqueries(
127+
&mut self,
128+
dimension_subqueries: &Vec<Rc<DimensionSubQuery>>,
129+
) -> Result<(), CubeError> {
130+
for subquery in dimension_subqueries.iter() {
131+
self.collect(&subquery.query)?;
132+
}
133+
Ok(())
134+
}
135+
}

0 commit comments

Comments
 (0)