Skip to content

Commit 10c5ba6

Browse files
authored
feat: support self join elimination (#19169)
* feat: support self join elimination * fix marker join * fix subquery decorrelate * fix marker join * fix column id * fix logic test * refactor and add test * fix * fix * fix
1 parent 0adba06 commit 10c5ba6

File tree

16 files changed

+1411
-48
lines changed

16 files changed

+1411
-48
lines changed

src/query/sql/src/planner/metadata/metadata.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub type MetadataRef = Arc<RwLock<Metadata>>;
5959
pub struct Metadata {
6060
tables: Vec<TableEntry>,
6161
columns: Vec<ColumnEntry>,
62+
removed_mark_indexes: ColumnSet,
6263
/// Table column indexes that are lazy materialized.
6364
table_lazy_columns: HashMap<IndexType, ColumnSet>,
6465
table_source: HashMap<IndexType, DataSourcePlan>,
@@ -134,6 +135,14 @@ impl Metadata {
134135
self.columns.as_slice()
135136
}
136137

138+
/// Records `index` in the `removed_mark_indexes` set so that later passes can
/// recognise it through `is_removed_mark_index`.
// NOTE(review): presumably `index` is the marker column of a mark join that has been
// rewritten away — confirm against the callers.
pub fn add_removed_mark_index(&mut self, index: IndexType) {
139+
self.removed_mark_indexes.insert(index);
140+
}
141+
142+
/// Returns `true` when `index` has previously been registered with
/// `add_removed_mark_index`, i.e. it is contained in `removed_mark_indexes`.
pub fn is_removed_mark_index(&self, index: IndexType) -> bool {
143+
self.removed_mark_indexes.contains(&index)
144+
}
145+
137146
pub fn add_retained_column(&mut self, index: IndexType) {
138147
self.retained_columns.insert(index);
139148
}

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::optimizer::ir::SExpr;
3030
use crate::optimizer::optimizers::CTEFilterPushdownOptimizer;
3131
use crate::optimizer::optimizers::CascadesOptimizer;
3232
use crate::optimizer::optimizers::DPhpyOptimizer;
33+
use crate::optimizer::optimizers::EliminateSelfJoinOptimizer;
3334
use crate::optimizer::optimizers::distributed::BroadcastToShuffleOptimizer;
3435
use crate::optimizer::optimizers::operator::CleanupUnusedCTEOptimizer;
3536
use crate::optimizer::optimizers::operator::DeduplicateJoinConditionOptimizer;
@@ -265,6 +266,8 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
265266
]))
266267
// Apply DPhyp algorithm for cost-based join reordering
267268
.add(DPhpyOptimizer::new(opt_ctx.clone()))
269+
// Eliminate self joins when possible
270+
.add(EliminateSelfJoinOptimizer::new(opt_ctx.clone()))
268271
// After join reorder, convert some single joins to inner joins.
269272
.add(SingleToInnerOptimizer::new())
270273
// Deduplicate join conditions.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::time::Instant;
17+
18+
use databend_common_exception::Result;
19+
use log::info;
20+
21+
use crate::optimizer::Optimizer;
22+
use crate::optimizer::OptimizerContext;
23+
use crate::optimizer::ir::SExpr;
24+
use crate::optimizer::optimizers::DPhpyOptimizer;
25+
use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer;
26+
use crate::optimizer::optimizers::rule::RuleID;
27+
28+
/// Optimizer pass that tries to eliminate self joins from a plan.
///
/// It only stores the shared optimizer context, which is handed to the rule-based
/// optimizers and to the DPhyp pass that this optimizer runs internally.
pub struct EliminateSelfJoinOptimizer {
29+
opt_ctx: Arc<OptimizerContext>,
30+
}
31+
32+
impl EliminateSelfJoinOptimizer {
33+
/// Creates the optimizer around the given shared optimizer context.
pub fn new(opt_ctx: Arc<OptimizerContext>) -> Self {
34+
Self { opt_ctx }
35+
}
36+
}
37+
38+
#[async_trait::async_trait]
39+
impl Optimizer for EliminateSelfJoinOptimizer {
40+
/// Returns the fixed name of this optimizer.
fn name(&self) -> String {
41+
"EliminateSelfJoinOptimizer".to_string()
42+
}
43+
44+
/// Attempts to eliminate self joins in `s_expr`.
///
/// Steps:
/// 1. Apply the `EagerAggregation` rule recursively.
/// 2. Apply the `EliminateSelfJoin` rule recursively to the result of step 1.
/// 3. If step 2 left the plan unchanged, return a clone of the original `s_expr`
///    (discarding the eager-aggregation rewrite).
/// 4. Otherwise run DPhyp on the rewritten plan and return its output.
///
/// # Errors
///
/// Propagates any error returned by the two rule passes or by the DPhyp pass.
async fn optimize(&mut self, s_expr: &SExpr) -> Result<SExpr> {
45+
// `EagerAggregation` is used here as a speculative pre-rewrite to expose patterns that
46+
// `EliminateSelfJoin` can match. If no self-join is actually eliminated, we intentionally
47+
// return the original input plan to avoid keeping the eager-aggregation rewrite as a
48+
// standalone optimization.
49+
let start = Instant::now();
50+
static RULES_EAGER_AGGREGATION: &[RuleID] = &[RuleID::EagerAggregation];
51+
let optimizer = RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_EAGER_AGGREGATION);
52+
let s_expr_after_eager_aggregation = optimizer.optimize_sync(s_expr)?;
53+
54+
static RULES_ELIMINATE_SELF_JOIN: &[RuleID] = &[RuleID::EliminateSelfJoin];
55+
let optimizer =
56+
RecursiveRuleOptimizer::new(self.opt_ctx.clone(), RULES_ELIMINATE_SELF_JOIN);
57+
let s_expr_after_eliminate_self_join =
58+
optimizer.optimize_sync(&s_expr_after_eager_aggregation)?;
59+
60+
// The elapsed time is captured here, so it covers only the two rule passes above and
// does not include the DPhyp rerun further down.
let duration = start.elapsed();
61+
62+
// Equality of the two plans means the `EliminateSelfJoin` pass changed nothing.
if s_expr_after_eliminate_self_join == s_expr_after_eager_aggregation {
63+
return Ok(s_expr.clone());
64+
}
65+
66+
// EliminateSelfJoinOptimizer should ideally run before Dphyp in the optimizer pipeline.
67+
// However, due to issues with the current EagerAggregation implementation, running it
68+
// before Dphyp causes TPC-H Q18 optimization to fail. Therefore, EliminateSelfJoinOptimizer
69+
// is placed after Dphyp, and we run Dphyp again here to ensure join reordering after
70+
// eliminating self-joins.
71+
let s_expr_after_dphyp = DPhpyOptimizer::new(self.opt_ctx.clone())
72+
.optimize_async(&s_expr_after_eliminate_self_join)
73+
.await?;
74+
75+
// Only reached when the `EliminateSelfJoin` pass changed the plan; the early return
// above skips this log line.
info!("EliminateSelfJoinOptimizer: {}ms", duration.as_millis());
76+
77+
Ok(s_expr_after_dphyp)
78+
}
79+
}

src/query/sql/src/planner/optimizer/optimizers/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
mod cascades;
1616
pub mod cte_filter_pushdown;
1717
pub mod distributed;
18+
mod eliminate_self_join;
1819
mod hyper_dp;
1920
pub mod operator;
2021
pub mod recursive;
2122
pub mod rule;
2223

2324
pub use cascades::CascadesOptimizer;
2425
pub use cte_filter_pushdown::CTEFilterPushdownOptimizer;
26+
pub use eliminate_self_join::EliminateSelfJoinOptimizer;
2527
pub use hyper_dp::DPhpyOptimizer;
2628
pub use operator::CleanupUnusedCTEOptimizer;

src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -692,20 +692,22 @@ impl SubqueryDecorrelatorOptimizer {
692692
SubqueryType::Any => {
693693
let output_column = subquery.output_column.clone();
694694
let column_name = format!("subquery_{}", output_column.index);
695-
let left_condition = wrap_cast(
696-
&ScalarExpr::BoundColumnRef(BoundColumnRef {
697-
span: subquery.span,
698-
column: ColumnBindingBuilder::new(
699-
column_name,
700-
output_column.index,
701-
output_column.data_type,
702-
Visibility::Visible,
703-
)
704-
.table_index(output_column.table_index)
705-
.build(),
706-
}),
707-
&subquery.data_type,
708-
);
695+
let left_condition_base = ScalarExpr::BoundColumnRef(BoundColumnRef {
696+
span: subquery.span,
697+
column: ColumnBindingBuilder::new(
698+
column_name,
699+
output_column.index,
700+
output_column.data_type,
701+
Visibility::Visible,
702+
)
703+
.table_index(output_column.table_index)
704+
.build(),
705+
});
706+
let left_condition = if left_condition_base.data_type()? == *subquery.data_type {
707+
left_condition_base
708+
} else {
709+
wrap_cast(&left_condition_base, &subquery.data_type)
710+
};
709711
let child_expr = *subquery.child_expr.as_ref().unwrap().clone();
710712
let op = subquery.compare_op.as_ref().unwrap().clone();
711713
let (right_condition, is_non_equi_condition) =

src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_eager_aggregation.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,12 @@ impl Rule for RuleEagerAggregation {
337337
if extra_eval_scalar_expr.is_some() {
338338
for eval_item in &extra_eval_scalar.items {
339339
let eval_used_columns = eval_item.scalar.used_columns();
340+
if eval_used_columns
341+
.iter()
342+
.any(|column| self.metadata.read().is_removed_mark_index(*column))
343+
{
344+
continue;
345+
}
340346
let mut resolved_by_one_child = false;
341347
join_columns.for_each_mut(|side, columns_set| {
342348
if eval_used_columns.is_subset(columns_set) {

src/query/sql/src/planner/optimizer/optimizers/rule/factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::optimizer::optimizers::rule::RuleDeduplicateSort;
2323
use crate::optimizer::optimizers::rule::RuleEagerAggregation;
2424
use crate::optimizer::optimizers::rule::RuleEliminateEvalScalar;
2525
use crate::optimizer::optimizers::rule::RuleEliminateFilter;
26+
use crate::optimizer::optimizers::rule::RuleEliminateSelfJoin;
2627
use crate::optimizer::optimizers::rule::RuleEliminateSort;
2728
use crate::optimizer::optimizers::rule::RuleEliminateUnion;
2829
use crate::optimizer::optimizers::rule::RuleFilterFlattenOr;
@@ -124,6 +125,7 @@ impl RuleFactory {
124125
RuleID::EagerAggregation => Ok(Box::new(RuleEagerAggregation::new(metadata))),
125126
RuleID::PushDownPrewhere => Ok(Box::new(RulePushDownPrewhere::new(metadata))),
126127
RuleID::TryApplyAggIndex => Ok(Box::new(RuleTryApplyAggIndex::new(metadata))),
128+
RuleID::EliminateSelfJoin => Ok(Box::new(RuleEliminateSelfJoin::new(ctx))),
127129
RuleID::EliminateSort => Ok(Box::new(RuleEliminateSort::new())),
128130
RuleID::DeduplicateSort => Ok(Box::new(RuleDeduplicateSort::new())),
129131
RuleID::SemiToInnerJoin => Ok(Box::new(RuleSemiToInnerJoin::new())),

src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_eliminate_filter.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,28 @@ impl Rule for RuleEliminateFilter {
111111
true
112112
}
113113
}
114+
ScalarExpr::FunctionCall(func)
115+
if func.func_name == "is_true"
116+
&& func.arguments.len() == 1
117+
&& matches!(func.arguments[0], ScalarExpr::FunctionCall(_)) =>
118+
{
119+
let ScalarExpr::FunctionCall(inner) = &func.arguments[0] else {
120+
return true;
121+
};
122+
if inner.func_name != "eq" || inner.arguments.len() != 2 {
123+
return true;
124+
}
125+
if let (
126+
ScalarExpr::BoundColumnRef(left_col),
127+
ScalarExpr::BoundColumnRef(right_col),
128+
) = (&inner.arguments[0], &inner.arguments[1])
129+
{
130+
left_col.column.index != right_col.column.index
131+
|| left_col.column.data_type.is_nullable()
132+
} else {
133+
true
134+
}
135+
}
114136
_ => true,
115137
})
116138
.collect::<Vec<ScalarExpr>>();

src/query/sql/src/planner/optimizer/optimizers/rule/filter_rules/rule_push_down_filter_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl Rule for RulePushDownFilterJoin {
8282
let (s_expr, outer_to_inner) = outer_join_to_inner_join(s_expr, self.metadata.clone())?;
8383

8484
// Second, check whether the mark join can be converted to a semi join
85-
let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr)?;
85+
let (s_expr, mark_to_semi) = convert_mark_to_semi_join(&s_expr, self.metadata.clone())?;
8686
if s_expr.plan().rel_op() != RelOp::Filter {
8787
state.add_result(s_expr);
8888
return Ok(());

src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
mod push_down_filter_join;
1616
mod rule_commute_join;
1717
mod rule_commute_join_base_table;
18+
mod rule_eliminate_self_join;
1819
mod rule_left_exchange_join;
1920
mod rule_semi_to_inner_join;
2021
mod util;
2122

2223
pub use push_down_filter_join::*;
2324
pub use rule_commute_join::RuleCommuteJoin;
2425
pub use rule_commute_join_base_table::RuleCommuteJoinBaseTable;
26+
pub use rule_eliminate_self_join::RuleEliminateSelfJoin;
2527
pub use rule_left_exchange_join::RuleLeftExchangeJoin;
2628
pub use rule_semi_to_inner_join::RuleSemiToInnerJoin;
2729
pub use util::get_join_predicates;

0 commit comments

Comments
 (0)