Skip to content

Commit 1782c39

Browse files
committed
in work
1 parent 0ae4af1 commit 1782c39

File tree

8 files changed

+243
-121
lines changed

8 files changed

+243
-121
lines changed

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl PrettyPrint for MultiStageSubqueryRef {
2121
#[derive(Clone)]
2222
pub enum ResolvedMultipliedMeasures {
2323
ResolveMultipliedMeasures(Rc<ResolveMultipliedMeasures>),
24-
PreAggregation(Rc<SimpleQuery>),
24+
PreAggregation(Rc<Query>),
2525
}
2626

2727
impl ResolvedMultipliedMeasures {
@@ -56,6 +56,59 @@ pub struct FullKeyAggregate {
5656
pub multi_stage_subquery_refs: Vec<Rc<MultiStageSubqueryRef>>,
5757
}
5858

59+
impl LogicalNode for FullKeyAggregate {
60+
type InputsType = OptionNodeInput;
61+
62+
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
63+
PlanNode::LogicalJoin(self.clone())
64+
}
65+
66+
fn inputs(&self) -> Self::InputsType {
67+
let plan_node = match &self.source {
68+
QuerySource::LogicalJoin(join) => SingleNodeInput::new(join.as_plan_node()),
69+
QuerySource::FullKeyAggregate(full_key) => {
70+
SingleNodeInput::new(full_key.as_plan_node())
71+
}
72+
QuerySource::PreAggregation(pre_aggregation) => {
73+
SingleNodeInput::new(pre_aggregation.as_plan_node())
74+
}
75+
};
76+
plan_node
77+
}
78+
79+
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
80+
let source = match inputs.item() {
81+
PlanNode::LogicalJoin(item) => QuerySource::LogicalJoin(item.clone()),
82+
PlanNode::FullKeyAggregate(item) => QuerySource::FullKeyAggregate(item.clone()),
83+
PlanNode::PreAggregation(item) => QuerySource::PreAggregation(item.clone()),
84+
_ => {
85+
return Err(CubeError::internal(format!(
86+
"{} is incorrect input for Query node",
87+
inputs.item().node_name()
88+
)))
89+
}
90+
};
91+
Ok(Rc::new(Query {
92+
multistage_members: self.multistage_members.clone(),
93+
schema: self.schema.clone(),
94+
filter: self.filter.clone(),
95+
modifers: self.modifers.clone(),
96+
source,
97+
}))
98+
}
99+
100+
fn node_name() -> &'static str {
101+
"Query"
102+
}
103+
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
104+
if let PlanNode::Query(query) = plan_node {
105+
Ok(query)
106+
} else {
107+
Err(cast_error::<Self>(&plan_node))
108+
}
109+
}
110+
}
111+
59112
impl PrettyPrint for FullKeyAggregate {
60113
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
61114
result.println("FullKeyAggregate: ", state);

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate_query.rs

Lines changed: 0 additions & 32 deletions
This file was deleted.

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,27 @@ use crate::planner::sql_evaluator::SqlCall;
44
use std::rc::Rc;
55

66
#[derive(Clone)]
7-
pub struct CubeJoinItem {
7+
pub struct LogicalJoinItem {
88
pub cube: Rc<Cube>,
99
pub on_sql: Rc<SqlCall>,
1010
}
1111

12-
impl PrettyPrint for CubeJoinItem {
12+
impl PrettyPrint for LogicalJoinItem {
1313
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
1414
result.println(&format!("CubeJoinItem: "), state);
1515
let details_state = state.new_level();
1616
self.cube.pretty_print(result, &details_state);
1717
}
1818
}
1919

20-
#[derive(Clone)]
21-
pub enum LogicalJoinItem {
22-
CubeJoinItem(CubeJoinItem),
23-
}
24-
25-
impl PrettyPrint for LogicalJoinItem {
26-
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
27-
match self {
28-
LogicalJoinItem::CubeJoinItem(item) => item.pretty_print(result, state),
29-
}
30-
}
31-
}
32-
3320
#[derive(Clone)]
3421
pub struct LogicalJoin {
3522
pub root: Rc<Cube>,
3623
pub joins: Vec<LogicalJoinItem>,
3724
pub dimension_subqueries: Vec<Rc<DimensionSubQuery>>,
3825
}
3926

27+
4028
impl PrettyPrint for LogicalJoin {
4129
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
4230
result.println(&format!("Join: "), state);
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use super::*;
2+
use cubenativeutils::CubeError;
3+
use std::rc::Rc;
4+
5+
pub trait NodeInputs {
6+
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_>;
7+
}
8+
9+
pub struct SingleNodeInput {
10+
item: PlanNode,
11+
}
12+
13+
impl SingleNodeInput {
14+
pub fn new(item: PlanNode) -> Self {
15+
Self { item }
16+
}
17+
18+
pub fn item(&self) -> &PlanNode {
19+
&self.item
20+
}
21+
}
22+
23+
impl NodeInputs for SingleNodeInput {
24+
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
25+
Box::new(std::iter::once(&self.item))
26+
}
27+
}
28+
29+
pub struct OptionNodeInput {
30+
item: Option<PlanNode>,
31+
}
32+
33+
impl OptionNodeInput {
34+
pub fn new<T: LogicalNode>(item: Option<PlanNode>) -> Self {
35+
Self { item }
36+
}
37+
38+
pub fn item(&self) -> &Option<PlanNode> {
39+
&self.item
40+
}
41+
}
42+
43+
impl NodeInputs for OptionNodeInput {
44+
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
45+
if let Some(item) = &self.item {
46+
Box::new(std::iter::once(item))
47+
} else {
48+
Box::new(std::iter::empty())
49+
}
50+
}
51+
}
52+
53+
pub struct VecNodeInput {
54+
items: Vec<PlanNode>,
55+
}
56+
57+
impl VecNodeInput {
58+
pub fn new(items: Vec<PlanNode>) -> Self {
59+
Self { items }
60+
}
61+
62+
pub fn items(&self) -> &Vec<PlanNode> {
63+
&self.items
64+
}
65+
}
66+
67+
impl NodeInputs for VecNodeInput {
68+
fn iter(&self) -> Box<dyn Iterator<Item = &PlanNode> + '_> {
69+
Box::new(self.items.iter())
70+
}
71+
}
72+
73+
pub trait LogicalNode {
74+
type InputsType: NodeInputs;
75+
76+
fn inputs(&self) -> Self::InputsType;
77+
78+
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError>;
79+
80+
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError>;
81+
82+
fn as_plan_node(self: &Rc<Self>) -> PlanNode;
83+
84+
fn node_name() -> &'static str;
85+
}
86+
87+
pub enum PlanNode {
88+
Query(Rc<Query>),
89+
LogicalJoin(Rc<LogicalJoin>),
90+
FullKeyAggregate(Rc<FullKeyAggregate>),
91+
PreAggregation(Rc<PreAggregation>),
92+
}
93+
94+
impl PlanNode {
95+
pub fn node_name(&self) -> &'static str {
96+
match self {
97+
PlanNode::Query(_) => Query::node_name(),
98+
PlanNode::LogicalJoin(_) => LogicalJoin::node_name(),
99+
PlanNode::FullKeyAggregate(_) => FullKeyAggregate::node_name(),
100+
PlanNode::PreAggregation(_) => PreAggregation::node_name(),
101+
}
102+
}
103+
}
104+
105+
pub(super) fn cast_error<T: LogicalNode>(plan_node: &PlanNode) -> CubeError {
106+
CubeError::internal(format!(
107+
"Can't cast {} PlanNode into {}",
108+
plan_node.node_name(),
109+
T::node_name(),
110+
))
111+
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ mod cube;
33
mod dimension_subquery;
44
mod filter;
55
mod full_key_aggregate;
6-
mod full_key_aggregate_query;
76
mod join;
87
mod keys_subquery;
8+
mod logical_node;
99
mod logical_query_modifers;
1010
mod measure_subquery;
1111
mod multistage;
@@ -16,16 +16,15 @@ mod query;
1616
mod regular_measures_query;
1717
mod resolve_multiplied_measures;
1818
mod schema;
19-
mod simple_query;
2019

2120
pub use aggregate_multiplied_subquery::*;
2221
pub use cube::*;
2322
pub use dimension_subquery::*;
2423
pub use filter::*;
2524
pub use full_key_aggregate::*;
26-
pub use full_key_aggregate_query::*;
2725
pub use join::*;
2826
pub use keys_subquery::*;
27+
pub use logical_node::*;
2928
pub use logical_query_modifers::*;
3029
pub use measure_subquery::*;
3130
pub use multistage::*;
@@ -36,4 +35,3 @@ pub use query::*;
3635
pub use regular_measures_query::*;
3736
pub use resolve_multiplied_measures::*;
3837
pub use schema::*;
39-
pub use simple_query::*;

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::*;
2+
use cubenativeutils::CubeError;
23
use std::rc::Rc;
34

45
#[derive(Clone)]
@@ -27,6 +28,59 @@ pub struct Query {
2728
pub source: QuerySource,
2829
}
2930

31+
impl LogicalNode for Query {
32+
type InputsType = SingleNodeInput;
33+
34+
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
35+
PlanNode::Query(self.clone())
36+
}
37+
38+
fn inputs(&self) -> Self::InputsType {
39+
let plan_node = match &self.source {
40+
QuerySource::LogicalJoin(join) => SingleNodeInput::new(join.as_plan_node()),
41+
QuerySource::FullKeyAggregate(full_key) => {
42+
SingleNodeInput::new(full_key.as_plan_node())
43+
}
44+
QuerySource::PreAggregation(pre_aggregation) => {
45+
SingleNodeInput::new(pre_aggregation.as_plan_node())
46+
}
47+
};
48+
plan_node
49+
}
50+
51+
fn with_inputs(self: Rc<Self>, inputs: Self::InputsType) -> Result<Rc<Self>, CubeError> {
52+
let source = match inputs.item() {
53+
PlanNode::LogicalJoin(item) => QuerySource::LogicalJoin(item.clone()),
54+
PlanNode::FullKeyAggregate(item) => QuerySource::FullKeyAggregate(item.clone()),
55+
PlanNode::PreAggregation(item) => QuerySource::PreAggregation(item.clone()),
56+
_ => {
57+
return Err(CubeError::internal(format!(
58+
"{} is incorrect input for Query node",
59+
inputs.item().node_name()
60+
)))
61+
}
62+
};
63+
Ok(Rc::new(Query {
64+
multistage_members: self.multistage_members.clone(),
65+
schema: self.schema.clone(),
66+
filter: self.filter.clone(),
67+
modifers: self.modifers.clone(),
68+
source,
69+
}))
70+
}
71+
72+
fn node_name() -> &'static str {
73+
"Query"
74+
}
75+
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
76+
if let PlanNode::Query(query) = plan_node {
77+
Ok(query)
78+
} else {
79+
Err(cast_error::<Self>(&plan_node))
80+
}
81+
}
82+
}
83+
3084
impl PrettyPrint for Query {
3185
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
3286
result.println("Query: ", state);

0 commit comments

Comments
 (0)