Skip to content

Commit 9e24128

Browse files
committed
in work
1 parent 2aa5078 commit 9e24128

File tree

18 files changed

+163
-492
lines changed

18 files changed

+163
-492
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesqlplanner/Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/cubesqlplanner/cubesqlplanner/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ chrono = "0.4.15"
2020
chrono-tz = "0.8.2"
2121
lazy_static = "1.4.0"
2222
regex = "1.3.9"
23+
typed-builder = "0.21.2"
2324

2425
[dependencies.neon]
2526
version = "=1"

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,12 @@ use super::*;
33
use cubenativeutils::CubeError;
44
use std::rc::Rc;
55

6-
pub enum AggregateMultipliedSubquerySouce {
7-
Cube(Rc<Cube>),
8-
MeasureSubquery(Rc<MeasureSubquery>),
9-
}
10-
11-
impl AggregateMultipliedSubquerySouce {
12-
fn as_plan_node(&self) -> PlanNode {
13-
match self {
14-
Self::Cube(item) => item.as_plan_node(),
15-
Self::MeasureSubquery(item) => item.as_plan_node(),
16-
}
17-
}
18-
fn with_plan_node(&self, plan_node: PlanNode) -> Result<Self, CubeError> {
19-
Ok(match self {
20-
Self::Cube(_) => Self::Cube(plan_node.into_logical_node()?),
21-
Self::MeasureSubquery(_) => Self::MeasureSubquery(plan_node.into_logical_node()?),
22-
})
23-
}
24-
}
6+
logical_source_enum!(AggregateMultipliedSubquerySource, [Cube, MeasureSubquery]);
257

268
pub struct AggregateMultipliedSubquery {
279
pub schema: Rc<LogicalSchema>,
2810
pub keys_subquery: Rc<KeysSubQuery>,
29-
pub source: AggregateMultipliedSubquerySouce,
11+
pub source: AggregateMultipliedSubquerySource,
3012
pub dimension_subqueries: Vec<Rc<DimensionSubQuery>>,
3113
}
3214

@@ -127,16 +109,7 @@ impl PrettyPrint for AggregateMultipliedSubquery {
127109
result.println("keys_subquery:", &state);
128110
self.keys_subquery.pretty_print(result, &details_state);
129111
result.println("source:", &state);
130-
match &self.source {
131-
AggregateMultipliedSubquerySouce::Cube(cube) => {
132-
result.println("Cube:", &details_state);
133-
cube.pretty_print(result, &details_state.new_level());
134-
}
135-
AggregateMultipliedSubquerySouce::MeasureSubquery(measure_subquery) => {
136-
result.println(&format!("MeasureSubquery: "), &details_state);
137-
measure_subquery.pretty_print(result, &details_state);
138-
}
139-
}
112+
self.source.pretty_print(result, &details_state);
140113
if !self.dimension_subqueries.is_empty() {
141114
result.println("dimension_subqueries:", &state);
142115
let details_state = state.new_level();

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/calc_groups_cross_join.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use std::rc::Rc;
66

77
#[derive(Clone)]
88
pub struct CalcGroupDescription {
9-
symbol: Rc<MemberSymbol>,
10-
values: Vec<String>,
9+
pub symbol: Rc<MemberSymbol>,
10+
pub values: Vec<String>,
1111
}
1212

1313
impl PrettyPrint for CalcGroupDescription {
@@ -20,10 +20,15 @@ impl PrettyPrint for CalcGroupDescription {
2020
}
2121
}
2222

23+
logical_source_enum!(
24+
CalcGroupsCrossJoinSource,
25+
[LogicalJoin, FullKeyAggregate, PreAggregation]
26+
);
27+
2328
#[derive(Clone)]
2429
pub struct CalcGroupsCrossJoin {
25-
source: QuerySource,
26-
calc_groups: Vec<Rc<CalcGroupDescription>>,
30+
pub source: CalcGroupsCrossJoinSource,
31+
pub calc_groups: Vec<Rc<CalcGroupDescription>>,
2732
}
2833

2934
impl LogicalNode for CalcGroupsCrossJoin {
@@ -46,7 +51,7 @@ impl LogicalNode for CalcGroupsCrossJoin {
4651
}
4752

4853
fn node_name(&self) -> &'static str {
49-
"MultiStageGetDateRange"
54+
"CalcGroupsCrossJoin"
5055
}
5156

5257
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ impl ResolvedMultipliedMeasures {
3131
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(resolve_multiplied_measures) => {
3232
resolve_multiplied_measures.schema.clone()
3333
}
34-
ResolvedMultipliedMeasures::PreAggregation(simple_query) => simple_query.schema.clone(),
34+
ResolvedMultipliedMeasures::PreAggregation(simple_query) => {
35+
simple_query.schema().clone()
36+
}
3537
}
3638
}
3739
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,33 @@ use super::*;
22
use crate::planner::sql_evaluator::MemberSymbol;
33
use cubenativeutils::CubeError;
44
use std::rc::Rc;
5+
use typed_builder::TypedBuilder;
56

6-
#[derive(Clone)]
7+
#[derive(Clone, TypedBuilder)]
78
pub struct KeysSubQuery {
8-
pub pk_cube: Rc<Cube>,
9-
pub schema: Rc<LogicalSchema>,
10-
pub primary_keys_dimensions: Vec<Rc<MemberSymbol>>,
11-
pub filter: Rc<LogicalFilter>,
12-
pub source: Rc<LogicalJoin>,
9+
pk_cube: Rc<Cube>,
10+
schema: Rc<LogicalSchema>,
11+
primary_keys_dimensions: Vec<Rc<MemberSymbol>>,
12+
filter: Rc<LogicalFilter>,
13+
source: Rc<LogicalJoin>,
14+
}
15+
16+
impl KeysSubQuery {
17+
pub fn pk_cube(&self) -> &Rc<Cube> {
18+
&self.pk_cube
19+
}
20+
pub fn schema(&self) -> &Rc<LogicalSchema> {
21+
&self.schema
22+
}
23+
pub fn primary_keys_dimensions(&self) -> &Vec<Rc<MemberSymbol>> {
24+
&self.primary_keys_dimensions
25+
}
26+
pub fn filter(&self) -> &Rc<LogicalFilter> {
27+
&self.filter
28+
}
29+
pub fn source(&self) -> &Rc<LogicalJoin> {
30+
&self.source
31+
}
1332
}
1433

1534
impl LogicalNode for KeysSubQuery {

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/logical_source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::*;
22
use cubenativeutils::CubeError;
33
use std::rc::Rc;
44

5-
pub trait LogicalSource: Sized {
5+
pub trait LogicalSource: Sized + PrettyPrint {
66
fn as_plan_node(&self) -> PlanNode;
77
fn with_plan_node(&self, plan_node: PlanNode) -> Result<Self, CubeError>;
88
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#[macro_use]
2+
mod logical_source;
13
mod aggregate_multiplied_subquery;
24
mod calc_groups_cross_join;
35
mod cube;
@@ -8,7 +10,6 @@ mod join;
810
mod keys_subquery;
911
mod logical_node;
1012
mod logical_query_modifers;
11-
mod logical_source;
1213
mod measure_subquery;
1314
mod multistage;
1415
pub mod optimizers;

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl PreAggregationOptimizer {
4949
query: Rc<Query>,
5050
pre_aggregation: &Rc<CompiledPreAggregation>,
5151
) -> Result<Option<Rc<Query>>, CubeError> {
52-
if query.multistage_members.is_empty() {
52+
if query.multistage_members().is_empty() {
5353
self.try_rewrite_simple_query(&query, pre_aggregation)
5454
} else if !self.allow_multi_stage {
5555
Ok(None)
@@ -63,10 +63,9 @@ impl PreAggregationOptimizer {
6363
query: &Rc<Query>,
6464
pre_aggregation: &Rc<CompiledPreAggregation>,
6565
) -> Result<Option<Rc<Query>>, CubeError> {
66-
if self.is_schema_and_filters_match(&query.schema, &query.filter, pre_aggregation)? {
66+
if self.is_schema_and_filters_match(&query.schema(), &query.filter(), pre_aggregation)? {
6767
let mut new_query = query.as_ref().clone();
68-
new_query.source =
69-
QuerySource::PreAggregation(self.make_pre_aggregation_source(pre_aggregation)?);
68+
new_query.set_source(self.make_pre_aggregation_source(pre_aggregation)?.into());
7069
Ok(Some(Rc::new(new_query)))
7170
} else {
7271
Ok(None)
@@ -82,7 +81,7 @@ impl PreAggregationOptimizer {
8281
let mut has_unrewritten_leaf = false;
8382

8483
let mut rewritten_multistages = Vec::new();
85-
for multi_stage in &query.multistage_members {
84+
for multi_stage in query.multistage_members() {
8685
let rewritten = rewriter.rewrite_top_down_with(multi_stage.clone(), |plan_node| {
8786
let res = match plan_node {
8887
PlanNode::MultiStageLeafMeasure(multi_stage_leaf_measure) => {
@@ -119,7 +118,7 @@ impl PreAggregationOptimizer {
119118
return Ok(None);
120119
}
121120

122-
let source = if let QuerySource::FullKeyAggregate(full_key_aggregate) = &query.source {
121+
let source = if let QuerySource::FullKeyAggregate(full_key_aggregate) = query.source() {
123122
let fk_source = if let Some(resolver_multiplied_measures) =
124123
&full_key_aggregate.multiplied_measures_resolver
125124
{
@@ -135,18 +134,17 @@ impl PreAggregationOptimizer {
135134
let pre_aggregation_source =
136135
self.make_pre_aggregation_source(pre_aggregation)?;
137136

138-
let pre_aggregation_query = Query {
139-
schema: resolver_multiplied_measures.schema.clone(),
140-
filter: resolver_multiplied_measures.filter.clone(),
141-
modifers: Rc::new(LogicalQueryModifiers {
137+
let pre_aggregation_query = Query::builder()
138+
.schema(resolver_multiplied_measures.schema.clone())
139+
.filter(resolver_multiplied_measures.filter.clone())
140+
.modifers(Rc::new(LogicalQueryModifiers {
142141
offset: None,
143142
limit: None,
144143
ungrouped: false,
145144
order_by: vec![],
146-
}),
147-
source: QuerySource::PreAggregation(pre_aggregation_source),
148-
multistage_members: vec![],
149-
};
145+
}))
146+
.source(pre_aggregation_source.into())
147+
.build();
150148
Some(ResolvedMultipliedMeasures::PreAggregation(Rc::new(
151149
pre_aggregation_query,
152150
)))
@@ -161,18 +159,18 @@ impl PreAggregationOptimizer {
161159
};
162160
let mut result = full_key_aggregate.as_ref().clone();
163161
result.multiplied_measures_resolver = fk_source;
164-
QuerySource::FullKeyAggregate(Rc::new(result))
162+
Rc::new(result).into()
165163
} else {
166-
query.source.clone()
164+
query.source().clone()
167165
};
168166

169-
let result = Query {
170-
multistage_members: rewritten_multistages,
171-
schema: query.schema.clone(),
172-
filter: query.filter.clone(),
173-
modifers: query.modifers.clone(),
174-
source,
175-
};
167+
let result = Query::builder()
168+
.multistage_members(rewritten_multistages)
169+
.schema(query.schema().clone())
170+
.filter(query.filter().clone())
171+
.modifers(query.modifers().clone())
172+
.source(source)
173+
.build();
176174

177175
Ok(Some(Rc::new(result)))
178176
}

0 commit comments

Comments
 (0)