1
1
use super :: PreAggregationsCompiler ;
2
2
use super :: * ;
3
+ use crate :: logical_plan:: visitor:: { LogicalPlanRewriter , NodeRewriteResult } ;
3
4
use crate :: logical_plan:: * ;
4
5
use crate :: plan:: FilterItem ;
5
6
use crate :: planner:: query_tools:: QueryTools ;
@@ -24,9 +25,7 @@ impl PreAggregationOptimizer {
24
25
}
25
26
26
27
pub fn try_optimize ( & mut self , plan : Rc < Query > ) -> Result < Option < Rc < Query > > , CubeError > {
27
- let mut cube_names_collector = CubeNamesCollector :: new ( ) ;
28
- cube_names_collector. collect ( & plan) ?;
29
- let cube_names = cube_names_collector. result ( ) ;
28
+ let cube_names = collect_cube_names_from_node ( & plan) ?;
30
29
let mut compiler = PreAggregationsCompiler :: try_new ( self . query_tools . clone ( ) , & cube_names) ?;
31
30
32
31
let compiled_pre_aggregations = compiler. compile_all_pre_aggregations ( ) ?;
@@ -50,66 +49,43 @@ impl PreAggregationOptimizer {
50
49
query : Rc < Query > ,
51
50
pre_aggregation : & Rc < CompiledPreAggregation > ,
52
51
) -> Result < Option < Rc < Query > > , CubeError > {
53
- match query. as_ref ( ) {
54
- Query :: SimpleQuery ( query) => self . try_rewrite_simple_query ( query, pre_aggregation) ,
55
- Query :: FullKeyAggregateQuery ( query) => {
56
- self . try_rewrite_full_key_aggregate_query ( query, pre_aggregation)
57
- }
52
+ if query. multistage_members . is_empty ( ) {
53
+ self . try_rewrite_simple_query ( & query, pre_aggregation)
54
+ } else if !self . allow_multi_stage {
55
+ Ok ( None )
56
+ } else {
57
+ self . try_rewrite_query_with_multistages ( & query, pre_aggregation)
58
58
}
59
59
}
60
60
61
61
fn try_rewrite_simple_query (
62
62
& mut self ,
63
- query : & SimpleQuery ,
63
+ query : & Query ,
64
64
pre_aggregation : & Rc < CompiledPreAggregation > ,
65
65
) -> Result < Option < Rc < Query > > , CubeError > {
66
66
if self . is_schema_and_filters_match ( & query. schema , & query. filter , pre_aggregation) ? {
67
- let mut new_query = SimpleQuery :: clone ( & query) ;
68
- new_query. source = SimpleQuerySource :: PreAggregation (
69
- self . make_pre_aggregation_source ( pre_aggregation) ?,
70
- ) ;
71
- Ok ( Some ( Rc :: new ( Query :: SimpleQuery ( new_query) ) ) )
67
+ let mut new_query = query. clone ( ) ;
68
+ new_query. source =
69
+ QuerySource :: PreAggregation ( self . make_pre_aggregation_source ( pre_aggregation) ?) ;
70
+ Ok ( Some ( Rc :: new ( new_query) ) )
72
71
} else {
73
72
Ok ( None )
74
73
}
75
74
}
76
75
77
- fn try_rewrite_full_key_aggregate_query (
76
+ fn try_rewrite_query_with_multistages (
78
77
& mut self ,
79
- query : & FullKeyAggregateQuery ,
78
+ query : & Query ,
80
79
pre_aggregation : & Rc < CompiledPreAggregation > ,
81
80
) -> Result < Option < Rc < Query > > , CubeError > {
82
- if !self . allow_multi_stage && !query. multistage_members . is_empty ( ) {
83
- return Ok ( None ) ;
84
- }
85
- if self . allow_multi_stage && !query. multistage_members . is_empty ( ) {
86
- return self
87
- . try_rewrite_full_key_aggregate_query_with_multi_stages ( query, pre_aggregation) ;
88
- }
89
-
90
- if self . is_schema_and_filters_match ( & query. schema , & query. filter , pre_aggregation) ? {
91
- let source = SimpleQuerySource :: PreAggregation (
92
- self . make_pre_aggregation_source ( pre_aggregation) ?,
93
- ) ;
94
- let new_query = SimpleQuery {
95
- schema : query. schema . clone ( ) ,
96
- dimension_subqueries : vec ! [ ] ,
97
- filter : query. filter . clone ( ) ,
98
- modifers : query. modifers . clone ( ) ,
99
- source,
100
- } ;
101
- Ok ( Some ( Rc :: new ( Query :: SimpleQuery ( new_query) ) ) )
102
- } else {
103
- Ok ( None )
81
+ let rewriter = LogicalPlanRewriter :: new ( ) ;
82
+ for multi_stage in & query. multistage_members {
83
+ let rewritten = rewriter. rewrite_top_down ( multi_stage. clone ( ) , & mut |& plan_node| {
84
+ Ok ( NodeRewriteResult :: stop ( ) )
85
+ } ) ?;
104
86
}
105
- }
106
87
107
- fn try_rewrite_full_key_aggregate_query_with_multi_stages (
108
- & mut self ,
109
- query : & FullKeyAggregateQuery ,
110
- pre_aggregation : & Rc < CompiledPreAggregation > ,
111
- ) -> Result < Option < Rc < Query > > , CubeError > {
112
- let used_multi_stage_symbols = self . collect_multi_stage_symbols ( & query. source ) ;
88
+ /* let used_multi_stage_symbols = self.collect_multi_stage_symbols(&query.source);
113
89
let mut multi_stages_queries = query.multistage_members.clone();
114
90
let mut rewrited_multistage = multi_stages_queries
115
91
.iter()
@@ -187,11 +163,12 @@ impl PreAggregationOptimizer {
187
163
order_by: query.modifers.order_by.clone(),
188
164
}),
189
165
source,
190
- } ;
191
- Ok ( Some ( Rc :: new ( Query :: FullKeyAggregateQuery ( result) ) ) )
166
+ }; */
167
+ //Ok(Some(Rc::new(Query::FullKeyAggregateQuery(result))))
168
+ Ok ( None )
192
169
}
193
170
194
- fn try_rewrite_multistage (
171
+ /* fn try_rewrite_multistage(
195
172
&mut self,
196
173
multi_stage_name: &String,
197
174
multi_stage_queries: &mut Vec<Rc<LogicalMultiStageMember>>,
@@ -382,17 +359,12 @@ impl PreAggregationOptimizer {
382
359
}
383
360
}
384
361
symbols
385
- }
362
+ } */
386
363
387
364
fn make_pre_aggregation_source (
388
365
& mut self ,
389
366
pre_aggregation : & Rc < CompiledPreAggregation > ,
390
367
) -> Result < Rc < PreAggregation > , CubeError > {
391
- /* let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name(
392
- pre_aggregation.cube_name.clone(),
393
- pre_aggregation.name.clone(),
394
- )?; */
395
- //if let Some(table_name) = &pre_aggregation_obj.static_data().table_name {
396
368
let schema = LogicalSchema {
397
369
time_dimensions : vec ! [ ] ,
398
370
dimensions : pre_aggregation
0 commit comments