Skip to content

Commit 725abcf

Browse files
committed
builder for logical plan nodes
1 parent 76dc8f6 commit 725abcf

26 files changed

+430
-246
lines changed

rust/cubesqlplanner/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ node_modules
1111
/cubesql/egraph-debug-intermediate
1212
egraph-debug
1313
/cubesql/debug-qtrace
14+
.claude

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,52 +2,73 @@ use super::*;
22
use crate::planner::BaseCube;
33
use cubenativeutils::CubeError;
44
use std::rc::Rc;
5+
use typed_builder::TypedBuilder;
56

6-
#[derive(Clone)]
7+
#[derive(Clone, TypedBuilder)]
78
pub struct OriginalSqlPreAggregation {
8-
pub name: String,
9+
name: String,
10+
}
11+
12+
impl OriginalSqlPreAggregation {
13+
pub fn name(&self) -> &String {
14+
&self.name
15+
}
916
}
1017

1118
impl PrettyPrint for OriginalSqlPreAggregation {
1219
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
13-
result.println(&format!("OriginalSqlPreAggregation: {}", self.name), state);
20+
result.println(&format!("OriginalSqlPreAggregation: {}", self.name()), state);
1421
}
1522
}
1623

17-
#[derive(Clone)]
24+
#[derive(Clone, TypedBuilder)]
1825
pub struct Cube {
19-
pub name: String,
20-
pub cube: Rc<BaseCube>,
21-
pub original_sql_pre_aggregation: Option<OriginalSqlPreAggregation>,
26+
name: String,
27+
cube: Rc<BaseCube>,
28+
#[builder(default)]
29+
original_sql_pre_aggregation: Option<OriginalSqlPreAggregation>,
30+
}
31+
32+
impl Cube {
33+
pub fn name(&self) -> &String {
34+
&self.name
35+
}
36+
37+
pub fn cube(&self) -> &Rc<BaseCube> {
38+
&self.cube
39+
}
40+
41+
pub fn original_sql_pre_aggregation(&self) -> &Option<OriginalSqlPreAggregation> {
42+
&self.original_sql_pre_aggregation
43+
}
2244
}
2345

2446
impl PrettyPrint for Cube {
2547
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
26-
result.println(&format!("Cube: {}", self.name), state);
27-
if let Some(original_sql_pre_aggregation) = &self.original_sql_pre_aggregation {
48+
result.println(&format!("Cube: {}", self.name()), state);
49+
if let Some(original_sql_pre_aggregation) = self.original_sql_pre_aggregation() {
2850
original_sql_pre_aggregation.pretty_print(result, state);
2951
}
3052
}
3153
}
3254

3355
impl Cube {
3456
pub fn new(cube: Rc<BaseCube>) -> Rc<Self> {
35-
Rc::new(Self {
36-
name: cube.name().clone(),
37-
cube,
38-
original_sql_pre_aggregation: None,
39-
})
57+
Rc::new(Self::builder()
58+
.name(cube.name().clone())
59+
.cube(cube)
60+
.build())
4061
}
4162

4263
pub fn with_original_sql_pre_aggregation(
4364
self: Rc<Self>,
4465
original_sql_pre_aggregation: OriginalSqlPreAggregation,
4566
) -> Rc<Self> {
46-
Rc::new(Self {
47-
name: self.name.clone(),
48-
cube: self.cube.clone(),
49-
original_sql_pre_aggregation: Some(original_sql_pre_aggregation),
50-
})
67+
Rc::new(Self::builder()
68+
.name(self.name().clone())
69+
.cube(self.cube().clone())
70+
.original_sql_pre_aggregation(Some(original_sql_pre_aggregation))
71+
.build())
5172
}
5273
}
5374

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,31 @@ use super::*;
22
use crate::planner::sql_evaluator::MemberSymbol;
33
use cubenativeutils::CubeError;
44
use std::rc::Rc;
5+
use typed_builder::TypedBuilder;
56

7+
#[derive(TypedBuilder)]
68
pub struct MultiStageSubqueryRef {
7-
pub name: String,
8-
pub symbols: Vec<Rc<MemberSymbol>>,
9+
name: String,
10+
#[builder(default)]
11+
symbols: Vec<Rc<MemberSymbol>>,
12+
}
13+
14+
impl MultiStageSubqueryRef {
15+
pub fn name(&self) -> &String {
16+
&self.name
17+
}
18+
19+
pub fn symbols(&self) -> &Vec<Rc<MemberSymbol>> {
20+
&self.symbols
21+
}
922
}
1023

1124
impl PrettyPrint for MultiStageSubqueryRef {
1225
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
13-
result.println(&format!("MultiStageSubqueryRef: {}", self.name), state);
26+
result.println(&format!("MultiStageSubqueryRef: {}", self.name()), state);
1427
let state = state.new_level();
1528
result.println(
16-
&format!("symbols: {}", print_symbols(&self.symbols)),
29+
&format!("symbols: {}", print_symbols(self.symbols())),
1730
&state,
1831
);
1932
}
@@ -52,12 +65,32 @@ impl PrettyPrint for ResolvedMultipliedMeasures {
5265
}
5366
}
5467

55-
#[derive(Clone)]
68+
#[derive(Clone, TypedBuilder)]
5669
pub struct FullKeyAggregate {
57-
pub schema: Rc<LogicalSchema>,
58-
pub use_full_join_and_coalesce: bool,
59-
pub multiplied_measures_resolver: Option<ResolvedMultipliedMeasures>,
60-
pub multi_stage_subquery_refs: Vec<Rc<MultiStageSubqueryRef>>,
70+
schema: Rc<LogicalSchema>,
71+
use_full_join_and_coalesce: bool,
72+
#[builder(default)]
73+
multiplied_measures_resolver: Option<ResolvedMultipliedMeasures>,
74+
#[builder(default)]
75+
multi_stage_subquery_refs: Vec<Rc<MultiStageSubqueryRef>>,
76+
}
77+
78+
impl FullKeyAggregate {
79+
pub fn schema(&self) -> &Rc<LogicalSchema> {
80+
&self.schema
81+
}
82+
83+
pub fn use_full_join_and_coalesce(&self) -> bool {
84+
self.use_full_join_and_coalesce
85+
}
86+
87+
pub fn multiplied_measures_resolver(&self) -> &Option<ResolvedMultipliedMeasures> {
88+
&self.multiplied_measures_resolver
89+
}
90+
91+
pub fn multi_stage_subquery_refs(&self) -> &Vec<Rc<MultiStageSubqueryRef>> {
92+
&self.multi_stage_subquery_refs
93+
}
6194
}
6295

6396
impl LogicalNode for FullKeyAggregate {
@@ -66,7 +99,7 @@ impl LogicalNode for FullKeyAggregate {
6699
}
67100

68101
fn inputs(&self) -> Vec<PlanNode> {
69-
if let Some(resolver) = &self.multiplied_measures_resolver {
102+
if let Some(resolver) = self.multiplied_measures_resolver() {
70103
vec![match resolver {
71104
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(item) => item.as_plan_node(),
72105
ResolvedMultipliedMeasures::PreAggregation(item) => item.as_plan_node(),
@@ -77,14 +110,14 @@ impl LogicalNode for FullKeyAggregate {
77110
}
78111

79112
fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
80-
let multiplied_measures_resolver = if self.multiplied_measures_resolver.is_none() {
113+
let multiplied_measures_resolver = if self.multiplied_measures_resolver().is_none() {
81114
check_inputs_len(&inputs, 0, self.node_name())?;
82115
None
83116
} else {
84117
check_inputs_len(&inputs, 1, self.node_name())?;
85118
let input_source = &inputs[0];
86119

87-
Some(match self.multiplied_measures_resolver.as_ref().unwrap() {
120+
Some(match self.multiplied_measures_resolver().as_ref().unwrap() {
88121
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(_) => {
89122
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(
90123
input_source.clone().into_logical_node()?,
@@ -98,12 +131,12 @@ impl LogicalNode for FullKeyAggregate {
98131
})
99132
};
100133

101-
Ok(Rc::new(Self {
102-
schema: self.schema.clone(),
103-
use_full_join_and_coalesce: self.use_full_join_and_coalesce,
104-
multiplied_measures_resolver,
105-
multi_stage_subquery_refs: self.multi_stage_subquery_refs.clone(),
106-
}))
134+
Ok(Rc::new(Self::builder()
135+
.schema(self.schema().clone())
136+
.use_full_join_and_coalesce(self.use_full_join_and_coalesce())
137+
.multiplied_measures_resolver(multiplied_measures_resolver)
138+
.multi_stage_subquery_refs(self.multi_stage_subquery_refs().clone())
139+
.build()))
107140
}
108141

109142
fn node_name(&self) -> &'static str {
@@ -124,22 +157,22 @@ impl PrettyPrint for FullKeyAggregate {
124157
let state = state.new_level();
125158
let details_state = state.new_level();
126159
result.println(&format!("schema:"), &state);
127-
self.schema.pretty_print(result, &details_state);
160+
self.schema().pretty_print(result, &details_state);
128161
result.println(
129162
&format!(
130163
"use_full_join_and_coalesce: {}",
131-
self.use_full_join_and_coalesce
164+
self.use_full_join_and_coalesce()
132165
),
133166
&state,
134167
);
135-
if let Some(resolve_multiplied_measures) = &self.multiplied_measures_resolver {
168+
if let Some(resolve_multiplied_measures) = self.multiplied_measures_resolver() {
136169
result.println("multiplied measures resolver:", &state);
137170
resolve_multiplied_measures.pretty_print(result, &details_state);
138171
}
139172

140-
if !self.multi_stage_subquery_refs.is_empty() {
173+
if !self.multi_stage_subquery_refs().is_empty() {
141174
result.println("multi_stage_subquery_refs:", &state);
142-
for subquery_ref in self.multi_stage_subquery_refs.iter() {
175+
for subquery_ref in self.multi_stage_subquery_refs().iter() {
143176
subquery_ref.pretty_print(result, &details_state);
144177
}
145178
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,53 @@ use super::*;
33
use crate::planner::sql_evaluator::SqlCall;
44
use cubenativeutils::CubeError;
55
use std::rc::Rc;
6+
use typed_builder::TypedBuilder;
67

7-
#[derive(Clone)]
8+
#[derive(Clone, TypedBuilder)]
89
pub struct LogicalJoinItem {
9-
pub cube: Rc<Cube>,
10-
pub on_sql: Rc<SqlCall>,
10+
cube: Rc<Cube>,
11+
on_sql: Rc<SqlCall>,
12+
}
13+
14+
impl LogicalJoinItem {
15+
pub fn cube(&self) -> &Rc<Cube> {
16+
&self.cube
17+
}
18+
19+
pub fn on_sql(&self) -> &Rc<SqlCall> {
20+
&self.on_sql
21+
}
1122
}
1223

1324
impl PrettyPrint for LogicalJoinItem {
1425
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
1526
result.println(&format!("CubeJoinItem: "), state);
1627
let details_state = state.new_level();
17-
self.cube.pretty_print(result, &details_state);
28+
self.cube().pretty_print(result, &details_state);
1829
}
1930
}
2031

21-
#[derive(Clone)]
32+
#[derive(Clone, TypedBuilder)]
2233
pub struct LogicalJoin {
23-
pub root: Rc<Cube>,
24-
pub joins: Vec<LogicalJoinItem>,
25-
pub dimension_subqueries: Vec<Rc<DimensionSubQuery>>,
34+
root: Rc<Cube>,
35+
#[builder(default)]
36+
joins: Vec<LogicalJoinItem>,
37+
#[builder(default)]
38+
dimension_subqueries: Vec<Rc<DimensionSubQuery>>,
39+
}
40+
41+
impl LogicalJoin {
42+
pub fn root(&self) -> &Rc<Cube> {
43+
&self.root
44+
}
45+
46+
pub fn joins(&self) -> &Vec<LogicalJoinItem> {
47+
&self.joins
48+
}
49+
50+
pub fn dimension_subqueries(&self) -> &Vec<Rc<DimensionSubQuery>> {
51+
&self.dimension_subqueries
52+
}
2653
}
2754

2855
impl LogicalNode for LogicalJoin {
@@ -42,25 +69,25 @@ impl LogicalNode for LogicalJoin {
4269
} = LogicalJoinInputUnPacker::new(&self, &inputs)?;
4370

4471
let joins = self
45-
.joins
72+
.joins()
4673
.iter()
4774
.zip(joins.iter())
4875
.map(|(self_item, item)| -> Result<_, CubeError> {
49-
Ok(LogicalJoinItem {
50-
cube: item.clone().into_logical_node()?,
51-
on_sql: self_item.on_sql.clone(),
52-
})
76+
Ok(LogicalJoinItem::builder()
77+
.cube(item.clone().into_logical_node()?)
78+
.on_sql(self_item.on_sql().clone())
79+
.build())
5380
})
5481
.collect::<Result<Vec<_>, _>>()?;
5582

56-
let result = Self {
57-
root: root.clone().into_logical_node()?,
58-
joins,
59-
dimension_subqueries: dimension_subqueries
83+
let result = Self::builder()
84+
.root(root.clone().into_logical_node()?)
85+
.joins(joins)
86+
.dimension_subqueries(dimension_subqueries
6087
.iter()
6188
.map(|itm| itm.clone().into_logical_node())
62-
.collect::<Result<Vec<_>, _>>()?,
63-
};
89+
.collect::<Result<Vec<_>, _>>()?)
90+
.build();
6491

6592
Ok(Rc::new(result))
6693
}
@@ -82,10 +109,10 @@ pub struct LogicalJoinInputPacker;
82109
impl LogicalJoinInputPacker {
83110
pub fn pack(join: &LogicalJoin) -> Vec<PlanNode> {
84111
let mut result = vec![];
85-
result.push(join.root.as_plan_node());
86-
result.extend(join.joins.iter().map(|item| item.cube.as_plan_node()));
112+
result.push(join.root().as_plan_node());
113+
result.extend(join.joins().iter().map(|item| item.cube().as_plan_node()));
87114
result.extend(
88-
join.dimension_subqueries
115+
join.dimension_subqueries()
89116
.iter()
90117
.map(|item| item.as_plan_node()),
91118
);
@@ -105,7 +132,7 @@ impl<'a> LogicalJoinInputUnPacker<'a> {
105132

106133
let root = &inputs[0];
107134
let joins_start = 1;
108-
let joins_end = joins_start + join.joins.len();
135+
let joins_end = joins_start + join.joins().len();
109136
let joins = &inputs[joins_start..joins_end];
110137
let dimension_subqueries = &inputs[joins_end..];
111138

@@ -117,7 +144,7 @@ impl<'a> LogicalJoinInputUnPacker<'a> {
117144
}
118145

119146
fn inputs_len(join: &LogicalJoin) -> usize {
120-
1 + join.joins.len() + join.dimension_subqueries.len()
147+
1 + join.joins().len() + join.dimension_subqueries().len()
121148
}
122149
}
123150

@@ -127,16 +154,16 @@ impl PrettyPrint for LogicalJoin {
127154
let state = state.new_level();
128155
let details_state = state.new_level();
129156
result.println(&format!("root: "), &state);
130-
self.root.pretty_print(result, &details_state);
157+
self.root().pretty_print(result, &details_state);
131158
result.println(&format!("joins: "), &state);
132159
let state = state.new_level();
133-
for join in self.joins.iter() {
160+
for join in self.joins().iter() {
134161
join.pretty_print(result, &state);
135162
}
136-
if !self.dimension_subqueries.is_empty() {
163+
if !self.dimension_subqueries().is_empty() {
137164
result.println("dimension_subqueries:", &state);
138165
let details_state = state.new_level();
139-
for subquery in self.dimension_subqueries.iter() {
166+
for subquery in self.dimension_subqueries().iter() {
140167
subquery.pretty_print(result, &details_state);
141168
}
142169
}

0 commit comments

Comments
 (0)