Skip to content

Commit cec6203

Browse files
committed
feat(tesseract): Full outer join support
1 parent 69b2cb3 commit cec6203

File tree

8 files changed

+442
-165
lines changed

8 files changed

+442
-165
lines changed

rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::context::PushDownBuilderContext;
22
use super::{LogicalNodeProcessor, ProcessableNode};
33
use crate::logical_plan::*;
44
use crate::physical_plan_builder::context::MultiStageDimensionContext;
5+
use crate::plan::join::JoinType;
56
use crate::plan::schema::QualifiedColumnName;
67
use crate::plan::*;
78
use crate::planner::query_properties::OrderByItem;
@@ -39,6 +40,10 @@ impl PhysicalPlanBuilder {
3940
(&self.query_tools, &self.plan_sql_templates)
4041
}
4142

43+
pub(super) fn templates(&self) -> &PlanSqlTemplates {
44+
&self.plan_sql_templates
45+
}
46+
4247
pub(super) fn process_node<T: ProcessableNode>(
4348
&self,
4449
logical_node: &T,
@@ -214,4 +219,59 @@ impl PhysicalPlanBuilder {
214219
}
215220
Ok(result)
216221
}
222+
223+
pub(super) fn process_query_dimension(
224+
&self,
225+
dimension: &Rc<MemberSymbol>,
226+
references_builder: &ReferencesBuilder,
227+
select_builder: &mut SelectBuilder,
228+
context_factory: &mut SqlNodesFactory,
229+
context: &PushDownBuilderContext,
230+
) -> Result<(), CubeError> {
231+
if let Some(coalesce_ref) = self.dimension_coalesce_refs(dimension, select_builder.from()) {
232+
select_builder.add_projection_coalesce_member(dimension, coalesce_ref, None)?;
233+
} else {
234+
references_builder.resolve_references_for_member(
235+
dimension.clone(),
236+
&None,
237+
context_factory.render_references_mut(),
238+
)?;
239+
if context.measure_subquery {
240+
select_builder.add_projection_member_without_schema(dimension, None);
241+
} else {
242+
select_builder.add_projection_member(dimension, None);
243+
}
244+
}
245+
Ok(())
246+
}
247+
248+
fn dimension_coalesce_refs(
249+
&self,
250+
dimension: &Rc<MemberSymbol>,
251+
from: &Rc<From>,
252+
) -> Option<Vec<QualifiedColumnName>> {
253+
match &from.source {
254+
FromSource::Join(join) => {
255+
if join.joins.iter().any(|i| i.join_type == JoinType::Full) {
256+
let mut result = vec![];
257+
let dim_alias = join.root.source.schema().resolve_member_alias(dimension);
258+
result.push(QualifiedColumnName::new(
259+
Some(join.root.alias.clone()),
260+
dim_alias,
261+
));
262+
for item in join.joins.iter() {
263+
let dim_alias = item.from.source.schema().resolve_member_alias(dimension);
264+
result.push(QualifiedColumnName::new(
265+
Some(item.from.alias.clone()),
266+
dim_alias,
267+
));
268+
}
269+
Some(result)
270+
} else {
271+
None
272+
}
273+
}
274+
_ => None,
275+
}
276+
}
217277
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
use super::FullKeyAggregateStrategy;
2+
use crate::logical_plan::{FullKeyAggregate, LogicalJoin, ResolvedMultipliedMeasures};
3+
use crate::physical_plan_builder::PhysicalPlanBuilder;
4+
use crate::physical_plan_builder::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext};
5+
use crate::plan::{
6+
Expr, From, FromSource, JoinBuilder, JoinCondition, QualifiedColumnName, Select, SelectBuilder,
7+
SingleAliasedSource, Union,
8+
};
9+
use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory;
10+
use crate::planner::sql_evaluator::{MemberSymbol, ReferencesBuilder};
11+
use cubenativeutils::CubeError;
12+
use itertools::Itertools;
13+
use std::rc::Rc;
14+
15+
pub(super) struct FullJoinFullKeyAggregateStrategy<'a> {
16+
builder: &'a PhysicalPlanBuilder,
17+
}
18+
19+
impl<'a> FullJoinFullKeyAggregateStrategy<'a> {
20+
pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc<Self> {
21+
Rc::new(Self { builder })
22+
}
23+
24+
fn full_join(
25+
&self,
26+
left_source: Rc<Select>,
27+
right_source: Rc<Select>,
28+
dimensions: &Vec<Rc<MemberSymbol>>,
29+
) -> Result<Rc<From>, CubeError> {
30+
let left_alias = "q_l".to_string();
31+
let right_alias = "q_r".to_string();
32+
let mut join_builder =
33+
JoinBuilder::new_from_subselect(left_source.clone(), left_alias.clone());
34+
35+
let conditions = dimensions
36+
.iter()
37+
.map(|dim| -> Result<_, CubeError> {
38+
let alias_in_left_query = left_source.schema().resolve_member_alias(dim);
39+
let left_query_ref = Expr::Reference(QualifiedColumnName::new(
40+
Some(left_alias.clone()),
41+
alias_in_left_query,
42+
));
43+
let alias_in_right_query = right_source.schema().resolve_member_alias(dim);
44+
let right_query_ref = Expr::Reference(QualifiedColumnName::new(
45+
Some(right_alias.clone()),
46+
alias_in_right_query,
47+
));
48+
49+
Ok(vec![(left_query_ref, right_query_ref)])
50+
})
51+
.collect::<Result<Vec<_>, _>>()?;
52+
53+
join_builder.full_join_subselect(
54+
right_source.clone(),
55+
right_alias.clone(),
56+
JoinCondition::new_dimension_join(conditions, true),
57+
);
58+
let result = join_builder.build();
59+
Ok(From::new_from_join(result))
60+
}
61+
62+
fn select_over_join_pair(
63+
&self,
64+
from: Rc<From>,
65+
dimensions: &Vec<Rc<MemberSymbol>>,
66+
measures: &Vec<Rc<MemberSymbol>>,
67+
context: &PushDownBuilderContext,
68+
) -> Result<Rc<Select>, CubeError> {
69+
let query_tools = self.builder.query_tools();
70+
let mut context_factory = SqlNodesFactory::new();
71+
let references_builder = ReferencesBuilder::new(from.clone());
72+
let mut select_builder = SelectBuilder::new(from);
73+
for dimension in dimensions.iter() {
74+
self.builder.process_query_dimension(
75+
dimension,
76+
&references_builder,
77+
&mut select_builder,
78+
&mut context_factory,
79+
&context,
80+
)?;
81+
}
82+
83+
for measure in measures {
84+
references_builder.resolve_references_for_member(
85+
measure.clone(),
86+
&None,
87+
context_factory.render_references_mut(),
88+
)?;
89+
select_builder.add_projection_member(&measure, None);
90+
}
91+
let res = Rc::new(select_builder.build(query_tools.clone(), context_factory));
92+
Ok(res)
93+
}
94+
}
95+
96+
impl FullKeyAggregateStrategy for FullJoinFullKeyAggregateStrategy<'_> {
97+
fn process(
98+
&self,
99+
full_key_aggregate: &FullKeyAggregate,
100+
context: &PushDownBuilderContext,
101+
) -> Result<Rc<From>, CubeError> {
102+
let query_tools = self.builder.query_tools();
103+
let mut data_queries = vec![];
104+
if let Some(resolved_multiplied_measures) =
105+
full_key_aggregate.multiplied_measures_resolver()
106+
{
107+
match resolved_multiplied_measures {
108+
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(
109+
resolve_multiplied_measures,
110+
) => {
111+
for regular_measure_query in resolve_multiplied_measures
112+
.regular_measure_subqueries
113+
.iter()
114+
{
115+
let query = self
116+
.builder
117+
.process_node(regular_measure_query.as_ref(), &context)?;
118+
data_queries.push((query, regular_measure_query.schema().measures.clone()));
119+
}
120+
for multiplied_measure_query in resolve_multiplied_measures
121+
.aggregate_multiplied_subqueries
122+
.iter()
123+
{
124+
let query = self
125+
.builder
126+
.process_node(multiplied_measure_query.as_ref(), &context)?;
127+
data_queries
128+
.push((query, multiplied_measure_query.schema.measures.clone()));
129+
}
130+
}
131+
ResolvedMultipliedMeasures::PreAggregation(pre_agg_query) => {
132+
let query = self
133+
.builder
134+
.process_node(pre_agg_query.as_ref(), &context)?;
135+
data_queries.push((query, pre_agg_query.schema().measures.clone()));
136+
}
137+
}
138+
}
139+
140+
for multi_stage_ref in full_key_aggregate.multi_stage_subquery_refs().iter() {
141+
let multi_stage_schema = context.get_multi_stage_schema(multi_stage_ref.name())?;
142+
let multi_stage_source = SingleAliasedSource::new_from_table_reference(
143+
multi_stage_ref.name().clone(),
144+
multi_stage_schema.clone(),
145+
None,
146+
);
147+
let sql_context = SqlNodesFactory::new();
148+
149+
let data_select_builder =
150+
SelectBuilder::new(From::new(FromSource::Single(multi_stage_source)));
151+
let data_select = Rc::new(data_select_builder.build(query_tools.clone(), sql_context));
152+
data_queries.push((data_select, multi_stage_ref.symbols().clone()));
153+
}
154+
155+
if data_queries.is_empty() {
156+
let empty_join = LogicalJoin::builder().build();
157+
return self.builder.process_node(&empty_join, context);
158+
}
159+
160+
if data_queries.len() == 1 {
161+
let (select, _) = data_queries[0].clone();
162+
let result = From::new_from_subselect(select, "fk_aggregate".to_string());
163+
return Ok(result);
164+
}
165+
166+
let dimensions = full_key_aggregate
167+
.schema()
168+
.all_dimensions()
169+
.cloned()
170+
.collect_vec();
171+
let mut measures = vec![];
172+
173+
let mut queries_iter = data_queries.into_iter();
174+
let (left_query, mut query_measures) = queries_iter.next().unwrap();
175+
measures.append(&mut query_measures);
176+
let (right_query, mut query_measures) = queries_iter.next().unwrap();
177+
measures.append(&mut query_measures);
178+
let mut result = self.full_join(left_query, right_query, &dimensions)?;
179+
for (query, mut query_measures) in queries_iter {
180+
let left_query = self.select_over_join_pair(result, &dimensions, &measures, context)?;
181+
result = self.full_join(left_query, query, &dimensions)?;
182+
measures.append(&mut query_measures);
183+
}
184+
185+
Ok(result)
186+
}
187+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use crate::physical_plan_builder::{LogicalNodeProcessor, ProcessableNode, PushDownBuilderContext};
2+
use crate::logical_plan::{FullKeyAggregate, LogicalJoin, ResolvedMultipliedMeasures};
3+
use crate::physical_plan_builder::PhysicalPlanBuilder;
4+
use crate::plan::{
5+
Expr, From, FromSource, JoinBuilder, JoinCondition, QualifiedColumnName, SelectBuilder,
6+
SingleAliasedSource, Union,
7+
};
8+
use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory;
9+
use crate::planner::sql_evaluator::ReferencesBuilder;
10+
use cubenativeutils::CubeError;
11+
use std::rc::Rc;
12+
use super::FullKeyAggregateStrategy;
13+
14+
pub(super) struct InnerJoinFullKeyAggregateStrategy<'a> {
15+
builder: &'a PhysicalPlanBuilder,
16+
}
17+
18+
impl<'a> InnerJoinFullKeyAggregateStrategy<'a> {
19+
pub fn new(builder: &'a PhysicalPlanBuilder) -> Rc<Self> {
20+
Rc::new(Self { builder })
21+
}
22+
}
23+
24+
impl FullKeyAggregateStrategy for InnerJoinFullKeyAggregateStrategy<'_> {
25+
fn process(
26+
&self,
27+
full_key_aggregate: &FullKeyAggregate,
28+
context: &PushDownBuilderContext,
29+
) -> Result<Rc<From>, CubeError> {
30+
let query_tools = self.builder.query_tools();
31+
let mut data_queries = vec![];
32+
if let Some(resolved_multiplied_measures) =
33+
full_key_aggregate.multiplied_measures_resolver()
34+
{
35+
match resolved_multiplied_measures {
36+
ResolvedMultipliedMeasures::ResolveMultipliedMeasures(
37+
resolve_multiplied_measures,
38+
) => {
39+
for regular_measure_query in resolve_multiplied_measures
40+
.regular_measure_subqueries
41+
.iter()
42+
{
43+
let query = self
44+
.builder
45+
.process_node(regular_measure_query.as_ref(), &context)?;
46+
data_queries.push(query);
47+
}
48+
for multiplied_measure_query in resolve_multiplied_measures
49+
.aggregate_multiplied_subqueries
50+
.iter()
51+
{
52+
let query = self
53+
.builder
54+
.process_node(multiplied_measure_query.as_ref(), &context)?;
55+
data_queries.push(query);
56+
}
57+
}
58+
ResolvedMultipliedMeasures::PreAggregation(pre_agg_query) => {
59+
let query = self
60+
.builder
61+
.process_node(pre_agg_query.as_ref(), &context)?;
62+
data_queries.push(query);
63+
}
64+
}
65+
}
66+
67+
for multi_stage_ref in full_key_aggregate.multi_stage_subquery_refs().iter() {
68+
let multi_stage_schema = context.get_multi_stage_schema(multi_stage_ref.name())?;
69+
let multi_stage_source = SingleAliasedSource::new_from_table_reference(
70+
multi_stage_ref.name().clone(),
71+
multi_stage_schema.clone(),
72+
None,
73+
);
74+
let sql_context = SqlNodesFactory::new();
75+
76+
let data_select_builder =
77+
SelectBuilder::new(From::new(FromSource::Single(multi_stage_source)));
78+
let data_select = Rc::new(data_select_builder.build(query_tools.clone(), sql_context));
79+
data_queries.push(data_select);
80+
}
81+
82+
if data_queries.is_empty() {
83+
let empty_join = LogicalJoin::builder().build();
84+
return self.builder.process_node(&empty_join, context);
85+
}
86+
87+
if data_queries.len() == 1 {
88+
let select = data_queries[0].clone();
89+
let result = From::new_from_subselect(select, "fk_aggregate".to_string());
90+
return Ok(result);
91+
}
92+
93+
let mut join_builder =
94+
JoinBuilder::new_from_subselect(data_queries[0].clone(), "q_0".to_string());
95+
96+
for (i, query) in data_queries.iter().skip(1).enumerate() {
97+
let prev_alias = format!("q_{}", i);
98+
let query_alias = format!("q_{}", i + 1);
99+
let conditions = full_key_aggregate
100+
.schema()
101+
.all_dimensions()
102+
.map(|dim| -> Result<_, CubeError> {
103+
let alias_in_prev_query = data_queries[i].schema().resolve_member_alias(dim);
104+
let prev_query_ref = Expr::Reference(QualifiedColumnName::new(
105+
Some(prev_alias.clone()),
106+
alias_in_prev_query,
107+
));
108+
let alias_in_data_query = query.schema().resolve_member_alias(dim);
109+
let data_query_ref = Expr::Reference(QualifiedColumnName::new(
110+
Some(query_alias.clone()),
111+
alias_in_data_query,
112+
));
113+
114+
Ok(vec![(prev_query_ref, data_query_ref)])
115+
})
116+
.collect::<Result<Vec<_>, _>>()?;
117+
118+
join_builder.inner_join_subselect(
119+
query.clone(),
120+
query_alias.clone(),
121+
JoinCondition::new_dimension_join(conditions, true),
122+
);
123+
}
124+
125+
let result = join_builder.build();
126+
Ok(From::new_from_join(result))
127+
}
128+
}

0 commit comments

Comments
 (0)