Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 3a44cea

Browse files
committed
fix(df-repr): enable filter rules (#223)
Signed-off-by: Alex Chi <[email protected]>
1 parent 5dc5fc0 commit 3a44cea

File tree

8 files changed

+367
-483
lines changed

8 files changed

+367
-483
lines changed

optd-datafusion-repr/src/lib.rs

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,21 @@ impl DatafusionOptimizer {
7575
pub fn default_heuristic_rules(
7676
) -> Vec<Arc<dyn Rule<DfNodeType, HeuristicsOptimizer<DfNodeType>>>> {
7777
vec![
78-
// Arc::new(EliminateProjectRule::new()),
79-
// Arc::new(SimplifyFilterRule::new()),
80-
// Arc::new(SimplifyJoinCondRule::new()),
81-
// Arc::new(EliminateFilterRule::new()),
82-
// Arc::new(EliminateJoinRule::new()),
83-
// Arc::new(EliminateLimitRule::new()),
84-
// Arc::new(EliminateDuplicatedSortExprRule::new()),
85-
// Arc::new(EliminateDuplicatedAggExprRule::new()),
86-
// Arc::new(DepJoinEliminate::new()),
87-
// Arc::new(DepInitialDistinct::new()),
88-
// Arc::new(DepJoinPastProj::new()),
89-
// Arc::new(DepJoinPastFilter::new()),
90-
// Arc::new(DepJoinPastAgg::new()),
91-
// Arc::new(ProjectMergeRule::new()),
92-
// Arc::new(FilterMergeRule::new()),
78+
Arc::new(rules::EliminateProjectRule::new()),
79+
Arc::new(rules::SimplifyFilterRule::new()),
80+
Arc::new(rules::SimplifyJoinCondRule::new()),
81+
Arc::new(rules::EliminateFilterRule::new()),
82+
Arc::new(rules::EliminateJoinRule::new()),
83+
Arc::new(rules::EliminateLimitRule::new()),
84+
Arc::new(rules::EliminateDuplicatedSortExprRule::new()),
85+
Arc::new(rules::EliminateDuplicatedAggExprRule::new()),
86+
// Arc::new(rules::DepJoinEliminate::new()),
87+
// Arc::new(rules::DepInitialDistinct::new()),
88+
// Arc::new(rules::DepJoinPastProj::new()),
89+
// Arc::new(rules::DepJoinPastFilter::new()),
90+
// Arc::new(rules::DepJoinPastAgg::new()),
91+
Arc::new(rules::ProjectMergeRule::new()),
92+
Arc::new(rules::FilterMergeRule::new()),
9393
]
9494
}
9595

@@ -100,22 +100,21 @@ impl DatafusionOptimizer {
100100
for rule in rules {
101101
rule_wrappers.push(RuleWrapper::new_cascades(rule));
102102
}
103-
// add all filter pushdown rules as heuristic rules
104103
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
105-
// FilterProjectTransposeRule::new(),
106-
// )));
107-
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
108-
// FilterCrossJoinTransposeRule::new(),
109-
// )));
110-
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
111-
// FilterInnerJoinTransposeRule::new(),
112-
// )));
113-
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
114-
// FilterSortTransposeRule::new(),
115-
// )));
116-
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
117-
// FilterAggTransposeRule::new(),
104+
// rules::FilterProjectTransposeRule::new(),
118105
// )));
106+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
107+
rules::FilterCrossJoinTransposeRule::new(),
108+
)));
109+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
110+
rules::FilterInnerJoinTransposeRule::new(),
111+
)));
112+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
113+
rules::FilterSortTransposeRule::new(),
114+
)));
115+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
116+
rules::FilterAggTransposeRule::new(),
117+
)));
119118
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
120119
rules::HashJoinRule::new(),
121120
)));
@@ -143,11 +142,14 @@ impl DatafusionOptimizer {
143142
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
144143
rules::EliminateLimitRule::new(),
145144
)));
145+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
146+
rules::EliminateJoinRule::new(),
147+
)));
148+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
149+
rules::EliminateFilterRule::new(),
150+
)));
146151
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
147-
// EliminateFilterRule::new(),
148-
// )));
149-
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
150-
// ProjectFilterTransposeRule::new(),
152+
// rules::ProjectFilterTransposeRule::new(),
151153
// )));
152154
rule_wrappers
153155
}

optd-datafusion-repr/src/plan_nodes/macros.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ pub(crate) use define_plan_node;
125125
mod test {
126126
use crate::plan_nodes::*;
127127

128+
#[allow(dead_code)]
128129
fn get_explain_str(pretty: &Pretty) -> String {
129130
let mut config = PrettyConfig {
130131
need_boundaries: false,

optd-datafusion-repr/src/rules.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
1-
// mod eliminate_duplicated_expr;
1+
mod eliminate_duplicated_expr;
22
mod eliminate_limit;
3-
// mod filter;
4-
// mod filter_pushdown;
3+
mod filter;
4+
mod filter_pushdown;
55
mod joins;
66
mod macros;
77
mod physical;
88
mod project_transpose;
99
// mod subquery;
1010

11-
// pub use eliminate_duplicated_expr::{
12-
// EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule,
13-
// };
11+
pub use eliminate_duplicated_expr::*;
1412
pub use eliminate_limit::*;
15-
// pub use filter::{EliminateFilterRule, SimplifyFilterRule, SimplifyJoinCondRule};
16-
// pub use filter_pushdown::{
17-
// FilterAggTransposeRule, FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule,
18-
// FilterMergeRule, FilterSortTransposeRule,
19-
// };
13+
pub use filter::*;
14+
pub use filter_pushdown::*;
2015
pub use joins::*;
2116
pub use physical::PhysicalConversionRule;
2217
pub use project_transpose::*;
optd-datafusion-repr/src/rules/eliminate_duplicated_expr.rs

Lines changed: 46 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
1-
use std::collections::{HashMap, HashSet};
2-
use std::sync::Arc;
1+
use std::collections::HashSet;
32

4-
use itertools::Itertools;
3+
use optd_core::nodes::PlanNodeOrGroup;
4+
use optd_core::optimizer::Optimizer;
55
use optd_core::rules::{Rule, RuleMatcher};
6-
use optd_core::{nodes::PlanNode, optimizer::Optimizer};
76

87
use crate::plan_nodes::{
9-
DfNodeType, DfReprPlanNode, DfReprPlanNode, Expr, ListPred, LogicalAgg, LogicalSort,
10-
SortOrderPred, SortOrderType,
8+
ArcDfPlanNode, DfNodeType, DfReprPlanNode, DfReprPredNode, ListPred, LogicalAgg, LogicalSort,
9+
SortOrderPred,
1110
};
1211

1312
use super::macros::define_rule;
1413

1514
define_rule!(
1615
EliminateDuplicatedSortExprRule,
1716
apply_eliminate_duplicated_sort_expr,
18-
(Sort, child, [exprs])
17+
(Sort, child)
1918
);
2019

2120
/// Removes duplicate sort expressions
@@ -29,54 +28,37 @@ define_rule!(
2928
/// order by id desc, name
3029
fn apply_eliminate_duplicated_sort_expr(
3130
_optimizer: &impl Optimizer<DfNodeType>,
32-
EliminateDuplicatedSortExprRulePicks { child, exprs }: EliminateDuplicatedSortExprRulePicks,
31+
binding: ArcDfPlanNode,
3332
) -> Vec<PlanNodeOrGroup<DfNodeType>> {
34-
let sort_keys: Vec<Expr> = exprs
35-
.children
36-
.iter()
37-
.map(|x| Expr::from_rel_node(x.clone()).unwrap())
38-
.collect_vec();
33+
let sort = LogicalSort::from_plan_node(binding).unwrap();
34+
let exprs = sort.exprs();
35+
let sort_keys = exprs.to_vec().into_iter();
3936

40-
let normalized_sort_keys: Vec<Arc<PlanNode<DfNodeType>>> = exprs
41-
.children
42-
.iter()
43-
.map(|x| match x.typ {
44-
DfNodeType::SortOrder(_) => SortOrderPred::new(
45-
SortOrderType::Asc,
46-
SortOrderPred::from_rel_node(x.clone()).unwrap().child(),
47-
)
48-
.into_rel_node(),
49-
_ => x.clone(),
50-
})
51-
.collect_vec();
37+
let mut dedup_expr = Vec::new();
38+
let mut dedup_set = HashSet::new();
39+
let mut deduped = false;
5240

53-
let mut dedup_expr: Vec<Expr> = Vec::new();
54-
let mut dedup_set: HashSet<Arc<PlanNode<DfNodeType>>> = HashSet::new();
55-
56-
sort_keys
57-
.iter()
58-
.zip(normalized_sort_keys.iter())
59-
.for_each(|(expr, normalized_expr)| {
60-
if !dedup_set.contains(normalized_expr) {
61-
dedup_expr.push(expr.clone());
62-
dedup_set.insert(normalized_expr.to_owned());
63-
}
64-
});
41+
for sort_key in sort_keys {
42+
let sort_expr = SortOrderPred::from_pred_node(sort_key.clone()).unwrap();
43+
if !dedup_set.contains(&sort_expr.child()) {
44+
dedup_expr.push(sort_key.clone());
45+
dedup_set.insert(sort_expr.child().clone());
46+
} else {
47+
deduped = true;
48+
}
49+
}
6550

66-
if dedup_expr.len() != sort_keys.len() {
67-
let node = LogicalSort::new(
68-
DfReprPlanNode::from_group(child.into()),
69-
ListPred::new(dedup_expr),
70-
);
71-
return vec![node.into_rel_node().as_ref().clone()];
51+
if deduped {
52+
let node = LogicalSort::new_unchecked(sort.child(), ListPred::new(dedup_expr));
53+
return vec![node.into_plan_node().into()];
7254
}
7355
vec![]
7456
}
7557

7658
define_rule!(
7759
EliminateDuplicatedAggExprRule,
7860
apply_eliminate_duplicated_agg_expr,
79-
(Agg, child, exprs, [groups])
61+
(Agg, child)
8062
);
8163

8264
/// Removes duplicate group by expressions
@@ -88,30 +70,31 @@ define_rule!(
8870
/// select *
8971
/// from t1
9072
/// group by id, name
73+
///
74+
/// TODO: if projection refers to the column, we need to update the projection
9175
fn apply_eliminate_duplicated_agg_expr(
9276
_optimizer: &impl Optimizer<DfNodeType>,
93-
EliminateDuplicatedAggExprRulePicks {
94-
child,
95-
exprs,
96-
groups,
97-
}: EliminateDuplicatedAggExprRulePicks,
77+
binding: ArcDfPlanNode,
9878
) -> Vec<PlanNodeOrGroup<DfNodeType>> {
99-
let mut dedup_expr: Vec<Expr> = Vec::new();
100-
let mut dedup_set: HashSet<Arc<PlanNode<DfNodeType>>> = HashSet::new();
101-
groups.children.iter().for_each(|expr| {
102-
if !dedup_set.contains(expr) {
103-
dedup_expr.push(Expr::from_rel_node(expr.clone()).unwrap());
104-
dedup_set.insert(expr.clone());
79+
let agg = LogicalAgg::from_plan_node(binding).unwrap();
80+
let groups = agg.groups().to_vec();
81+
82+
let mut dedup_expr = Vec::new();
83+
let mut dedup_set = HashSet::new();
84+
let mut deduped = false;
85+
86+
for group in groups {
87+
if !dedup_set.contains(&group) {
88+
dedup_expr.push(group.clone());
89+
dedup_set.insert(group.clone());
90+
} else {
91+
deduped = true;
10592
}
106-
});
93+
}
10794

108-
if dedup_expr.len() != groups.children.len() {
109-
let node = LogicalAgg::new(
110-
DfReprPlanNode::from_group(child.into()),
111-
ListPred::from_group(exprs.into()),
112-
ListPred::new(dedup_expr),
113-
);
114-
return vec![node.into_rel_node().as_ref().clone()];
95+
if deduped {
96+
let node = LogicalAgg::new_unchecked(agg.child(), agg.exprs(), ListPred::new(dedup_expr));
97+
return vec![node.into_plan_node().into()];
11598
}
11699
vec![]
117100
}

0 commit comments

Comments
 (0)