Skip to content

Commit cf2d8f2

Browse files
committed
in work
1 parent 31d7533 commit cf2d8f2

File tree

9 files changed

+279
-117
lines changed

9 files changed

+279
-117
lines changed

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ describe('PreAggregationsMultiStage', () => {
161161
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
162162
const sqlAndParams = query.buildSqlAndParams();
163163
console.log("!!!! sqlAndParamsl", sqlAndParams);
164-
/* expect(preAggregationsDescription[0].tableName).toEqual('rvis_rollupalias');
165-
expect(sqlAndParams[0]).toContain('rvis_rollupalias'); */
164+
expect(preAggregationsDescription[0].tableName).toEqual('rvis_rollupalias');
165+
expect(sqlAndParams[0]).toContain('rvis_rollupalias');
166166

167167
return dbRunner.evaluateQueryWithPreAggregations(query).then(res => {
168168
console.log("!!!! res", res);

packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations.test.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { PostgresQuery } from '../../../src/adapter/PostgresQuery';
33
import { BigqueryQuery } from '../../../src/adapter/BigqueryQuery';
44
import { prepareJsCompiler } from '../../unit/PrepareCompiler';
55
import { dbRunner } from './PostgresDBRunner';
6+
import {
7+
getEnv,
8+
} from '@cubejs-backend/shared';
69

710
describe('PreAggregations', () => {
811
jest.setTimeout(200000);
@@ -568,6 +571,9 @@ describe('PreAggregations', () => {
568571

569572

570573
it('simple pre-aggregation proxy time dimension', () => compiler.compile().then(() => {
574+
if (!getEnv('nativeSqlPlanner')) {
575+
return;
576+
}
571577
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
572578
measures: [
573579
'visitors.count'
Lines changed: 189 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,142 +1,256 @@
11
use super::CompiledPreAggregation;
22
use super::MatchState;
3-
use crate::plan::{Filter, FilterItem};
43
use crate::plan::filter::FilterGroupOperator;
4+
use crate::plan::{Filter, FilterItem};
5+
use crate::planner::filter::BaseFilter;
6+
use crate::planner::query_tools::QueryTools;
7+
use crate::planner::sql_evaluator::DimensionSymbol;
58
use crate::planner::sql_evaluator::MemberSymbol;
9+
use crate::planner::sql_evaluator::TimeDimensionSymbol;
10+
use crate::planner::GranularityHelper;
611
use cubenativeutils::CubeError;
712
use std::collections::HashMap;
8-
use crate::planner::filter::BaseFilter;
913
use std::rc::Rc;
1014

11-
pub struct DimensionMatcher {
15+
pub struct DimensionMatcher<'a> {
16+
query_tools: Rc<QueryTools>,
17+
pre_aggregation: &'a CompiledPreAggregation,
18+
pre_aggregation_dimensions: HashMap<String, bool>,
19+
pre_aggregation_time_dimensions: HashMap<String, (Option<String>, bool)>,
20+
result: MatchState,
1221
}
1322

14-
impl DimensionMatcher {
15-
pub fn new() -> Self {
16-
Self {}
23+
impl<'a> DimensionMatcher<'a> {
24+
pub fn new(query_tools: Rc<QueryTools>, pre_aggregation: &'a CompiledPreAggregation) -> Self {
25+
let pre_aggregation_dimensions = pre_aggregation
26+
.dimensions
27+
.iter()
28+
.map(|d| (d.full_name(), false))
29+
.collect();
30+
let pre_aggregation_time_dimensions = pre_aggregation
31+
.time_dimensions
32+
.iter()
33+
.map(|(dim, granularity)| (dim.full_name(), (granularity.clone(), false)))
34+
.collect::<HashMap<_, _>>();
35+
Self {
36+
query_tools,
37+
pre_aggregation,
38+
pre_aggregation_dimensions,
39+
pre_aggregation_time_dimensions,
40+
result: MatchState::Full
41+
}
1742
}
1843

1944
pub fn try_match(
20-
&self,
21-
symbols: &Vec<Rc<MemberSymbol>>,
45+
&mut self,
46+
dimensions: &Vec<Rc<MemberSymbol>>,
47+
time_dimensions: &Vec<Rc<MemberSymbol>>,
2248
filters: &Vec<FilterItem>,
49+
time_dimension_filters: &Vec<FilterItem>,
2350
segments: &Vec<FilterItem>,
24-
pre_aggregation: &CompiledPreAggregation,
25-
) -> Result<MatchState, CubeError> {
26-
let mut pre_aggregation_dimensions = pre_aggregation
27-
.dimensions
28-
.iter()
29-
.map(|d| (d.full_name(), false))
30-
.collect();
31-
let mut result = MatchState::Full;
32-
for symbol in symbols.iter() {
33-
let symbol_match = self.try_match_symbol(symbol, true, &mut pre_aggregation_dimensions)?;
34-
result = result.combine(&symbol_match);
35-
if result == MatchState::NotMatched {
36-
return Ok(result);
51+
) -> Result<(), CubeError> {
52+
for dimension in dimensions.iter() {
53+
let dimension_match = self.try_match_symbol(dimension, true)?;
54+
self.result = self.result.combine(&dimension_match);
55+
if self.result == MatchState::NotMatched {
56+
return Ok(());
57+
}
58+
}
59+
for time_dimension in time_dimensions.iter() {
60+
let time_dimension_match = self.try_match_symbol(time_dimension, true)?;
61+
self.result = self.result.combine(&time_dimension_match);
62+
if self.result == MatchState::NotMatched {
63+
return Ok(());
3764
}
3865
}
3966

4067
for filter in filters.iter() {
41-
let filter_match = self.try_match_filter_item(filter, true, &mut pre_aggregation_dimensions)?;
42-
result = result.combine(&filter_match);
43-
if result == MatchState::NotMatched {
44-
return Ok(result);
68+
let filter_match = self.try_match_filter_item(filter, true)?;
69+
self.result = self.result.combine(&filter_match);
70+
if self.result == MatchState::NotMatched {
71+
return Ok(());
72+
}
73+
}
74+
75+
for filter in time_dimension_filters.iter() {
76+
let filter_match = self.try_match_filter_item(filter, true)?;
77+
self.result = self.result.combine(&filter_match);
78+
if self.result == MatchState::NotMatched {
79+
return Ok(());
4580
}
4681
}
4782

4883
for segment in segments.iter() {
49-
let segment_match = self.try_match_filter_item(segment, true, &mut pre_aggregation_dimensions)?;
50-
result = result.combine(&segment_match);
51-
if result == MatchState::NotMatched {
52-
return Ok(result);
84+
let segment_match = self.try_match_filter_item(segment, true)?;
85+
self.result = self.result.combine(&segment_match);
86+
if self.result == MatchState::NotMatched {
87+
return Ok(());
5388
}
5489
}
90+
Ok(())
91+
}
5592

56-
let coverage_result = if pre_aggregation_dimensions.values().all(|v| *v) {
93+
pub fn result(mut self) -> MatchState {
94+
let dimension_coverage_result = if self.pre_aggregation_dimensions.values().all(|v| *v) {
5795
MatchState::Full
5896
} else {
5997
MatchState::Partial
6098
};
61-
62-
Ok(result.combine(&coverage_result))
99+
self.result = self.result.combine(&dimension_coverage_result);
100+
let time_dimension_coverage_result = if self.pre_aggregation_time_dimensions.values().all(|v| v.1) {
101+
MatchState::Full
102+
} else {
103+
MatchState::Partial
104+
};
105+
self.result = self.result.combine(&time_dimension_coverage_result);
106+
self.result
63107
}
64108

65109
fn try_match_symbol(
66-
&self,
110+
&mut self,
67111
symbol: &Rc<MemberSymbol>,
68112
add_to_matched_dimension: bool,
69-
pre_aggregation_dimensions: &mut HashMap<String, bool>,
70113
) -> Result<MatchState, CubeError> {
71-
let mut result = match symbol.as_ref() {
114+
match symbol.as_ref() {
72115
MemberSymbol::Dimension(dimension) => {
73-
if let Some(found) = pre_aggregation_dimensions.get_mut(&dimension.full_name()) {
74-
if add_to_matched_dimension {
75-
*found = true;
76-
}
77-
return Ok(MatchState::Full);
78-
} else if dimension.owned_by_cube() {
79-
return Ok(MatchState::NotMatched);
80-
}
81-
MatchState::Full
116+
self.try_match_dimension(dimension, add_to_matched_dimension)
82117
}
83-
MemberSymbol::MemberExpression(_member_expression) => MatchState::NotMatched, //TODO We not allow to use pre-aggregations with member expressions before sqlapi ready for it
84-
_ => return Ok(MatchState::NotMatched),
85-
};
86-
87-
if symbol.get_dependencies().is_empty() {
88-
return Ok(MatchState::NotMatched);
118+
MemberSymbol::TimeDimension(time_dimension) => {
119+
self.try_match_time_dimension(time_dimension, add_to_matched_dimension)
120+
}
121+
MemberSymbol::MemberExpression(_member_expression) => Ok(MatchState::NotMatched), //TODO We not allow to use pre-aggregations with member expressions before sqlapi ready for it
122+
_ => Ok(MatchState::NotMatched),
89123
}
124+
}
90125

91-
if !symbol.is_reference() {
92-
result = result.combine(&MatchState::Partial);
126+
fn try_match_dimension(
127+
&mut self,
128+
dimension: &DimensionSymbol,
129+
add_to_matched_dimension: bool,
130+
) -> Result<MatchState, CubeError> {
131+
if let Some(found) = self
132+
.pre_aggregation_dimensions
133+
.get_mut(&dimension.full_name())
134+
{
135+
if add_to_matched_dimension {
136+
*found = true;
137+
}
138+
Ok(MatchState::Full)
139+
} else if dimension.owned_by_cube() {
140+
Ok(MatchState::NotMatched)
141+
} else {
142+
let dependencies = dimension.get_dependencies();
143+
if dependencies.is_empty() {
144+
Ok(MatchState::NotMatched)
145+
} else {
146+
let mut result = if dimension.is_reference() {
147+
MatchState::Full
148+
} else {
149+
MatchState::Partial
150+
};
151+
for dep in dimension.get_dependencies() {
152+
let dep_match = self.try_match_symbol(&dep, add_to_matched_dimension)?;
153+
if dep_match == MatchState::NotMatched {
154+
return Ok(MatchState::NotMatched);
155+
}
156+
result = result.combine(&dep_match);
157+
}
158+
Ok(result)
159+
}
93160
}
161+
}
94162

95-
for dep in symbol.get_dependencies() {
96-
let dep_match = self.try_match_symbol(&dep, add_to_matched_dimension, pre_aggregation_dimensions)?;
97-
if dep_match == MatchState::NotMatched {
98-
return Ok(MatchState::NotMatched);
163+
fn try_match_time_dimension(
164+
&mut self,
165+
time_dimension: &TimeDimensionSymbol,
166+
add_to_matched_dimension: bool,
167+
) -> Result<MatchState, CubeError> {
168+
let granularity = if self.pre_aggregation.allow_non_strict_date_range_match {
169+
time_dimension.granularity().clone()
170+
} else {
171+
time_dimension.rollup_granularity(self.query_tools.clone())?
172+
};
173+
let base_symbol_name = time_dimension.base_symbol().full_name();
174+
if let Some(found) = self
175+
.pre_aggregation_time_dimensions
176+
.get_mut(&base_symbol_name)
177+
{
178+
if add_to_matched_dimension {
179+
found.1 = true;
180+
}
181+
let pre_aggr_granularity = &found.0;
182+
if granularity.is_none() || pre_aggr_granularity == &granularity {
183+
Ok(MatchState::Full)
184+
} else if pre_aggr_granularity.is_none()
185+
|| GranularityHelper::is_predefined_granularity(
186+
pre_aggr_granularity.as_ref().unwrap(),
187+
)
188+
{
189+
let min_granularity =
190+
GranularityHelper::min_granularity(&granularity, &pre_aggr_granularity)?;
191+
if &min_granularity == pre_aggr_granularity {
192+
Ok(MatchState::Partial)
193+
} else {
194+
Ok(MatchState::NotMatched)
195+
}
196+
} else {
197+
Ok(MatchState::NotMatched) //TODO Custom granularities!!!
198+
}
199+
} else {
200+
if time_dimension.owned_by_cube() {
201+
Ok(MatchState::NotMatched)
202+
} else {
203+
let mut result = if time_dimension.is_reference() {
204+
MatchState::Full
205+
} else {
206+
MatchState::Partial
207+
};
208+
for dep in time_dimension.get_dependencies_as_time_dimensions() {
209+
let dep_match = self.try_match_symbol(&dep, add_to_matched_dimension)?;
210+
if dep_match == MatchState::NotMatched {
211+
return Ok(MatchState::NotMatched);
212+
}
213+
result = result.combine(&dep_match);
214+
}
215+
Ok(result)
99216
}
100-
result = result.combine(&dep_match);
101217
}
102-
Ok(result)
103218
}
104219

105220
fn try_match_filter_item(
106-
&self,
221+
&mut self,
107222
filter_item: &FilterItem,
108223
add_to_matched_dimension: bool,
109-
pre_aggregation_dimensions: &mut HashMap<String, bool>,
110224
) -> Result<MatchState, CubeError> {
111225
match filter_item {
112-
FilterItem::Item(filter) => {
113-
self.try_match_filter(filter, add_to_matched_dimension, pre_aggregation_dimensions)
114-
}
226+
FilterItem::Item(filter) => self.try_match_filter(filter, add_to_matched_dimension),
115227
FilterItem::Group(group) => {
116-
let add_to_matched_dimension = add_to_matched_dimension && group.operator == FilterGroupOperator::And;
228+
let add_to_matched_dimension =
229+
add_to_matched_dimension && group.operator == FilterGroupOperator::And;
117230
let mut result = MatchState::Full;
118231
for item in group.items.iter() {
119-
result = result.combine(&self.try_match_filter_item(item, add_to_matched_dimension, pre_aggregation_dimensions)?);
232+
result = result
233+
.combine(&self.try_match_filter_item(item, add_to_matched_dimension)?);
120234
}
121235
Ok(result)
122-
},
236+
}
123237
FilterItem::Segment(segment) => {
124-
self.try_match_symbol(&segment.member_evaluator(), add_to_matched_dimension, pre_aggregation_dimensions)
125-
},
238+
self.try_match_symbol(&segment.member_evaluator(), add_to_matched_dimension)
239+
}
126240
}
127241
}
128242

129243
fn try_match_filter(
130-
&self,
244+
&mut self,
131245
filter: &Rc<BaseFilter>,
132246
add_to_matched_dimension: bool,
133-
pre_aggregation_dimensions: &mut HashMap<String, bool>,
134247
) -> Result<MatchState, CubeError> {
135-
let symbol = filter.member_evaluator().clone();
248+
let symbol = if let Some(time_dimension) = filter.time_dimension_symbol() {
249+
time_dimension
250+
} else {
251+
filter.member_evaluator().clone()
252+
};
136253
let add_to_matched_dimension = add_to_matched_dimension && filter.is_single_value_equal();
137-
self.try_match_symbol(&symbol, add_to_matched_dimension, pre_aggregation_dimensions)
254+
self.try_match_symbol(&symbol, add_to_matched_dimension)
138255
}
139-
140-
141-
142256
}

0 commit comments

Comments
 (0)