Skip to content

Commit 7f69fbd

Browse files
committed
in work
1 parent d090e90 commit 7f69fbd

File tree

18 files changed

+163
-121
lines changed

18 files changed

+163
-121
lines changed

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use super::pretty_print::*;
22
use super::*;
33
use cubenativeutils::CubeError;
4-
use itertools::Itertools;
54
use std::rc::Rc;
65

76
pub enum AggregateMultipliedSubquerySouce {
@@ -52,7 +51,7 @@ impl LogicalNode for AggregateMultipliedSubquery {
5251
keys_subquery: keys_subquery.clone().into_logical_node()?,
5352
source: self.source.with_plan_node(source.clone())?,
5453
dimension_subqueries: dimension_subqueries
55-
.into_iter()
54+
.iter()
5655
.map(|itm| itm.clone().into_logical_node())
5756
.collect::<Result<Vec<_>, _>>()?,
5857
};
@@ -79,7 +78,12 @@ impl AggregateMultipliedSubqueryInputPacker {
7978
let mut result = vec![];
8079
result.push(aggregate.keys_subquery.as_plan_node());
8180
result.push(aggregate.source.as_plan_node());
82-
result.extend(aggregate.dimension_subqueries.iter().map(|itm| itm.as_plan_node()));
81+
result.extend(
82+
aggregate
83+
.dimension_subqueries
84+
.iter()
85+
.map(|itm| itm.as_plan_node()),
86+
);
8387
result
8488
}
8589
}
@@ -91,20 +95,23 @@ pub struct AggregateMultipliedSubqueryInputUnPacker<'a> {
9195
}
9296

9397
impl<'a> AggregateMultipliedSubqueryInputUnPacker<'a> {
94-
pub fn new(aggregate: &AggregateMultipliedSubquery, inputs: &'a Vec<PlanNode>) -> Result<Self, CubeError> {
98+
pub fn new(
99+
aggregate: &AggregateMultipliedSubquery,
100+
inputs: &'a Vec<PlanNode>,
101+
) -> Result<Self, CubeError> {
95102
check_inputs_len(&inputs, Self::inputs_len(aggregate), aggregate.node_name())?;
96-
103+
97104
let keys_subquery = &inputs[0];
98105
let source = &inputs[1];
99106
let dimension_subqueries = &inputs[2..];
100-
107+
101108
Ok(Self {
102109
keys_subquery,
103110
source,
104111
dimension_subqueries,
105112
})
106113
}
107-
114+
108115
fn inputs_len(aggregate: &AggregateMultipliedSubquery) -> usize {
109116
2 + aggregate.dimension_subqueries.len()
110117
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ impl PrettyPrint for ResolvedMultipliedMeasures {
5050
}
5151
}
5252

53+
#[derive(Clone)]
5354
pub struct FullKeyAggregate {
5455
pub schema: Rc<LogicalSchema>,
5556
pub use_full_join_and_coalesce: bool,
@@ -80,15 +81,17 @@ impl LogicalNode for FullKeyAggregate {
8081
} else {
8182
check_inputs_len(&inputs, 1, self.node_name())?;
8283
let input_source = &inputs[0];
83-
84+
8485
Some(match self.multiplied_measures_resolver.as_ref().unwrap() {
8586
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(_) => {
8687
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(
8788
input_source.clone().into_logical_node()?,
8889
)
8990
}
9091
ResolvedMultipliedMeasures::PreAggregation(_) => {
91-
ResolvedMultipliedMeasures::PreAggregation(input_source.clone().into_logical_node()?)
92+
ResolvedMultipliedMeasures::PreAggregation(
93+
input_source.clone().into_logical_node()?,
94+
)
9295
}
9396
})
9497
};

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use super::pretty_print::*;
22
use super::*;
33
use crate::planner::sql_evaluator::SqlCall;
44
use cubenativeutils::CubeError;
5-
use itertools::Itertools;
65
use std::rc::Rc;
76

87
#[derive(Clone)]
@@ -45,7 +44,7 @@ impl LogicalNode for LogicalJoin {
4544
let joins = self
4645
.joins
4746
.iter()
48-
.zip(joins.into_iter())
47+
.zip(joins.iter())
4948
.map(|(self_item, item)| -> Result<_, CubeError> {
5049
Ok(LogicalJoinItem {
5150
cube: item.clone().into_logical_node()?,
@@ -58,7 +57,7 @@ impl LogicalNode for LogicalJoin {
5857
root: root.clone().into_logical_node()?,
5958
joins,
6059
dimension_subqueries: dimension_subqueries
61-
.into_iter()
60+
.iter()
6261
.map(|itm| itm.clone().into_logical_node())
6362
.collect::<Result<Vec<_>, _>>()?,
6463
};
@@ -85,7 +84,11 @@ impl LogicalJoinInputPacker {
8584
let mut result = vec![];
8685
result.push(join.root.as_plan_node());
8786
result.extend(join.joins.iter().map(|item| item.cube.as_plan_node()));
88-
result.extend(join.dimension_subqueries.iter().map(|item| item.as_plan_node()));
87+
result.extend(
88+
join.dimension_subqueries
89+
.iter()
90+
.map(|item| item.as_plan_node()),
91+
);
8992
result
9093
}
9194
}
@@ -99,20 +102,20 @@ pub struct LogicalJoinInputUnPacker<'a> {
99102
impl<'a> LogicalJoinInputUnPacker<'a> {
100103
pub fn new(join: &LogicalJoin, inputs: &'a Vec<PlanNode>) -> Result<Self, CubeError> {
101104
check_inputs_len(&inputs, Self::inputs_len(join), join.node_name())?;
102-
105+
103106
let root = &inputs[0];
104107
let joins_start = 1;
105108
let joins_end = joins_start + join.joins.len();
106109
let joins = &inputs[joins_start..joins_end];
107110
let dimension_subqueries = &inputs[joins_end..];
108-
111+
109112
Ok(Self {
110113
root,
111114
joins,
112115
dimension_subqueries,
113116
})
114117
}
115-
118+
116119
fn inputs_len(join: &LogicalJoin) -> usize {
117120
1 + join.joins.len() + join.dimension_subqueries.len()
118121
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ impl LogicalNode for KeysSubQuery {
4848
}
4949
}
5050

51-
5251
impl PrettyPrint for KeysSubQuery {
5352
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
5453
result.println("KeysSubQuery: ", state);

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_query_modifers.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,3 @@ impl PrettyPrint for LogicalQueryModifiers {
3333
}
3434
}
3535
}
36-

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/common.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::logical_plan::pretty_print::*;
22
use crate::planner::planners::multi_stage::MultiStageAppliedState;
33

4-
54
impl PrettyPrint for MultiStageAppliedState {
65
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
76
let details_state = state.new_level();

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@ mod dimension_matcher;
33
mod measure_matcher;
44
mod optimizer;
55
mod original_sql_collector;
6-
//mod original_sql_optimizer;
76
mod pre_aggregations_compiler;
87

98
pub use compiled_pre_aggregation::*;
109
use dimension_matcher::*;
1110
use measure_matcher::*;
1211
pub use optimizer::*;
1312
pub use original_sql_collector::*;
14-
//pub use original_sql_optimizer::*;
1513
pub use pre_aggregations_compiler::*;

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 80 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ impl PreAggregationOptimizer {
6060

6161
fn try_rewrite_simple_query(
6262
&mut self,
63-
query: &Query,
63+
query: &Rc<Query>,
6464
pre_aggregation: &Rc<CompiledPreAggregation>,
6565
) -> Result<Option<Rc<Query>>, CubeError> {
6666
if self.is_schema_and_filters_match(&query.schema, &query.filter, pre_aggregation)? {
67-
let mut new_query = query.clone();
67+
let mut new_query = query.as_ref().clone();
6868
new_query.source =
6969
QuerySource::PreAggregation(self.make_pre_aggregation_source(pre_aggregation)?);
7070
Ok(Some(Rc::new(new_query)))
@@ -75,97 +75,106 @@ impl PreAggregationOptimizer {
7575

7676
fn try_rewrite_query_with_multistages(
7777
&mut self,
78-
query: &Query,
78+
query: &Rc<Query>,
7979
pre_aggregation: &Rc<CompiledPreAggregation>,
8080
) -> Result<Option<Rc<Query>>, CubeError> {
8181
let rewriter = LogicalPlanRewriter::new();
82+
let mut has_unrewritten_leaf = false;
83+
84+
let mut rewritten_multistages = Vec::new();
8285
for multi_stage in &query.multistage_members {
8386
let rewritten = rewriter.rewrite_top_down_with(multi_stage.clone(), |plan_node| {
84-
Ok(NodeRewriteResult::stop())
87+
let res = match plan_node {
88+
PlanNode::MultiStageLeafMeasure(multi_stage_leaf_measure) => {
89+
if let Some(rewritten) = self.try_rewrite_query(
90+
multi_stage_leaf_measure.query.clone(),
91+
pre_aggregation,
92+
)? {
93+
let new_leaf = Rc::new(MultiStageLeafMeasure {
94+
measure: multi_stage_leaf_measure.measure.clone(),
95+
render_measure_as_state: multi_stage_leaf_measure
96+
.render_measure_as_state
97+
.clone(),
98+
render_measure_for_ungrouped: multi_stage_leaf_measure
99+
.render_measure_for_ungrouped
100+
.clone(),
101+
time_shifts: multi_stage_leaf_measure.time_shifts.clone(),
102+
query: rewritten,
103+
});
104+
NodeRewriteResult::rewritten(new_leaf.as_plan_node())
105+
} else {
106+
has_unrewritten_leaf = true;
107+
NodeRewriteResult::stop()
108+
}
109+
}
110+
PlanNode::LogicalMultiStageMember(_) => NodeRewriteResult::pass(),
111+
_ => NodeRewriteResult::stop(),
112+
};
113+
Ok(res)
85114
})?;
115+
rewritten_multistages.push(rewritten);
86116
}
87117

88-
/* let used_multi_stage_symbols = self.collect_multi_stage_symbols(&query.source);
89-
let mut multi_stages_queries = query.multistage_members.clone();
90-
let mut rewrited_multistage = multi_stages_queries
91-
.iter()
92-
.map(|query| (query.name.clone(), false))
93-
.collect::<HashMap<_, _>>();
94-
95-
for (_, multi_stage_name) in used_multi_stage_symbols.iter() {
96-
self.try_rewrite_multistage(
97-
multi_stage_name,
98-
&mut multi_stages_queries,
99-
&mut rewrited_multistage,
100-
pre_aggregation,
101-
)?;
102-
}
103-
let all_multi_stage_rewrited = rewrited_multistage.values().all(|v| *v);
104-
if !all_multi_stage_rewrited {
118+
if has_unrewritten_leaf {
105119
return Ok(None);
106120
}
107121

108-
let source = if let Some(resolver_multiplied_measures) =
109-
&query.source.multiplied_measures_resolver
110-
{
111-
if let ResolvedMultipliedMeasures::ResolveMultipliedMeasures(
112-
resolver_multiplied_measures,
113-
) = resolver_multiplied_measures
122+
let source = if let QuerySource::FullKeyAggregate(full_key_aggregate) = &query.source {
123+
let fk_source = if let Some(resolver_multiplied_measures) =
124+
&full_key_aggregate.multiplied_measures_resolver
114125
{
115-
if self.is_schema_and_filters_match(
116-
&resolver_multiplied_measures.schema,
117-
&resolver_multiplied_measures.filter,
118-
&pre_aggregation,
119-
)? {
120-
let pre_aggregation_source =
121-
self.make_pre_aggregation_source(pre_aggregation)?;
122-
123-
let pre_aggregation_query = SimpleQuery {
124-
schema: resolver_multiplied_measures.schema.clone(),
125-
dimension_subqueries: vec![],
126-
filter: resolver_multiplied_measures.filter.clone(),
127-
modifers: Rc::new(LogicalQueryModifiers {
128-
offset: None,
129-
limit: None,
130-
ungrouped: false,
131-
order_by: vec![],
132-
}),
133-
source: SimpleQuerySource::PreAggregation(pre_aggregation_source),
134-
};
135-
Rc::new(FullKeyAggregate {
136-
join_dimensions: query.source.join_dimensions.clone(),
137-
use_full_join_and_coalesce: query.source.use_full_join_and_coalesce,
138-
multiplied_measures_resolver: Some(
139-
ResolvedMultipliedMeasures::PreAggregation(Rc::new(
140-
pre_aggregation_query,
141-
)),
142-
),
143-
multi_stage_subquery_refs: query.source.multi_stage_subquery_refs.clone(),
144-
})
126+
if let ResolvedMultipliedMeasures::ResolveMultipliedMeasures(
127+
resolver_multiplied_measures,
128+
) = resolver_multiplied_measures
129+
{
130+
if self.is_schema_and_filters_match(
131+
&resolver_multiplied_measures.schema,
132+
&resolver_multiplied_measures.filter,
133+
&pre_aggregation,
134+
)? {
135+
let pre_aggregation_source =
136+
self.make_pre_aggregation_source(pre_aggregation)?;
137+
138+
let pre_aggregation_query = Query {
139+
schema: resolver_multiplied_measures.schema.clone(),
140+
filter: resolver_multiplied_measures.filter.clone(),
141+
modifers: Rc::new(LogicalQueryModifiers {
142+
offset: None,
143+
limit: None,
144+
ungrouped: false,
145+
order_by: vec![],
146+
}),
147+
source: QuerySource::PreAggregation(pre_aggregation_source),
148+
multistage_members: vec![],
149+
};
150+
Some(ResolvedMultipliedMeasures::PreAggregation(Rc::new(
151+
pre_aggregation_query,
152+
)))
153+
} else {
154+
return Ok(None);
155+
}
145156
} else {
146-
return Ok(None);
157+
Some(resolver_multiplied_measures.clone())
147158
}
148159
} else {
149-
query.source.clone()
150-
}
160+
None
161+
};
162+
let mut result = full_key_aggregate.as_ref().clone();
163+
result.multiplied_measures_resolver = fk_source;
164+
QuerySource::FullKeyAggregate(Rc::new(result))
151165
} else {
152166
query.source.clone()
153167
};
154168

155-
let result = FullKeyAggregateQuery {
156-
multistage_members: multi_stages_queries,
169+
let result = Query {
170+
multistage_members: rewritten_multistages,
157171
schema: query.schema.clone(),
158172
filter: query.filter.clone(),
159-
modifers: Rc::new(LogicalQueryModifiers {
160-
offset: query.modifers.offset,
161-
limit: query.modifers.limit,
162-
ungrouped: query.modifers.ungrouped,
163-
order_by: query.modifers.order_by.clone(),
164-
}),
173+
modifers: query.modifers.clone(),
165174
source,
166-
}; */
167-
//Ok(Some(Rc::new(Query::FullKeyAggregateQuery(result))))
168-
Ok(None)
175+
};
176+
177+
Ok(Some(Rc::new(result)))
169178
}
170179

171180
/* fn try_rewrite_multistage(

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use super::*;
22
use cubenativeutils::CubeError;
3-
use itertools::Itertools;
43
use std::rc::Rc;
54

65
#[derive(Clone)]
@@ -64,7 +63,7 @@ impl LogicalNode for Query {
6463

6564
Ok(Rc::new(Self {
6665
multistage_members: multistage_members
67-
.into_iter()
66+
.iter()
6867
.map(|member| member.clone().into_logical_node())
6968
.collect::<Result<Vec<_>, _>>()?,
7069
schema: self.schema.clone(),

0 commit comments

Comments
 (0)