Skip to content

Commit 675f484

Browse files
committed
in work
1 parent e1e6200 commit 675f484

File tree

2 files changed

+42
-43
lines changed

2 files changed

+42
-43
lines changed

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_node.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,9 @@ impl NodeInputs for VecNodeInput {
105105
}
106106

107107
pub trait LogicalNode {
108-
type InputsType: NodeInputs;
108+
fn inputs(&self) -> Vec<PlanNode>;
109109

110-
fn inputs(&self) -> Self::InputsType;
111-
112-
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError>;
110+
fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError>;
113111

114112
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError>;
115113

@@ -118,6 +116,7 @@ pub trait LogicalNode {
118116
fn node_name(&self) -> &'static str;
119117
}
120118

119+
#[derive(Clone)]
121120
pub enum PlanNode {
122121
Query(Rc<Query>),
123122
LogicalJoin(Rc<LogicalJoin>),
@@ -173,7 +172,6 @@ pub(super) fn cast_error(plan_node: &PlanNode, target_type: &str) -> CubeError {
173172
}
174173

175174
pub(super) fn check_inputs_len(
176-
input_name: &str,
177175
inputs: &Vec<PlanNode>,
178176
expected: usize,
179177
node_type: &str,
@@ -182,8 +180,7 @@ pub(super) fn check_inputs_len(
182180
Ok(())
183181
} else {
184182
Err(CubeError::internal(format!(
185-
"For input {} for node {} expected {} inputs but received {}",
186-
input_name,
183+
"For node {} expected {} inputs but received {}",
187184
node_type,
188185
expected,
189186
inputs.len()

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::*;
22
use cubenativeutils::CubeError;
3+
use itertools::Itertools;
34
use std::rc::Rc;
45

56
#[derive(Clone)]
@@ -47,48 +48,29 @@ pub struct Query {
4748
}
4849

4950
impl LogicalNode for Query {
50-
type InputsType = QueryInput;
51-
5251
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
5352
PlanNode::Query(self.clone())
5453
}
5554

56-
fn inputs(&self) -> Self::InputsType {
57-
let source = self.source.as_plan_node();
58-
let multistage_members = self
59-
.multistage_members
60-
.iter()
61-
.map(|member| member.as_plan_node())
62-
.collect();
63-
64-
QueryInput {
65-
source,
66-
multistage_members,
67-
}
55+
fn inputs(&self) -> Vec<PlanNode> {
56+
QueryInputPacker::pack(self)
6857
}
6958

70-
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
71-
let QueryInput {
72-
source,
59+
fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
60+
let QueryInputUnPacker {
7361
multistage_members,
74-
} = inputs;
75-
76-
check_inputs_len(
77-
"multistage_members",
78-
&multistage_members,
79-
self.multistage_members.len(),
80-
self.node_name(),
81-
)?;
62+
source,
63+
} = QueryInputUnPacker::new(&self, &inputs)?;
8264

8365
Ok(Rc::new(Self {
8466
multistage_members: multistage_members
8567
.into_iter()
86-
.map(|member| member.into_logical_node())
68+
.map(|member| member.clone().into_logical_node())
8769
.collect::<Result<Vec<_>, _>>()?,
8870
schema: self.schema.clone(),
8971
filter: self.filter.clone(),
9072
modifers: self.modifers.clone(),
91-
source: self.source.with_plan_node(source)?,
73+
source: self.source.with_plan_node(source.clone())?,
9274
}))
9375
}
9476

@@ -104,18 +86,38 @@ impl LogicalNode for Query {
10486
}
10587
}
10688

107-
pub struct QueryInput {
108-
pub source: PlanNode,
109-
pub multistage_members: Vec<PlanNode>,
89+
pub struct QueryInputPacker {}
90+
91+
impl QueryInputPacker {
92+
pub fn pack(query: &Query) -> Vec<PlanNode> {
93+
let mut result = vec![];
94+
result.extend(
95+
query
96+
.multistage_members
97+
.iter()
98+
.map(|member| member.as_plan_node()),
99+
);
100+
result.push(query.source.as_plan_node());
101+
result
102+
}
103+
}
104+
pub struct QueryInputUnPacker<'a> {
105+
multistage_members: &'a [PlanNode],
106+
source: &'a PlanNode,
110107
}
111108

112-
impl NodeInputs for QueryInput {
113-
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
114-
Box::new(std::iter::once(&self.source).chain(self.multistage_members.iter()))
109+
impl<'a> QueryInputUnPacker<'a> {
110+
pub fn new(query: &Query, inputs: &'a Vec<PlanNode>) -> Result<Self, CubeError> {
111+
check_inputs_len(&inputs, Self::inputs_len(query), query.node_name())?;
112+
let multistage_members = &inputs[0..query.multistage_members.len()];
113+
let source = &inputs[query.multistage_members.len()];
114+
Ok(Self {
115+
multistage_members,
116+
source,
117+
})
115118
}
116-
117-
fn iter_mut(&mut self) -> Box<dyn Iterator<Item = &mut PlanNode> + '_> {
118-
Box::new(std::iter::once(&mut self.source).chain(self.multistage_members.iter_mut()))
119+
fn inputs_len(query: &Query) -> usize {
120+
query.multistage_members.len() + 1
119121
}
120122
}
121123

0 commit comments

Comments
 (0)