Skip to content

Commit 040deef

Browse files
committed
in work
1 parent 1782c39 commit 040deef

18 files changed

+587
-141
lines changed

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,106 @@
11
use super::pretty_print::*;
22
use super::*;
3+
use cubenativeutils::CubeError;
4+
use itertools::Itertools;
35
use std::rc::Rc;
46

57
pub enum AggregateMultipliedSubquerySouce {
6-
Cube,
8+
Cube(Rc<Cube>),
79
MeasureSubquery(Rc<MeasureSubquery>),
810
}
911

12+
impl AggregateMultipliedSubquerySouce {
13+
fn as_plan_node(&self) -> PlanNode {
14+
match self {
15+
Self::Cube(item) => item.as_plan_node(),
16+
Self::MeasureSubquery(item) => item.as_plan_node(),
17+
}
18+
}
19+
fn with_plan_node(&self, plan_node: PlanNode) -> Result<Self, CubeError> {
20+
Ok(match self {
21+
Self::Cube(_) => Self::Cube(plan_node.into_logical_node()?),
22+
Self::MeasureSubquery(_) => Self::MeasureSubquery(plan_node.into_logical_node()?),
23+
})
24+
}
25+
}
26+
1027
pub struct AggregateMultipliedSubquery {
1128
pub schema: Rc<LogicalSchema>,
1229
pub keys_subquery: Rc<KeysSubQuery>,
13-
pub pk_cube: Rc<Cube>, //FIXME may be duplication with information in keys_subquery
14-
pub source: Rc<AggregateMultipliedSubquerySouce>,
30+
pub source: AggregateMultipliedSubquerySouce,
1531
pub dimension_subqueries: Vec<Rc<DimensionSubQuery>>,
1632
}
1733

34+
impl LogicalNode for AggregateMultipliedSubquery {
35+
type InputsType = AggregateMultipliedSubqueryInput;
36+
37+
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
38+
PlanNode::AggregateMultipliedSubquery(self.clone())
39+
}
40+
41+
fn inputs(&self) -> Self::InputsType {
42+
let keys_subquery = self.keys_subquery.as_plan_node();
43+
let source = self.source.as_plan_node();
44+
let dimension_subqueries = self
45+
.dimension_subqueries
46+
.iter()
47+
.map(|itm| itm.as_plan_node())
48+
.collect_vec();
49+
AggregateMultipliedSubqueryInput {
50+
keys_subquery,
51+
source,
52+
dimension_subqueries,
53+
}
54+
}
55+
56+
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
57+
let AggregateMultipliedSubqueryInput {
58+
keys_subquery,
59+
source,
60+
dimension_subqueries,
61+
} = inputs;
62+
63+
let result = Self {
64+
schema: self.schema.clone(),
65+
keys_subquery: keys_subquery.into_logical_node()?,
66+
source: self.source.with_plan_node(source)?,
67+
dimension_subqueries: dimension_subqueries
68+
.into_iter()
69+
.map(|itm| itm.into_logical_node())
70+
.collect::<Result<Vec<_>, _>>()?,
71+
};
72+
73+
Ok(Rc::new(result))
74+
}
75+
76+
fn node_name() -> &'static str {
77+
"AggregateMultipliedSubquery"
78+
}
79+
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
80+
if let PlanNode::AggregateMultipliedSubquery(item) = plan_node {
81+
Ok(item)
82+
} else {
83+
Err(cast_error::<Self>(&plan_node))
84+
}
85+
}
86+
}
87+
88+
pub struct AggregateMultipliedSubqueryInput {
89+
pub keys_subquery: PlanNode,
90+
pub source: PlanNode,
91+
pub dimension_subqueries: Vec<PlanNode>,
92+
}
93+
94+
impl NodeInputs for AggregateMultipliedSubqueryInput {
95+
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
96+
Box::new(
97+
std::iter::once(&self.keys_subquery)
98+
.chain(std::iter::once(&self.source))
99+
.chain(self.dimension_subqueries.iter()),
100+
)
101+
}
102+
}
103+
18104
impl PrettyPrint for AggregateMultipliedSubquery {
19105
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
20106
result.println("AggregateMultipliedSubquery: ", state);
@@ -25,11 +111,10 @@ impl PrettyPrint for AggregateMultipliedSubquery {
25111
result.println("keys_subquery:", &state);
26112
self.keys_subquery.pretty_print(result, &details_state);
27113
result.println("source:", &state);
28-
match self.source.as_ref() {
29-
AggregateMultipliedSubquerySouce::Cube => {
114+
match &self.source {
115+
AggregateMultipliedSubquerySouce::Cube(cube) => {
30116
result.println("Cube:", &details_state);
31-
self.pk_cube
32-
.pretty_print(result, &details_state.new_level());
117+
cube.pretty_print(result, &details_state.new_level());
33118
}
34119
AggregateMultipliedSubquerySouce::MeasureSubquery(measure_subquery) => {
35120
result.println(&format!("MeasureSubquery: "), &details_state);

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::*;
22
use crate::planner::BaseCube;
3+
use cubenativeutils::CubeError;
34
use std::rc::Rc;
45

56
#[derive(Clone)]
@@ -49,3 +50,30 @@ impl Cube {
4950
})
5051
}
5152
}
53+
54+
impl LogicalNode for Cube {
55+
type InputsType = EmptyNodeInput;
56+
57+
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
58+
PlanNode::Cube(self.clone())
59+
}
60+
61+
fn inputs(&self) -> Self::InputsType {
62+
EmptyNodeInput::new()
63+
}
64+
65+
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
66+
Ok(self)
67+
}
68+
69+
fn node_name() -> &'static str {
70+
"Cube"
71+
}
72+
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
73+
if let PlanNode::Cube(item) = plan_node {
74+
Ok(item)
75+
} else {
76+
Err(cast_error::<Self>(&plan_node))
77+
}
78+
}
79+
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::pretty_print::*;
22
use super::*;
33
use crate::planner::sql_evaluator::MemberSymbol;
4+
use cubenativeutils::CubeError;
45
use std::rc::Rc;
56

67
pub struct DimensionSubQuery {
@@ -10,6 +11,39 @@ pub struct DimensionSubQuery {
1011
pub measure_for_subquery_dimension: Rc<MemberSymbol>,
1112
}
1213

14+
impl LogicalNode for DimensionSubQuery {
15+
type InputsType = SingleNodeInput;
16+
17+
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
18+
PlanNode::DimensionSubQuery(self.clone())
19+
}
20+
21+
fn inputs(&self) -> Self::InputsType {
22+
SingleNodeInput::new(self.query.as_plan_node())
23+
}
24+
25+
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
26+
let query = inputs.unpack();
27+
Ok(Rc::new(Self {
28+
query: query.into_logical_node()?,
29+
primary_keys_dimensions: self.primary_keys_dimensions.clone(),
30+
subquery_dimension: self.subquery_dimension.clone(),
31+
measure_for_subquery_dimension: self.measure_for_subquery_dimension.clone(),
32+
}))
33+
}
34+
35+
fn node_name() -> &'static str {
36+
"DimensionSubQuery"
37+
}
38+
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
39+
if let PlanNode::DimensionSubQuery(query) = plan_node {
40+
Ok(query)
41+
} else {
42+
Err(cast_error::<Self>(&plan_node))
43+
}
44+
}
45+
}
46+
1347
impl PrettyPrint for DimensionSubQuery {
1448
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
1549
result.println("DimensionSubQuery: ", state);

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::*;
22
use crate::planner::sql_evaluator::MemberSymbol;
3+
use cubenativeutils::CubeError;
34
use std::rc::Rc;
45

56
pub struct MultiStageSubqueryRef {
@@ -60,49 +61,59 @@ impl LogicalNode for FullKeyAggregate {
6061
type InputsType = OptionNodeInput;
6162

6263
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
63-
PlanNode::LogicalJoin(self.clone())
64+
PlanNode::FullKeyAggregate(self.clone())
6465
}
6566

6667
fn inputs(&self) -> Self::InputsType {
67-
let plan_node = match &self.source {
68-
QuerySource::LogicalJoin(join) => SingleNodeInput::new(join.as_plan_node()),
69-
QuerySource::FullKeyAggregate(full_key) => {
70-
SingleNodeInput::new(full_key.as_plan_node())
71-
}
72-
QuerySource::PreAggregation(pre_aggregation) => {
73-
SingleNodeInput::new(pre_aggregation.as_plan_node())
74-
}
75-
};
76-
plan_node
68+
let plan_node = self
69+
.multiplied_measures_resolver
70+
.as_ref()
71+
.map(|resolver| match resolver {
72+
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(item) => item.as_plan_node(),
73+
ResolvedMultipliedMeasures::PreAggregation(item) => item.as_plan_node(),
74+
});
75+
OptionNodeInput::new(plan_node)
7776
}
7877

7978
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
80-
let source = match inputs.item() {
81-
PlanNode::LogicalJoin(item) => QuerySource::LogicalJoin(item.clone()),
82-
PlanNode::FullKeyAggregate(item) => QuerySource::FullKeyAggregate(item.clone()),
83-
PlanNode::PreAggregation(item) => QuerySource::PreAggregation(item.clone()),
84-
_ => {
85-
return Err(CubeError::internal(format!(
86-
"{} is incorrect input for Query node",
87-
inputs.item().node_name()
88-
)))
89-
}
79+
let input = inputs.unpack();
80+
let multiplied_measures_resolver = if self.multiplied_measures_resolver.is_none()
81+
&& input.is_none()
82+
{
83+
None
84+
} else if let (Some(self_source), Some(input_source)) =
85+
(&self.multiplied_measures_resolver, input)
86+
{
87+
Some(match self_source {
88+
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(_) => {
89+
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(
90+
input_source.into_logical_node()?,
91+
)
92+
}
93+
ResolvedMultipliedMeasures::PreAggregation(_) => {
94+
ResolvedMultipliedMeasures::PreAggregation(input_source.into_logical_node()?)
95+
}
96+
})
97+
} else {
98+
return Err(CubeError::internal(format!(
99+
"Wrong inputs for FullKeyAggregate node"
100+
)));
90101
};
91-
Ok(Rc::new(Query {
92-
multistage_members: self.multistage_members.clone(),
102+
103+
Ok(Rc::new(Self {
93104
schema: self.schema.clone(),
94-
filter: self.filter.clone(),
95-
modifers: self.modifers.clone(),
96-
source,
105+
use_full_join_and_coalesce: self.use_full_join_and_coalesce,
106+
multiplied_measures_resolver,
107+
multi_stage_subquery_refs: self.multi_stage_subquery_refs.clone(),
97108
}))
98109
}
99110

100111
fn node_name() -> &'static str {
101-
"Query"
112+
"FullKeyAggregate"
102113
}
103114
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
104-
if let PlanNode::Query(query) = plan_node {
105-
Ok(query)
115+
if let PlanNode::FullKeyAggregate(item) = plan_node {
116+
Ok(item)
106117
} else {
107118
Err(cast_error::<Self>(&plan_node))
108119
}

0 commit comments

Comments
 (0)