Skip to content

Commit 369702e

Browse files
author
longshan.lu
committed
refactor: Streamline optimizer rules by simplifying filter handling and enhancing type coercion logic
1 parent b92e227 commit 369702e

File tree

3 files changed

+106
-131
lines changed

3 files changed

+106
-131
lines changed

qurious/src/optimizer/rule/extract_equijoin_predicate.rs

Lines changed: 31 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use crate::common::table_schema::TableSchemaRef;
2-
use crate::common::transformed::{TransformNode, Transformed, TransformedResult};
32
use crate::error::Result;
4-
use crate::logical::expr::Column;
53
use crate::utils::expr::{check_all_columns_from_schema, split_conjunctive_predicates};
64
use crate::{
75
datatypes::operator::Operator,
@@ -11,7 +9,6 @@ use crate::{
119
},
1210
optimizer::rule::rule_optimizer::OptimizerRule,
1311
};
14-
use std::collections::HashSet;
1512

1613
/// Extract equijoin predicate from join filter.
1714
/// Equijoin predicates found in the join filter are appended to the join's `on` conditions; whatever remains is kept as the filter.
@@ -23,36 +20,33 @@ impl OptimizerRule for ExtractEquijoinPredicate {
2320
}
2421

2522
fn rewrite(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
26-
plan.transform_up(|plan| {
27-
match plan {
28-
LogicalPlan::Join(Join {
29-
left,
30-
right,
31-
join_type,
32-
mut on,
33-
filter: Some(filter),
34-
schema,
35-
}) => {
36-
// Extract equijoin predicates from filter
37-
let (equijoin_predicates, remaining_filter) =
38-
extract_equijoin_predicates(filter, left.table_schema(), right.table_schema());
39-
40-
// Combine existing join conditions with extracted equijoin predicates
41-
on.extend(equijoin_predicates);
42-
43-
Ok(Transformed::yes(LogicalPlan::Join(Join {
44-
left,
45-
right,
46-
join_type,
47-
on,
48-
filter: remaining_filter,
49-
schema,
50-
})))
51-
}
52-
_ => Ok(Transformed::no(plan)),
53-
}
54-
})
55-
.data()
23+
let LogicalPlan::Join(Join {
24+
left,
25+
right,
26+
join_type,
27+
mut on,
28+
filter: Some(filter),
29+
schema,
30+
}) = plan
31+
else {
32+
return Ok(plan);
33+
};
34+
35+
// Extract equijoin predicates from filter
36+
let (equijoin_predicates, remaining_filter) =
37+
extract_equijoin_predicates(filter, left.table_schema(), right.table_schema());
38+
39+
// Combine existing join conditions with extracted equijoin predicates
40+
on.extend(equijoin_predicates);
41+
42+
Ok(LogicalPlan::Join(Join {
43+
left,
44+
right,
45+
join_type,
46+
on,
47+
filter: remaining_filter,
48+
schema,
49+
}))
5650
}
5751
}
5852

@@ -110,30 +104,16 @@ fn extract_equijoin_predicates(
110104
)
111105
}
112106

113-
114-
115107
#[cfg(test)]
116108
mod tests {
117109
use super::*;
118-
use crate::{optimizer::rule::rule_optimizer::OptimizerRule, test_utils::sql_to_plan, utils};
119-
120-
fn assert_after_optimizer(sql: &str, expected: Vec<&str>) {
121-
let plan = sql_to_plan(sql);
122-
let optimizer = ExtractEquijoinPredicate;
123-
let plan = optimizer.rewrite(plan).unwrap();
124-
let actual = utils::format(&plan, 0);
125-
let actual = actual.trim().lines().collect::<Vec<_>>();
126-
127-
assert_eq!(
128-
expected, actual,
129-
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
130-
);
131-
}
110+
use crate::test_utils::assert_after_optimizer;
132111

133112
#[test]
134113
fn test_extract_equijoin_predicate_from_filter() {
135114
assert_after_optimizer(
136115
"SELECT * FROM users INNER JOIN repos ON users.id = repos.owner_id WHERE users.id = repos.id AND users.name = 'test'",
116+
Box::new(ExtractEquijoinPredicate),
137117
vec![
138118
"Projection: (users.email, repos.id, users.id, repos.name, users.name, repos.owner_id)",
139119
" Filter: users.id = repos.id AND users.name = Utf8('test')",
@@ -148,6 +128,7 @@ mod tests {
148128
fn test_no_equijoin_predicates_in_filter() {
149129
assert_after_optimizer(
150130
"SELECT * FROM users INNER JOIN repos ON users.id > repos.owner_id WHERE users.name = 'test' AND repos.name = 'repo'",
131+
Box::new(ExtractEquijoinPredicate),
151132
vec![
152133
"Projection: (users.email, repos.id, users.id, repos.name, users.name, repos.owner_id)",
153134
" Filter: users.name = Utf8('test') AND repos.name = Utf8('repo')",
@@ -162,6 +143,7 @@ mod tests {
162143
fn test_multiple_equijoin_predicates() {
163144
assert_after_optimizer(
164145
"SELECT * FROM users INNER JOIN repos ON users.id = repos.owner_id AND users.name = repos.name WHERE users.id = repos.id AND users.email = 'test@example.com'",
146+
Box::new(ExtractEquijoinPredicate),
165147
vec![
166148
"Projection: (users.email, repos.id, users.id, repos.name, users.name, repos.owner_id)",
167149
" Filter: users.id = repos.id AND users.email = Utf8('test@example.com')",

qurious/src/optimizer/rule/scalar_subquery_to_join.rs

Lines changed: 62 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -39,74 +39,71 @@ impl OptimizerRule for ScalarSubqueryToJoin {
3939
}
4040

4141
fn rewrite(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
42-
let new_plan = plan
43-
.transform(|plan| match plan {
44-
LogicalPlan::Filter(filter) => {
45-
if !contains_scalar_subquery(&filter.expr) {
46-
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
47-
}
48-
49-
let (subqueries, rewritten_expr) = extract_subquery_exprs(filter.expr.clone(), &self.id_generator)?;
50-
let mut cur_input = filter.input.as_ref().clone();
51-
52-
// iterate through all subqueries in predicate, turning each into a left join
53-
for (subquery, subquery_alias) in subqueries {
54-
let (correlated_exprs, new_subquery_plan) =
55-
find_correlated_exprs(subquery.subquery.as_ref().clone())?;
56-
57-
let new_subquery_plan = LogicalPlanBuilder::from(new_subquery_plan)
58-
.alias(&subquery_alias)?
59-
.build();
60-
61-
cur_input = match filter.input.as_ref() {
62-
LogicalPlan::EmptyRelation(_) => new_subquery_plan,
63-
_ => {
64-
let mut all_correlated_cols = BTreeSet::new();
65-
correlated_exprs
66-
.correlated_subquery_cols_map
67-
.values()
68-
.for_each(|cols| all_correlated_cols.extend(cols.clone()));
69-
70-
let join_filter = correlated_exprs
71-
.join_filters
72-
.into_iter()
73-
.reduce(LogicalExpr::and)
74-
.map_or(Ok(Some(LogicalExpr::Literal(true.into()))), |expr| {
75-
expr.transform(|expr| match expr {
76-
LogicalExpr::Column(col) if all_correlated_cols.contains(&col) => {
77-
Ok(Transformed::yes(LogicalExpr::Column(
78-
col.with_relation(subquery_alias.clone()),
79-
)))
80-
}
81-
_ => Ok(Transformed::no(expr)),
82-
})
83-
.data()
84-
.map(Some)
85-
})?;
86-
87-
LogicalPlanBuilder::from(cur_input)
88-
.join_on(new_subquery_plan, JoinType::Left, join_filter)?
89-
.build()
90-
}
91-
};
92-
}
42+
let plan = match plan {
43+
LogicalPlan::Filter(filter) => {
44+
if !contains_scalar_subquery(&filter.expr) {
45+
return Ok(LogicalPlan::Filter(filter));
46+
}
9347

94-
Ok(Transformed::yes(LogicalPlanBuilder::filter(cur_input, rewritten_expr)?))
48+
let (subqueries, rewritten_expr) = extract_subquery_exprs(filter.expr.clone(), &self.id_generator)?;
49+
let mut cur_input = filter.input.as_ref().clone();
50+
51+
// iterate through all subqueries in predicate, turning each into a left join
52+
for (subquery, subquery_alias) in subqueries {
53+
let (correlated_exprs, new_subquery_plan) =
54+
find_correlated_exprs(subquery.subquery.as_ref().clone())?;
55+
56+
let new_subquery_plan = LogicalPlanBuilder::from(new_subquery_plan)
57+
.alias(&subquery_alias)?
58+
.build();
59+
60+
cur_input = match filter.input.as_ref() {
61+
LogicalPlan::EmptyRelation(_) => new_subquery_plan,
62+
_ => {
63+
let mut all_correlated_cols = BTreeSet::new();
64+
correlated_exprs
65+
.correlated_subquery_cols_map
66+
.values()
67+
.for_each(|cols| all_correlated_cols.extend(cols.clone()));
68+
69+
let join_filter = correlated_exprs
70+
.join_filters
71+
.into_iter()
72+
.reduce(LogicalExpr::and)
73+
.map_or(Ok(Some(LogicalExpr::Literal(true.into()))), |expr| {
74+
expr.transform(|expr| match expr {
75+
LogicalExpr::Column(col) if all_correlated_cols.contains(&col) => {
76+
Ok(Transformed::yes(LogicalExpr::Column(
77+
col.with_relation(subquery_alias.clone()),
78+
)))
79+
}
80+
_ => Ok(Transformed::no(expr)),
81+
})
82+
.data()
83+
.map(Some)
84+
})?;
85+
86+
LogicalPlanBuilder::from(cur_input)
87+
.join_on(new_subquery_plan, JoinType::Left, join_filter)?
88+
.build()
89+
}
90+
};
9591
}
96-
LogicalPlan::SubqueryAlias(subquery_alias) => self
97-
.rewrite(Arc::unwrap_or_clone(subquery_alias.input))
98-
.and_then(|new_plan| {
99-
LogicalPlanBuilder::from(new_plan)
100-
.alias(&subquery_alias.alias.to_qualified_name())
101-
.map(LogicalPlanBuilder::build)
102-
})
103-
.map(Transformed::yes),
104-
_ => Ok(Transformed::no(plan)),
105-
})
106-
.data()?;
10792

108-
new_plan
109-
.map_children(|child_plan| self.rewrite(child_plan).map(Transformed::yes))
93+
LogicalPlanBuilder::filter(cur_input, rewritten_expr)?
94+
}
95+
LogicalPlan::SubqueryAlias(subquery_alias) => self
96+
.rewrite(Arc::unwrap_or_clone(subquery_alias.input))
97+
.and_then(|new_plan| {
98+
LogicalPlanBuilder::from(new_plan)
99+
.alias(&subquery_alias.alias.to_qualified_name())
100+
.map(LogicalPlanBuilder::build)
101+
})?,
102+
_ => plan,
103+
};
104+
105+
// rewrite children
106+
plan.map_children(|child_plan| self.rewrite(child_plan).map(Transformed::yes))
110107
.data()
111108
}
112109
}

qurious/src/optimizer/rule/type_coercion.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::Arc;
22

33
use arrow::datatypes::{DataType, Schema};
44

5-
use crate::common::transformed::{TransformNode, Transformed, TransformedResult};
5+
use crate::common::transformed::{Transformed, TransformedResult};
66
use crate::error::Result;
77
use crate::logical::expr::alias::Alias;
88
use crate::logical::expr::{AggregateExpr, BinaryExpr, LogicalExpr};
@@ -18,22 +18,18 @@ impl OptimizerRule for TypeCoercion {
1818
"type_coercion"
1919
}
2020

21-
fn rewrite(&self, base_plan: LogicalPlan) -> Result<LogicalPlan> {
22-
base_plan
23-
.transform(|plan| {
24-
if matches!(plan, LogicalPlan::TableScan(_)) {
25-
return Ok(Transformed::no(plan));
26-
}
27-
let mut merged_schema = Arc::new(Schema::empty());
28-
let schema = plan.schema();
29-
30-
for input in plan.children().into_iter().flat_map(|x| x) {
31-
merged_schema = merge_schema(&schema, &input.schema()).map(Arc::new)?;
32-
}
33-
34-
plan.map_exprs(|expr| type_coercion(&merged_schema, expr))
35-
})
36-
.data()
21+
fn rewrite(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
22+
if matches!(plan, LogicalPlan::TableScan(_)) {
23+
return Ok(plan);
24+
}
25+
let mut merged_schema = Arc::new(Schema::empty());
26+
let schema = plan.schema();
27+
28+
for input in plan.children().into_iter().flat_map(|x| x) {
29+
merged_schema = merge_schema(&schema, &input.schema()).map(Arc::new)?;
30+
}
31+
32+
plan.map_exprs(|expr| type_coercion(&merged_schema, expr)).data()
3733
}
3834
}
3935

0 commit comments

Comments
 (0)