Skip to content

Commit 69c5310

Browse files
committed
in work
1 parent d1bf747 commit 69c5310

File tree

5 files changed

+125
-273
lines changed

5 files changed

+125
-273
lines changed

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -32,41 +32,28 @@ pub struct AggregateMultipliedSubquery {
3232
}
3333

3434
impl LogicalNode for AggregateMultipliedSubquery {
35-
type InputsType = AggregateMultipliedSubqueryInput;
36-
3735
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
3836
PlanNode::AggregateMultipliedSubquery(self.clone())
3937
}
4038

41-
fn inputs(&self) -> Self::InputsType {
42-
let keys_subquery = self.keys_subquery.as_plan_node();
43-
let source = self.source.as_plan_node();
44-
let dimension_subqueries = self
45-
.dimension_subqueries
46-
.iter()
47-
.map(|itm| itm.as_plan_node())
48-
.collect_vec();
49-
AggregateMultipliedSubqueryInput {
50-
keys_subquery,
51-
source,
52-
dimension_subqueries,
53-
}
39+
fn inputs(&self) -> Vec<PlanNode> {
40+
AggregateMultipliedSubqueryInputPacker::pack(self)
5441
}
5542

56-
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
57-
let AggregateMultipliedSubqueryInput {
43+
fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
44+
let AggregateMultipliedSubqueryInputUnPacker {
5845
keys_subquery,
5946
source,
6047
dimension_subqueries,
61-
} = inputs;
48+
} = AggregateMultipliedSubqueryInputUnPacker::new(&self, &inputs)?;
6249

6350
let result = Self {
6451
schema: self.schema.clone(),
65-
keys_subquery: keys_subquery.into_logical_node()?,
66-
source: self.source.with_plan_node(source)?,
52+
keys_subquery: keys_subquery.clone().into_logical_node()?,
53+
source: self.source.with_plan_node(source.clone())?,
6754
dimension_subqueries: dimension_subqueries
6855
.into_iter()
69-
.map(|itm| itm.into_logical_node())
56+
.map(|itm| itm.clone().into_logical_node())
7057
.collect::<Result<Vec<_>, _>>()?,
7158
};
7259

@@ -85,27 +72,41 @@ impl LogicalNode for AggregateMultipliedSubquery {
8572
}
8673
}
8774

88-
pub struct AggregateMultipliedSubqueryInput {
89-
pub keys_subquery: PlanNode,
90-
pub source: PlanNode,
91-
pub dimension_subqueries: Vec<PlanNode>,
92-
}
75+
pub struct AggregateMultipliedSubqueryInputPacker;
9376

94-
impl NodeInputs for AggregateMultipliedSubqueryInput {
95-
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
96-
Box::new(
97-
std::iter::once(&self.keys_subquery)
98-
.chain(std::iter::once(&self.source))
99-
.chain(self.dimension_subqueries.iter()),
100-
)
77+
impl AggregateMultipliedSubqueryInputPacker {
78+
pub fn pack(aggregate: &AggregateMultipliedSubquery) -> Vec<PlanNode> {
79+
let mut result = vec![];
80+
result.push(aggregate.keys_subquery.as_plan_node());
81+
result.push(aggregate.source.as_plan_node());
82+
result.extend(aggregate.dimension_subqueries.iter().map(|itm| itm.as_plan_node()));
83+
result
10184
}
85+
}
10286

103-
fn iter_mut(&mut self) -> Box<dyn Iterator<Item = &mut PlanNode> + '_> {
104-
Box::new(
105-
std::iter::once(&mut self.keys_subquery)
106-
.chain(std::iter::once(&mut self.source))
107-
.chain(self.dimension_subqueries.iter_mut()),
108-
)
87+
pub struct AggregateMultipliedSubqueryInputUnPacker<'a> {
88+
keys_subquery: &'a PlanNode,
89+
source: &'a PlanNode,
90+
dimension_subqueries: &'a [PlanNode],
91+
}
92+
93+
impl<'a> AggregateMultipliedSubqueryInputUnPacker<'a> {
94+
pub fn new(aggregate: &AggregateMultipliedSubquery, inputs: &'a Vec<PlanNode>) -> Result<Self, CubeError> {
95+
check_inputs_len(&inputs, Self::inputs_len(aggregate), aggregate.node_name())?;
96+
97+
let keys_subquery = &inputs[0];
98+
let source = &inputs[1];
99+
let dimension_subqueries = &inputs[2..];
100+
101+
Ok(Self {
102+
keys_subquery,
103+
source,
104+
dimension_subqueries,
105+
})
106+
}
107+
108+
fn inputs_len(aggregate: &AggregateMultipliedSubquery) -> usize {
109+
2 + aggregate.dimension_subqueries.len()
109110
}
110111
}
111112

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs

Lines changed: 42 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -27,65 +27,39 @@ pub struct LogicalJoin {
2727
}
2828

2929
impl LogicalNode for LogicalJoin {
30-
type InputsType = LogicalJoinInput;
31-
3230
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
3331
PlanNode::LogicalJoin(self.clone())
3432
}
3533

36-
fn inputs(&self) -> Self::InputsType {
37-
let root = self.root.as_plan_node();
38-
let joins = self
39-
.joins
40-
.iter()
41-
.map(|itm| itm.cube.as_plan_node())
42-
.collect_vec();
43-
let dimension_subqueries = self
44-
.dimension_subqueries
45-
.iter()
46-
.map(|itm| itm.as_plan_node())
47-
.collect_vec();
48-
LogicalJoinInput {
49-
root,
50-
joins,
51-
dimension_subqueries,
52-
}
34+
fn inputs(&self) -> Vec<PlanNode> {
35+
LogicalJoinInputPacker::pack(self)
5336
}
5437

55-
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
56-
let LogicalJoinInput {
38+
fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
39+
let LogicalJoinInputUnPacker {
5740
root,
5841
joins,
5942
dimension_subqueries,
60-
} = inputs;
61-
62-
check_inputs_len("joins", &joins, self.joins.len(), self.node_name())?;
63-
64-
check_inputs_len(
65-
"dimension_subqueries",
66-
&dimension_subqueries,
67-
self.dimension_subqueries.len(),
68-
self.node_name(),
69-
)?;
43+
} = LogicalJoinInputUnPacker::new(&self, &inputs)?;
7044

7145
let joins = self
7246
.joins
7347
.iter()
7448
.zip(joins.into_iter())
7549
.map(|(self_item, item)| -> Result<_, CubeError> {
7650
Ok(LogicalJoinItem {
77-
cube: item.into_logical_node()?,
51+
cube: item.clone().into_logical_node()?,
7852
on_sql: self_item.on_sql.clone(),
7953
})
8054
})
8155
.collect::<Result<Vec<_>, _>>()?;
8256

8357
let result = Self {
84-
root: root.into_logical_node()?,
58+
root: root.clone().into_logical_node()?,
8559
joins,
8660
dimension_subqueries: dimension_subqueries
8761
.into_iter()
88-
.map(|itm| itm.into_logical_node())
62+
.map(|itm| itm.clone().into_logical_node())
8963
.collect::<Result<Vec<_>, _>>()?,
9064
};
9165

@@ -104,27 +78,43 @@ impl LogicalNode for LogicalJoin {
10478
}
10579
}
10680

107-
pub struct LogicalJoinInput {
108-
pub root: PlanNode,
109-
pub joins: Vec<PlanNode>,
110-
pub dimension_subqueries: Vec<PlanNode>,
111-
}
81+
pub struct LogicalJoinInputPacker;
11282

113-
impl NodeInputs for LogicalJoinInput {
114-
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
115-
Box::new(
116-
std::iter::once(&self.root)
117-
.chain(self.joins.iter())
118-
.chain(self.dimension_subqueries.iter()),
119-
)
83+
impl LogicalJoinInputPacker {
84+
pub fn pack(join: &LogicalJoin) -> Vec<PlanNode> {
85+
let mut result = vec![];
86+
result.push(join.root.as_plan_node());
87+
result.extend(join.joins.iter().map(|item| item.cube.as_plan_node()));
88+
result.extend(join.dimension_subqueries.iter().map(|item| item.as_plan_node()));
89+
result
12090
}
91+
}
92+
93+
pub struct LogicalJoinInputUnPacker<'a> {
94+
root: &'a PlanNode,
95+
joins: &'a [PlanNode],
96+
dimension_subqueries: &'a [PlanNode],
97+
}
12198

122-
fn iter_mut(&mut self) -> Box<dyn Iterator<Item = &mut PlanNode> + '_> {
123-
Box::new(
124-
std::iter::once(&mut self.root)
125-
.chain(self.joins.iter_mut())
126-
.chain(self.dimension_subqueries.iter_mut()),
127-
)
99+
impl<'a> LogicalJoinInputUnPacker<'a> {
100+
pub fn new(join: &LogicalJoin, inputs: &'a Vec<PlanNode>) -> Result<Self, CubeError> {
101+
check_inputs_len(&inputs, Self::inputs_len(join), join.node_name())?;
102+
103+
let root = &inputs[0];
104+
let joins_start = 1;
105+
let joins_end = joins_start + join.joins.len();
106+
let joins = &inputs[joins_start..joins_end];
107+
let dimension_subqueries = &inputs[joins_end..];
108+
109+
Ok(Self {
110+
root,
111+
joins,
112+
dimension_subqueries,
113+
})
114+
}
115+
116+
fn inputs_len(join: &LogicalJoin) -> usize {
117+
1 + join.joins.len() + join.dimension_subqueries.len()
128118
}
129119
}
130120

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,25 @@ pub struct KeysSubQuery {
1313
}
1414

1515
impl LogicalNode for KeysSubQuery {
16-
type InputsType = KeysSubQueryInputs;
17-
1816
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
1917
PlanNode::KeysSubQuery(self.clone())
2018
}
2119

22-
fn inputs(&self) -> Self::InputsType {
23-
KeysSubQueryInputs {
24-
pk_cube: self.pk_cube.as_plan_node(),
25-
source: self.source.as_plan_node(),
26-
}
20+
fn inputs(&self) -> Vec<PlanNode> {
21+
vec![self.pk_cube.as_plan_node(), self.source.as_plan_node()]
2722
}
2823

29-
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
30-
let KeysSubQueryInputs { pk_cube, source } = inputs;
24+
fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
25+
check_inputs_len(&inputs, 2, self.node_name())?;
26+
let pk_cube = &inputs[0];
27+
let source = &inputs[1];
3128

3229
let res = Self {
33-
pk_cube: pk_cube.into_logical_node()?,
30+
pk_cube: pk_cube.clone().into_logical_node()?,
3431
schema: self.schema.clone(),
3532
primary_keys_dimensions: self.primary_keys_dimensions.clone(),
3633
filter: self.filter.clone(),
37-
source: source.into_logical_node()?,
34+
source: source.clone().into_logical_node()?,
3835
};
3936
Ok(Rc::new(res))
4037
}
@@ -51,20 +48,6 @@ impl LogicalNode for KeysSubQuery {
5148
}
5249
}
5350

54-
pub struct KeysSubQueryInputs {
55-
pub pk_cube: PlanNode,
56-
pub source: PlanNode,
57-
}
58-
59-
impl NodeInputs for KeysSubQueryInputs {
60-
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
61-
Box::new(std::iter::once(&self.pk_cube).chain(std::iter::once(&self.source)))
62-
}
63-
64-
fn iter_mut(&mut self) -> Box<dyn Iterator<Item = &mut PlanNode> + '_> {
65-
Box::new(std::iter::once(&mut self.pk_cube).chain(std::iter::once(&mut self.source)))
66-
}
67-
}
6851

6952
impl PrettyPrint for KeysSubQuery {
7053
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {

0 commit comments

Comments
 (0)