Skip to content

Commit 2eb339c

Browse files
committed
in work
1 parent a74ab72 commit 2eb339c

File tree

10 files changed

+342
-28
lines changed

10 files changed

+342
-28
lines changed

rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use super::context::PushDownBuilderContext;
2+
use super::{LogicalNodeProcessor, ProcessableNode};
13
use crate::logical_plan::*;
24
use crate::plan::schema::QualifiedColumnName;
35
use crate::plan::*;
@@ -60,6 +62,28 @@ impl PhysicalPlanBuilder {
6062
}
6163
}
6264

65+
pub(super) fn query_tools(&self) -> &Rc<QueryTools> {
66+
&self.query_tools
67+
}
68+
69+
pub(super) fn plan_sql_templates(&self) -> &PlanSqlTemplates {
70+
&self.plan_sql_templates
71+
}
72+
73+
pub(super) fn qtools_and_templates(&self) -> (&Rc<QueryTools>, &PlanSqlTemplates) {
74+
(&self.query_tools, &self.plan_sql_templates)
75+
}
76+
77+
pub(super) fn process_node<T: ProcessableNode>(
78+
&self,
79+
logical_node: &T,
80+
context: &PushDownBuilderContext,
81+
) -> Result<<T::ProcessorType<'_> as LogicalNodeProcessor<'_, T>>::PhysycalNode, CubeError>
82+
{
83+
let processor = T::ProcessorType::new(self);
84+
processor.process(logical_node, context)
85+
}
86+
6387
pub fn build(
6488
&self,
6589
logical_plan: Rc<Query>,
@@ -173,8 +197,9 @@ impl PhysicalPlanBuilder {
173197

174198
select_builder.set_filter(filter);
175199
select_builder.set_group_by(group_by);
176-
select_builder
177-
.set_order_by(self.make_order_by(&logical_plan.schema, &logical_plan.modifers.order_by)?);
200+
select_builder.set_order_by(
201+
self.make_order_by(&logical_plan.schema, &logical_plan.modifers.order_by)?,
202+
);
178203
select_builder.set_having(having);
179204
select_builder.set_limit(logical_plan.modifers.limit);
180205
select_builder.set_offset(logical_plan.modifers.offset);
@@ -335,7 +360,6 @@ impl PhysicalPlanBuilder {
335360
&references_builder,
336361
&mut render_references,
337362
joins_len,
338-
context,
339363
)?;
340364

341365
for measure in logical_plan.schema.measures.iter() {
@@ -361,8 +385,9 @@ impl PhysicalPlanBuilder {
361385
Some(filter)
362386
};
363387

364-
select_builder
365-
.set_order_by(self.make_order_by(&logical_plan.schema, &logical_plan.modifers.order_by)?);
388+
select_builder.set_order_by(
389+
self.make_order_by(&logical_plan.schema, &logical_plan.modifers.order_by)?,
390+
);
366391
select_builder.set_filter(having);
367392
select_builder.set_limit(logical_plan.modifers.limit);
368393
select_builder.set_offset(logical_plan.modifers.offset);
@@ -375,15 +400,14 @@ impl PhysicalPlanBuilder {
375400
}
376401

377402
//FIXME refactor required
378-
fn process_full_key_aggregate_dimensions(
403+
pub(super) fn process_full_key_aggregate_dimensions(
379404
&self,
380405
dimensions: &Vec<Rc<MemberSymbol>>,
381406
full_key_aggregate: &Rc<FullKeyAggregate>,
382407
select_builder: &mut SelectBuilder,
383408
references_builder: &ReferencesBuilder,
384409
render_references: &mut HashMap<String, QualifiedColumnName>,
385410
joins_len: usize,
386-
_context: &PhysicalPlanBuilderContext,
387411
) -> Result<(), CubeError> {
388412
let dimensions_for_join_names = full_key_aggregate
389413
.join_dimensions
@@ -910,7 +934,7 @@ impl PhysicalPlanBuilder {
910934
Ok(res)
911935
}
912936

913-
fn make_order_by(
937+
pub(crate) fn make_order_by(
914938
&self,
915939
logical_schema: &LogicalSchema,
916940
order_by: &Vec<OrderByItem>,
@@ -1257,7 +1281,6 @@ impl PhysicalPlanBuilder {
12571281
&references_builder,
12581282
&mut render_references,
12591283
joins_len,
1260-
context,
12611284
)?;
12621285

12631286
for measure in measure_calculation.schema.measures.iter() {

rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/context.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,3 @@ impl PushDownBuilderContext {
2323
}
2424
}
2525

26-
#[derive(Clone, Debug, Default)]
27-
pub(super) struct PullUpBuilderContext {
28-
pub render_references: HashMap<String, QualifiedColumnName>,
29-
pub measure_references: HashMap<String, QualifiedColumnName>,
30-
pub dimensions_references: HashMap<String, QualifiedColumnName>,
31-
pub aliases: HashMap<String, String>,
32-
}
Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
1-
use super::context::*;
1+
use crate::physical_plan_builder::PhysicalPlanBuilder;
2+
3+
use super::context::PushDownBuilderContext;
24
use cubenativeutils::CubeError;
3-
use std::rc::Rc;
45

5-
pub(super) trait LogicalNodeProcessor {
6-
type LogicalNode;
6+
pub(super) trait LogicalNodeProcessor<'a, LogicalNode> {
77
type PhysycalNode;
8+
fn new(builder: &'a PhysicalPlanBuilder) -> Self;
89
fn process(
9-
logical_plan: &Rc<Self::LogicalNode>,
10+
&self,
11+
logical_plan: &LogicalNode,
1012
context: &PushDownBuilderContext,
11-
) -> Result<(Self::PhysycalNode, PullUpBuilderContext), CubeError>;
13+
) -> Result<Self::PhysycalNode, CubeError>;
14+
}
15+
16+
pub(super) trait ProcessableNode: Sized {
17+
type ProcessorType<'a>: LogicalNodeProcessor<'a, Self>;
1218
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext};
2+
use crate::logical_plan::{FullKeyAggregate, SimpleQuery, SimpleQuerySource};
3+
use crate::physical_plan_builder::PhysicalPlanBuilder;
4+
use crate::plan::{Expr, Filter, MemberExpression, QueryPlan, Select, SelectBuilder};
5+
use crate::planner::query_tools::QueryTools;
6+
use crate::planner::sql_templates::PlanSqlTemplates;
7+
use crate::planner::{BaseMember, MemberSymbolRef};
8+
use cubenativeutils::CubeError;
9+
use std::collections::HashMap;
10+
use std::rc::Rc;
11+
12+
pub struct FullKeyAggregateProcessor<'a> {
13+
builder: &'a PhysicalPlanBuilder,
14+
}
15+
16+
impl<'a> LogicalNodeProcessor<'a, FullKeyAggregate> for FullKeyAggregateProcessor<'a> {
17+
type PhysycalNode = Rc<Select>;
18+
fn new(builder: &'a PhysicalPlanBuilder) -> Self {
19+
Self { builder }
20+
}
21+
22+
fn process(
23+
&self,
24+
logical_plan: &FullKeyAggregate,
25+
context: &PushDownBuilderContext,
26+
) -> Result<Self::PhysycalNode, CubeError> {
27+
let query_tools = self.builder.query_tools();
28+
todo!()
29+
}
30+
}
31+
32+
impl ProcessableNode for FullKeyAggregate {
33+
type ProcessorType<'a> = FullKeyAggregateProcessor<'a>;
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext};
2+
use crate::logical_plan::{FullKeyAggregateQuery, SimpleQuery, SimpleQuerySource};
3+
use crate::physical_plan_builder::PhysicalPlanBuilder;
4+
use crate::plan::{Expr, Filter, MemberExpression, QueryPlan, Select, SelectBuilder};
5+
use crate::planner::query_tools::QueryTools;
6+
use crate::planner::sql_templates::PlanSqlTemplates;
7+
use crate::planner::{BaseMember, MemberSymbolRef};
8+
use cubenativeutils::CubeError;
9+
use std::collections::HashMap;
10+
use std::rc::Rc;
11+
12+
pub struct FullKeyAggregateQueryProcessor<'a> {
13+
builder: &'a PhysicalPlanBuilder,
14+
}
15+
16+
impl<'a> LogicalNodeProcessor<'a, FullKeyAggregateQuery> for FullKeyAggregateQueryProcessor<'a> {
17+
type PhysycalNode = Rc<Select>;
18+
fn new(builder: &'a PhysicalPlanBuilder) -> Self {
19+
Self { builder }
20+
}
21+
22+
fn process(
23+
&self,
24+
logical_plan: &FullKeyAggregateQuery,
25+
context: &PushDownBuilderContext,
26+
) -> Result<Self::PhysycalNode, CubeError> {
27+
let query_tools = self.builder.query_tools();
28+
todo!()
29+
}
30+
}
31+
32+
impl ProcessableNode for FullKeyAggregateQuery {
33+
type ProcessorType<'a> = FullKeyAggregateQueryProcessor<'a>;
34+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use super::super::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext};
2+
use crate::logical_plan::{CubeJoinItem, LogicalJoin, LogicalJoinItem, SimpleQuery};
3+
use crate::physical_plan_builder::PhysicalPlanBuilder;
4+
use crate::plan::{From, JoinBuilder, JoinCondition, QueryPlan, Select};
5+
use crate::planner::query_tools::QueryTools;
6+
use crate::planner::sql_templates::PlanSqlTemplates;
7+
use crate::planner::SqlJoinCondition;
8+
use cubenativeutils::CubeError;
9+
use std::rc::Rc;
10+
11+
pub struct LogicalJoinProcessor<'a> {
12+
builder: &'a PhysicalPlanBuilder,
13+
}
14+
15+
impl<'a> LogicalNodeProcessor<'a, LogicalJoin> for LogicalJoinProcessor<'a> {
16+
type PhysycalNode = Rc<From>;
17+
fn new(builder: &'a PhysicalPlanBuilder) -> Self {
18+
Self { builder }
19+
}
20+
21+
fn process(
22+
&self,
23+
logical_join: &LogicalJoin,
24+
context: &PushDownBuilderContext,
25+
) -> Result<Self::PhysycalNode, CubeError> {
26+
let query_tools = self.builder.query_tools();
27+
let root = logical_join.root.cube.clone();
28+
if logical_join.joins.is_empty() {
29+
//&& dimension_subqueries.is_empty() {
30+
Ok(From::new_from_cube(
31+
root.clone(),
32+
Some(root.default_alias_with_prefix(&context.alias_prefix)),
33+
))
34+
} else {
35+
let mut join_builder = JoinBuilder::new_from_cube(
36+
root.clone(),
37+
Some(root.default_alias_with_prefix(&context.alias_prefix)),
38+
);
39+
/* for dimension_subquery in dimension_subqueries //TODO move dimension_subquery to
40+
* LogicalJoin mode
41+
.iter()
42+
.filter(|d| &d.subquery_dimension.cube_name() == root.name())
43+
{
44+
self.add_subquery_join(
45+
dimension_subquery.clone(),
46+
&mut join_builder,
47+
render_references,
48+
context,
49+
)?;
50+
} */
51+
for join in logical_join.joins.iter() {
52+
match join {
53+
LogicalJoinItem::CubeJoinItem(CubeJoinItem { cube, on_sql }) => {
54+
join_builder.left_join_cube(
55+
cube.cube.clone(),
56+
Some(cube.cube.default_alias_with_prefix(&context.alias_prefix)),
57+
JoinCondition::new_base_join(SqlJoinCondition::try_new(
58+
query_tools.clone(),
59+
on_sql.clone(),
60+
)?),
61+
);
62+
/* for dimension_subquery in dimension_subqueries
63+
.iter()
64+
.filter(|d| &d.subquery_dimension.cube_name() == cube.cube.name())
65+
{
66+
self.add_subquery_join(
67+
dimension_subquery.clone(),
68+
&mut join_builder,
69+
render_references,
70+
context,
71+
)?;
72+
} */
73+
}
74+
}
75+
}
76+
Ok(From::new_from_join(join_builder.build()))
77+
}
78+
}
79+
}
80+
81+
impl ProcessableNode for LogicalJoin {
82+
type ProcessorType<'a> = LogicalJoinProcessor<'a>;
83+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,11 @@
1+
mod full_key_aggregate;
2+
mod full_key_aggregate_query;
3+
mod logical_join;
14
mod multi_stage_measure_calculation;
5+
mod simple_query;
6+
7+
pub(super) use full_key_aggregate::*;
8+
pub(super) use full_key_aggregate_query::*;
9+
pub(super) use logical_join::*;
210
pub(super) use multi_stage_measure_calculation::*;
11+
pub(super) use simple_query::*;
Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,35 @@
1-
use super::super::{LogicalNodeProcessor, PullUpBuilderContext, PushDownBuilderContext};
1+
use super::super::{LogicalNodeProcessor, ProcessableNode};
2+
use super::super::context::PushDownBuilderContext;
23
use crate::logical_plan::MultiStageMeasureCalculation;
4+
use crate::physical_plan_builder::PhysicalPlanBuilder;
35
use crate::plan::QueryPlan;
6+
use crate::planner::query_tools::QueryTools;
7+
use crate::planner::sql_templates::PlanSqlTemplates;
48
use cubenativeutils::CubeError;
59
use std::rc::Rc;
610

7-
pub struct MultiStageMeasureCalculationProcessor;
11+
pub struct MultiStageMeasureCalculationProcessor<'a> {
12+
builder: &'a PhysicalPlanBuilder,
13+
}
814

9-
impl LogicalNodeProcessor for MultiStageMeasureCalculationProcessor {
10-
type LogicalNode = MultiStageMeasureCalculation;
15+
impl<'a> LogicalNodeProcessor<'a, MultiStageMeasureCalculation>
16+
for MultiStageMeasureCalculationProcessor<'a>
17+
{
1118
type PhysycalNode = QueryPlan;
19+
fn new(builder: &'a PhysicalPlanBuilder) -> Self {
20+
Self { builder }
21+
}
1222

1323
fn process(
14-
logical_plan: &Rc<Self::LogicalNode>,
24+
&self,
25+
logical_plan: &MultiStageMeasureCalculation,
1526
context: &PushDownBuilderContext,
16-
) -> Result<(Self::PhysycalNode, PullUpBuilderContext), CubeError> {
27+
) -> Result<Self::PhysycalNode, CubeError> {
1728
todo!()
1829
}
1930
}
31+
32+
impl ProcessableNode for MultiStageMeasureCalculation {
33+
type ProcessorType<'a> = MultiStageMeasureCalculationProcessor<'a>;
34+
}
35+

0 commit comments

Comments
 (0)