Skip to content

Commit 1eda119

Browse files
committed
chore(query): refactor the cardinality of anti-join
1 parent c2ebd41 commit 1eda119

File tree

7 files changed

+243
-6
lines changed

7 files changed

+243
-6
lines changed

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,10 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
263263
]))
264264
// 10. Apply DPhyp algorithm for cost-based join reordering
265265
.add(DPhpyOptimizer::new(opt_ctx.clone()))
266+
.add(RecursiveRuleOptimizer::new(
267+
opt_ctx.clone(),
268+
[RuleID::PushDownAntiJoin].as_slice(),
269+
))
266270
// 11. After join reorder, Convert some single join to inner join.
267271
.add(SingleToInnerOptimizer::new())
268272
// 12. Deduplicate join conditions.

src/query/sql/src/planner/optimizer/optimizers/operator/filter/infer_filter.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,18 @@ impl<'a> InferFilterOptimizer<'a> {
732732
arguments: vec![
733733
self.exprs[equal_indexes[i]].clone(),
734734
self.exprs[equal_indexes[j]].clone(),
735+
// ScalarExpr::FunctionCall(FunctionCall {
736+
// span: None,
737+
// func_name: "infer_predicate".to_string(),
738+
// params: vec![],
739+
// arguments: vec![self.exprs[equal_indexes[i]].clone()],
740+
// }),
741+
// ScalarExpr::FunctionCall(FunctionCall {
742+
// span: None,
743+
// func_name: "infer_predicate".to_string(),
744+
// params: vec![],
745+
// arguments: vec![self.exprs[equal_indexes[j]].clone()],
746+
// }),
735747
],
736748
}));
737749
}

src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use crate::optimizer::optimizers::rule::RulePushDownRankLimitAggregate;
5959
use crate::optimizer::optimizers::rule::RulePushDownSortEvalScalar;
6060
use crate::optimizer::optimizers::rule::RulePushDownSortFilterScan;
6161
use crate::optimizer::optimizers::rule::RulePushDownSortScan;
62+
use crate::optimizer::optimizers::rule::RulePushdownAntiJoin;
6263
use crate::optimizer::optimizers::rule::RuleSemiToInnerJoin;
6364
use crate::optimizer::optimizers::rule::RuleSplitAggregate;
6465
use crate::optimizer::optimizers::rule::RuleTryApplyAggIndex;
@@ -130,6 +131,7 @@ impl RuleFactory {
130131
RuleID::MergeFilterIntoMutation => {
131132
Ok(Box::new(RuleMergeFilterIntoMutation::new(metadata)))
132133
}
134+
RuleID::PushDownAntiJoin => Ok(Box::new(RulePushdownAntiJoin::new())),
133135
}
134136
}
135137
}

src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ use crate::optimizer::ir::Matcher;
2121
use crate::optimizer::ir::RelExpr;
2222
use crate::optimizer::ir::SExpr;
2323
use crate::optimizer::optimizers::operator::EquivalentConstantsVisitor;
24-
use crate::optimizer::optimizers::operator::InferFilterOptimizer;
25-
use crate::optimizer::optimizers::operator::JoinProperty;
2624
use crate::optimizer::optimizers::rule::can_filter_null;
2725
use crate::optimizer::optimizers::rule::constant::false_constant;
2826
use crate::optimizer::optimizers::rule::constant::is_falsy;
@@ -242,9 +240,9 @@ pub fn try_push_down_filter_join(s_expr: &SExpr, metadata: MetadataRef) -> Resul
242240
right_push_down = vec![];
243241
}
244242
}
245-
let join_prop = JoinProperty::new(&left_prop.output_columns, &right_prop.output_columns);
246-
let mut infer_filter = InferFilterOptimizer::new(Some(join_prop));
247-
push_down_predicates = infer_filter.optimize(push_down_predicates)?;
243+
// let join_prop = JoinProperty::new(&left_prop.output_columns, &right_prop.output_columns);
244+
// let mut infer_filter = InferFilterOptimizer::new(Some(join_prop));
245+
// push_down_predicates = infer_filter.optimize(push_down_predicates)?;
248246
}
249247

250248
let mut all_push_down = vec![];

src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ mod push_down_filter_join;
1616
mod rule_commute_join;
1717
mod rule_commute_join_base_table;
1818
mod rule_left_exchange_join;
19+
mod rule_push_down_anti_join;
1920
mod rule_semi_to_inner_join;
2021
mod util;
2122

2223
pub use push_down_filter_join::*;
2324
pub use rule_commute_join::RuleCommuteJoin;
2425
pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
2526
pub use rule_left_exchange_join::RuleLeftExchangeJoin;
27+
pub use rule_push_down_anti_join::RulePushdownAntiJoin;
2628
pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
2729
pub use util::get_join_predicates;

src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_push_down_anti_join.rs

Lines changed: 218 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,224 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
1516

17+
use databend_common_exception::Result;
18+
19+
use crate::binder::JoinPredicate;
20+
use crate::optimizer::ir::Matcher;
21+
use crate::optimizer::ir::RelExpr;
22+
use crate::optimizer::ir::SExpr;
23+
use crate::optimizer::optimizers::rule::Rule;
24+
use crate::optimizer::optimizers::rule::RuleID;
25+
use crate::optimizer::optimizers::rule::TransformResult;
26+
use crate::plans::Join;
27+
use crate::plans::JoinType;
28+
use crate::plans::RelOp;
29+
use crate::plans::RelOperator;
30+
use crate::ColumnSet;
31+
32+
/// Push `Left/Right Semi|Anti` join closer to the base table that participates
33+
/// in the predicate so that fewer rows stay in the join tree.
1634
pub struct RulePushdownAntiJoin {
35+
id: RuleID,
36+
matchers: Vec<Matcher>,
37+
}
38+
39+
impl RulePushdownAntiJoin {
40+
pub fn new() -> Self {
41+
Self {
42+
id: RuleID::PushDownAntiJoin,
43+
matchers: vec![Matcher::MatchOp {
44+
op_type: RelOp::Join,
45+
children: vec![Matcher::Leaf, Matcher::Leaf],
46+
}],
47+
}
48+
}
49+
50+
fn try_push_down(&self, left: &SExpr, right: &SExpr, join: Join) -> Result<Option<SExpr>> {
51+
let right_rel_expr = RelExpr::with_s_expr(right);
52+
53+
if let Some(inner_join) = extract_inner_join(left)? {
54+
let inner_join_rel_expr = RelExpr::with_s_expr(&inner_join);
55+
let inner_join_left_prop = inner_join_rel_expr.derive_relational_prop_child(0)?;
56+
let inner_join_right_prop = inner_join_rel_expr.derive_relational_prop_child(1)?;
57+
58+
let equi_conditions = join
59+
.equi_conditions
60+
.iter()
61+
.map(|condition| {
62+
JoinPredicate::new(
63+
&condition.left,
64+
&inner_join_left_prop,
65+
&inner_join_right_prop,
66+
)
67+
})
68+
.collect::<Vec<_>>();
69+
70+
if equi_conditions.iter().all(left_predicate) {
71+
// let mut new_equi_conditions = Vec::with_capacity(equi_conditions.len());
72+
73+
// for (idx, (inferred, predicate)) in equi_conditions.into_iter().enumerate() {
74+
// if !inferred || matches!(predicate, JoinPredicate::Left(_)) {
75+
// new_equi_conditions.push(join.equi_conditions[idx].clone());
76+
// }
77+
// }
78+
79+
// join.equi_conditions = new_equi_conditions;
80+
let right_prop = right_rel_expr.derive_relational_prop()?;
81+
let mut union_output_columns = ColumnSet::new();
82+
union_output_columns.extend(right_prop.output_columns.clone());
83+
union_output_columns.extend(inner_join_left_prop.output_columns.clone());
84+
85+
if join
86+
.non_equi_conditions
87+
.iter()
88+
.all(|x| x.used_columns().is_subset(&union_output_columns))
89+
{
90+
let new_inner_join = inner_join.replace_children([
91+
Arc::new(SExpr::create_binary(
92+
RelOperator::Join(join.clone()),
93+
inner_join.child(0)?.clone(),
94+
right.clone(),
95+
)),
96+
Arc::new(inner_join.child(1)?.clone()),
97+
]);
98+
99+
return replace_inner_join(left, new_inner_join);
100+
}
101+
} else if equi_conditions.iter().all(right_predicate) {
102+
// let mut new_equi_conditions = Vec::with_capacity(equi_conditions.len());
103+
104+
// for (idx, (inferred, predicate)) in equi_conditions.into_iter().enumerate() {
105+
// if !inferred || matches!(predicate, JoinPredicate::Left(_)) {
106+
// new_equi_conditions.push(join.equi_conditions[idx].clone());
107+
// }
108+
// }
109+
110+
// join.equi_conditions = new_equi_conditions;
111+
let right_prop = right_rel_expr.derive_relational_prop()?;
112+
let mut union_output_columns = ColumnSet::new();
113+
union_output_columns.extend(right_prop.output_columns.clone());
114+
union_output_columns.extend(inner_join_right_prop.output_columns.clone());
115+
116+
if join
117+
.non_equi_conditions
118+
.iter()
119+
.all(|x| x.used_columns().is_subset(&union_output_columns))
120+
{
121+
let new_inner_join = inner_join.replace_children([
122+
Arc::new(inner_join.child(0)?.clone()),
123+
Arc::new(SExpr::create_binary(
124+
RelOperator::Join(join.clone()),
125+
inner_join.child(1)?.clone(),
126+
right.clone(),
127+
)),
128+
]);
129+
130+
return replace_inner_join(left, new_inner_join);
131+
}
132+
}
133+
}
134+
135+
Ok(None)
136+
}
137+
}
138+
139+
impl Rule for RulePushdownAntiJoin {
140+
fn id(&self) -> RuleID {
141+
self.id
142+
}
143+
144+
fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
145+
let join: Join = s_expr.plan().clone().try_into()?;
146+
147+
if matches!(join.join_type, JoinType::LeftAnti | JoinType::LeftSemi) {
148+
if let Some(mut result) =
149+
self.try_push_down(s_expr.child(0)?, s_expr.child(1)?, join)?
150+
{
151+
result.set_applied_rule(&self.id);
152+
state.add_result(result);
153+
}
154+
}
155+
156+
Ok(())
157+
}
158+
159+
fn matchers(&self) -> &[Matcher] {
160+
&self.matchers
161+
}
162+
}
163+
164+
impl Default for RulePushdownAntiJoin {
165+
fn default() -> Self {
166+
Self::new()
167+
}
168+
}
169+
170+
fn replace_inner_join(expr: &SExpr, new_join: SExpr) -> Result<Option<SExpr>> {
171+
match expr.plan() {
172+
RelOperator::Join(join) if join.join_type == JoinType::Inner => Ok(Some(new_join)),
173+
RelOperator::Filter(_) => match replace_inner_join(expr.child(0)?, new_join)? {
174+
None => Ok(None),
175+
Some(new_child) => Ok(Some(expr.replace_children([Arc::new(new_child)]))),
176+
},
177+
_ => Ok(None),
178+
}
179+
}
180+
181+
fn extract_inner_join(expr: &SExpr) -> Result<Option<SExpr>> {
182+
match expr.plan() {
183+
RelOperator::Join(join) if join.join_type == JoinType::Inner => Ok(Some(expr.clone())),
184+
RelOperator::Filter(_) => extract_inner_join(expr.child(0)?),
185+
_ => Ok(None),
186+
}
187+
}
188+
189+
// struct ColumnMappingRewriter<'a> {
190+
// mapping: &'a HashMap<usize, ColumnBinding>,
191+
// }
192+
//
193+
// impl VisitorMut<'_> for ColumnMappingRewriter {
194+
// fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> {
195+
// if let Some(&new_index) = self.mapping.get(&col.column.index) {
196+
// col.column = new_index.clone();
197+
// }
198+
// Ok(())
199+
// }
200+
// }
201+
//
202+
// fn replace_by_equivalence(
203+
// expr: &ScalarExpr,
204+
// mapping: &HashMap<IndexType, ColumnBinding>,
205+
// ) -> Result<ScalarExpr> {
206+
// if mapping.is_empty() {
207+
// return Ok(expr.clone());
208+
// }
209+
//
210+
// let mut new_expr = expr.clone();
211+
// let mut rewriter = ColumnMappingRewriter { mapping };
212+
// rewriter.visit(&mut new_expr)?;
213+
// Ok(new_expr)
214+
// }
215+
//
216+
// fn collect_mapping(join: &Join) -> Result<Vec<(ColumnBinding, ColumnBinding)>> {
217+
// for equi_condition in &join.equi_conditions {
218+
// match equi_condition.left
219+
// }
220+
// }
221+
//
222+
// fn all_left(
223+
// conditions: &[JoinEquiCondition],
224+
// inner_join: Join,
225+
// left: Arc<RelationalProperty>,
226+
// ) -> Result<bool> {
227+
// }
228+
229+
fn left_predicate(tuple: &JoinPredicate) -> bool {
230+
matches!(&tuple, JoinPredicate::Left(_))
231+
}
17232

18-
}
233+
fn right_predicate(tuple: &JoinPredicate) -> bool {
234+
matches!(&tuple, JoinPredicate::Right(_))
235+
}

src/query/sql/src/planner/optimizer/optimizers/rule/rule.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ pub enum RuleID {
122122
PushDownSortFilterScan,
123123
PushDownLimitFilterScan,
124124
SemiToInnerJoin,
125+
PushDownAntiJoin,
125126
EliminateEvalScalar,
126127
EliminateFilter,
127128
EliminateSort,
@@ -194,6 +195,7 @@ impl Display for RuleID {
194195
RuleID::EliminateUnion => write!(f, "EliminateUnion"),
195196

196197
RuleID::MergeFilterIntoMutation => write!(f, "MergeFilterIntoMutation"),
198+
RuleID::PushDownAntiJoin => write!(f, "PushDownAntiJoin"),
197199
}
198200
}
199201
}

0 commit comments

Comments
 (0)