Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 5dc5fc0

Browse files
committed
fix(df-repr): enable eliminate limit rule (#223)
Signed-off-by: Alex Chi <[email protected]>
1 parent 54b36e9 commit 5dc5fc0

File tree

3 files changed

+41
-38
lines changed

3 files changed

+41
-38
lines changed

optd-datafusion-repr/src/lib.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use properties::{
2020
column_ref::ColumnRefPropertyBuilder,
2121
schema::{Catalog, SchemaPropertyBuilder},
2222
};
23-
use rules::*;
2423

2524
pub use memo_ext::{LogicalJoinOrder, MemoExt};
2625
pub use optimizer_ext::OptimizerExt;
@@ -96,7 +95,7 @@ impl DatafusionOptimizer {
9695

9796
pub fn default_cascades_rules(
9897
) -> Vec<Arc<RuleWrapper<DfNodeType, CascadesOptimizer<DfNodeType>>>> {
99-
let rules = PhysicalConversionRule::all_conversions();
98+
let rules = rules::PhysicalConversionRule::all_conversions();
10099
let mut rule_wrappers = vec![];
101100
for rule in rules {
102101
rule_wrappers.push(RuleWrapper::new_cascades(rule));
@@ -117,22 +116,33 @@ impl DatafusionOptimizer {
117116
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
118117
// FilterAggTransposeRule::new(),
119118
// )));
120-
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(HashJoinRule::new())));
121-
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new())));
122119
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
123-
InnerCrossJoinRule::new(),
120+
rules::HashJoinRule::new(),
124121
)));
125-
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinAssocRule::new())));
126122
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
127-
JoinAbsorbFilterRule::new(),
123+
rules::JoinCommuteRule::new(),
124+
)));
125+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
126+
rules::InnerCrossJoinRule::new(),
127+
)));
128+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
129+
rules::JoinAssocRule::new(),
130+
)));
131+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
132+
rules::JoinAbsorbFilterRule::new(),
128133
)));
129134
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
130135
// ProjectionPullUpJoin::new(),
131136
// )));
132137
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
133-
EliminateProjectRule::new(),
138+
rules::EliminateProjectRule::new(),
139+
)));
140+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
141+
rules::ProjectMergeRule::new(),
142+
)));
143+
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
144+
rules::EliminateLimitRule::new(),
134145
)));
135-
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(ProjectMergeRule::new())));
136146
// rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
137147
// EliminateFilterRule::new(),
138148
// )));
@@ -188,7 +198,7 @@ impl DatafusionOptimizer {
188198

189199
/// The optimizer settings for three-join demo as a perfect optimizer.
190200
pub fn new_alternative_physical_for_demo(catalog: Arc<dyn Catalog>) -> Self {
191-
let rules = PhysicalConversionRule::all_conversions();
201+
let rules = rules::PhysicalConversionRule::all_conversions();
192202
let mut rule_wrappers = Vec::new();
193203
for rule in rules {
194204
rule_wrappers.push(RuleWrapper::new_cascades(rule));

optd-datafusion-repr/src/rules.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// mod eliminate_duplicated_expr;
2-
// mod eliminate_limit;
2+
mod eliminate_limit;
33
// mod filter;
44
// mod filter_pushdown;
55
mod joins;
@@ -11,7 +11,7 @@ mod project_transpose;
1111
// pub use eliminate_duplicated_expr::{
1212
// EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule,
1313
// };
14-
// pub use eliminate_limit::EliminateLimitRule;
14+
pub use eliminate_limit::*;
1515
// pub use filter::{EliminateFilterRule, SimplifyFilterRule, SimplifyJoinCondRule};
1616
// pub use filter_pushdown::{
1717
// FilterAggTransposeRule, FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule,

optd-datafusion-repr/src/rules/eliminate_limit.rs

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,45 @@
1-
use optd_core::rules::{Rule, RuleMatcher};
2-
use optd_core::{nodes::PlanNode, optimizer::Optimizer};
3-
use std::collections::HashMap;
4-
use std::sync::Arc;
5-
61
use crate::plan_nodes::{
7-
ConstantPred, ConstantType, DfNodeType, DfReprPlanNode, LogicalEmptyRelation,
2+
ArcDfPlanNode, ConstantPred, ConstantType, DfNodeType, DfPredType, DfReprPlanNode,
3+
DfReprPredNode, LogicalEmptyRelation, LogicalLimit,
84
};
9-
10-
use crate::properties::schema::SchemaPropertyBuilder;
5+
use crate::OptimizerExt;
6+
use optd_core::nodes::PlanNodeOrGroup;
7+
use optd_core::optimizer::Optimizer;
8+
use optd_core::rules::{Rule, RuleMatcher};
119

1210
use super::macros::define_rule;
1311

14-
define_rule!(
15-
EliminateLimitRule,
16-
apply_eliminate_limit,
17-
(Limit, [child], [skip, fetch])
18-
);
12+
define_rule!(EliminateLimitRule, apply_eliminate_limit, (Limit, child));
1913

2014
/// Transformations:
2115
/// - Limit with skip 0 and no fetch -> Eliminate from the tree
2216
/// - Limit with limit 0 -> EmptyRelation
2317
fn apply_eliminate_limit(
2418
optimizer: &impl Optimizer<DfNodeType>,
25-
EliminateLimitRulePicks { child, skip, fetch }: EliminateLimitRulePicks,
19+
binding: ArcDfPlanNode,
2620
) -> Vec<PlanNodeOrGroup<DfNodeType>> {
27-
if let DfNodeType::Constant(ConstantType::UInt64) = skip.typ {
28-
if let DfNodeType::Constant(ConstantType::UInt64) = fetch.typ {
29-
let skip_val = ConstantPred::from_rel_node(skip.into())
30-
.unwrap()
31-
.value()
32-
.as_u64();
33-
34-
let fetch_val = ConstantPred::from_rel_node(fetch.into())
21+
let limit = LogicalLimit::from_plan_node(binding).unwrap();
22+
let skip = limit.skip();
23+
let fetch = limit.fetch();
24+
let child = limit.child();
25+
if let DfPredType::Constant(ConstantType::UInt64) = skip.typ {
26+
if let DfPredType::Constant(ConstantType::UInt64) = fetch.typ {
27+
let skip_val = ConstantPred::from_pred_node(skip).unwrap().value().as_u64();
28+
29+
let fetch_val = ConstantPred::from_pred_node(fetch)
3530
.unwrap()
3631
.value()
3732
.as_u64();
3833

3934
// Bad convention to have u64 max represent None
4035
let fetch_is_none = fetch_val == u64::MAX;
4136

42-
let schema =
43-
optimizer.get_property::<SchemaPropertyBuilder>(Arc::new(child.clone()), 0);
44-
37+
let schema = optimizer.get_schema_of(child.clone());
4538
if fetch_is_none && skip_val == 0 {
4639
return vec![child];
4740
} else if fetch_val == 0 {
4841
let node = LogicalEmptyRelation::new(false, schema);
49-
return vec![node.into_rel_node().as_ref().clone()];
42+
return vec![node.into_plan_node().into()];
5043
}
5144
}
5245
}

0 commit comments

Comments
 (0)