Skip to content

Commit b7e71a5

Browse files
authored
refactor(optimizer): optimizers (#17697)
* refactor(optimizer): optimizers * optimizers/distributed * run -> optimize() * optimizer/dynamic_sample/ -> optimizer/optimizers/hyper_dp/dynamic_sample/ * move cascade check from optimizer to cascade * optimizer/rule -> optimizers/rule
1 parent 4e9ecfb commit b7e71a5

File tree

101 files changed

+391
-339
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+391
-339
lines changed

src/query/service/tests/it/sql/planner/optimizer/agg_index_query_rewrite.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ use databend_common_expression::TableField;
2828
use databend_common_expression::TableSchemaRefExt;
2929
use databend_common_meta_app::schema::CreateOption;
3030
use databend_common_sql::optimizer::ir::SExpr;
31-
use databend_common_sql::optimizer::optimizers::RecursiveOptimizer;
31+
use databend_common_sql::optimizer::optimizers::recursive::RecursiveOptimizer;
32+
use databend_common_sql::optimizer::optimizers::rule::RuleID;
33+
use databend_common_sql::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES;
3234
use databend_common_sql::optimizer::OptimizerContext;
33-
use databend_common_sql::optimizer::RuleID;
34-
use databend_common_sql::optimizer::DEFAULT_REWRITE_RULES;
3535
use databend_common_sql::plans::AggIndexInfo;
3636
use databend_common_sql::plans::CreateTablePlan;
3737
use databend_common_sql::plans::Plan;
@@ -393,8 +393,8 @@ async fn test_query_rewrite_impl(format: &str) -> Result<()> {
393393
query.clear_applied_rules();
394394

395395
let opt_ctx = OptimizerContext::new(ctx.clone(), metadata.clone());
396-
let result =
397-
RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::TryApplyAggIndex]).run(&query)?;
396+
let result = RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::TryApplyAggIndex])
397+
.optimize(&query)?;
398398
let agg_index = find_push_down_index_info(&result)?;
399399
assert_eq!(
400400
suite.is_matched,
@@ -449,7 +449,7 @@ async fn plan_sql(
449449
{
450450
let s_expr = if optimize {
451451
let opt_ctx = OptimizerContext::new(ctx.clone(), metadata.clone());
452-
RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).run(&s_expr)?
452+
RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).optimize(&s_expr)?
453453
} else {
454454
*s_expr
455455
};

src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::binder::InternalColumnBinding;
3636
use crate::binder::MutationStrategy;
3737
use crate::binder::MutationType;
3838
use crate::optimizer::ir::SExpr;
39-
use crate::optimizer::operator::SubqueryRewriter;
39+
use crate::optimizer::optimizers::operator::SubqueryRewriter;
4040
use crate::optimizer::OptimizerContext;
4141
use crate::plans::BoundColumnRef;
4242
use crate::plans::Filter;
@@ -313,7 +313,7 @@ impl MutationExpression {
313313
let opt_ctx =
314314
OptimizerContext::new(binder.ctx.clone(), binder.metadata.clone());
315315
let mut rewriter = SubqueryRewriter::new(opt_ctx, None);
316-
let s_expr = rewriter.rewrite(&s_expr)?;
316+
let s_expr = rewriter.optimize(&s_expr)?;
317317

318318
Ok(MutationExpressionBindResult {
319319
input: s_expr,

src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ use crate::normalize_identifier;
3333
use crate::optimizer::ir::ColumnSet;
3434
use crate::optimizer::ir::RelExpr;
3535
use crate::optimizer::ir::SExpr;
36-
use crate::optimizer::operator::FlattenInfo;
37-
use crate::optimizer::operator::SubqueryRewriter;
36+
use crate::optimizer::optimizers::operator::FlattenInfo;
37+
use crate::optimizer::optimizers::operator::SubqueryRewriter;
3838
use crate::optimizer::OptimizerContext;
3939
use crate::planner::binder::scalar::ScalarBinder;
4040
use crate::planner::binder::Binder;

src/query/sql/src/planner/optimizer/distributed/distributed.rs

Lines changed: 0 additions & 50 deletions
This file was deleted.

src/query/sql/src/planner/optimizer/ir/expr/m_expr.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use databend_common_exception::Result;
2020
use crate::optimizer::ir::expr::PatternExtractor;
2121
use crate::optimizer::ir::group::Group;
2222
use crate::optimizer::ir::Memo;
23-
use crate::optimizer::rule::AppliedRules;
24-
use crate::optimizer::rule::RulePtr;
25-
use crate::optimizer::rule::TransformResult;
23+
use crate::optimizer::optimizers::rule::AppliedRules;
24+
use crate::optimizer::optimizers::rule::RulePtr;
25+
use crate::optimizer::optimizers::rule::TransformResult;
2626
use crate::plans::RelOperator;
2727
use crate::IndexType;
2828

src/query/sql/src/planner/optimizer/ir/expr/s_expr.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use educe::Educe;
2424
use crate::optimizer::ir::property::RelExpr;
2525
use crate::optimizer::ir::property::RelationalProperty;
2626
use crate::optimizer::ir::StatInfo;
27-
use crate::optimizer::rule::AppliedRules;
28-
use crate::optimizer::rule::RuleID;
27+
use crate::optimizer::optimizers::rule::AppliedRules;
28+
use crate::optimizer::optimizers::rule::RuleID;
2929
use crate::plans::Exchange;
3030
use crate::plans::Operator;
3131
use crate::plans::RelOperator;

src/query/sql/src/planner/optimizer/mod.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,16 @@
1313
// limitations under the License.
1414

1515
mod cost;
16-
mod distributed;
1716
pub mod ir;
18-
pub mod operator;
1917
#[allow(clippy::module_inception)]
2018
mod optimizer;
2119
mod optimizer_context;
2220
pub mod optimizers;
23-
pub mod rule;
2421
mod statistics;
2522
mod util;
2623

27-
mod dynamic_sample;
28-
2924
pub use optimizer::optimize;
3025
pub use optimizer::optimize_query;
3126
pub use optimizer_context::OptimizerContext;
32-
pub use rule::agg_index;
33-
pub use rule::RuleFactory;
34-
pub use rule::RuleID;
35-
pub use rule::RuleSet;
36-
pub use rule::DEFAULT_REWRITE_RULES;
27+
pub use optimizers::rule::agg_index;
3728
pub use util::contains_local_table_scan;

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 34 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,24 @@ use log::info;
2323
use crate::binder::target_probe;
2424
use crate::binder::MutationStrategy;
2525
use crate::binder::MutationType;
26-
use crate::optimizer::distributed::optimize_distributed_query;
27-
use crate::optimizer::distributed::BroadcastToShuffleOptimizer;
28-
use crate::optimizer::distributed::SortAndLimitPushDownOptimizer;
2926
use crate::optimizer::ir::Memo;
3027
use crate::optimizer::ir::SExpr;
31-
use crate::optimizer::operator::DeduplicateJoinConditionOptimizer;
32-
use crate::optimizer::operator::PullUpFilterOptimizer;
33-
use crate::optimizer::operator::RuleNormalizeAggregateOptimizer;
34-
use crate::optimizer::operator::RuleStatsAggregateOptimizer;
35-
use crate::optimizer::operator::SingleToInnerOptimizer;
36-
use crate::optimizer::operator::SubqueryRewriter;
28+
use crate::optimizer::optimizers::distributed::BroadcastToShuffleOptimizer;
29+
use crate::optimizer::optimizers::operator::DeduplicateJoinConditionOptimizer;
30+
use crate::optimizer::optimizers::operator::PullUpFilterOptimizer;
31+
use crate::optimizer::optimizers::operator::RuleNormalizeAggregateOptimizer;
32+
use crate::optimizer::optimizers::operator::RuleStatsAggregateOptimizer;
33+
use crate::optimizer::optimizers::operator::SingleToInnerOptimizer;
34+
use crate::optimizer::optimizers::operator::SubqueryRewriter;
35+
use crate::optimizer::optimizers::recursive::RecursiveOptimizer;
36+
use crate::optimizer::optimizers::rule::RuleID;
37+
use crate::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES;
3738
use crate::optimizer::optimizers::CascadesOptimizer;
3839
use crate::optimizer::optimizers::DPhpy;
39-
use crate::optimizer::optimizers::RecursiveOptimizer;
4040
use crate::optimizer::statistics::CollectStatisticsOptimizer;
4141
use crate::optimizer::util::contains_local_table_scan;
4242
use crate::optimizer::util::contains_warehouse_table_scan;
4343
use crate::optimizer::OptimizerContext;
44-
use crate::optimizer::RuleID;
45-
use crate::optimizer::DEFAULT_REWRITE_RULES;
4644
use crate::plans::ConstantTableScan;
4745
use crate::plans::CopyIntoLocationPlan;
4846
use crate::plans::Join;
@@ -97,7 +95,7 @@ pub async fn optimize(opt_ctx: Arc<OptimizerContext>, plan: Plan) -> Result<Plan
9795
let mut s_expr = s_expr;
9896
if s_expr.contain_subquery() {
9997
s_expr =
100-
Box::new(SubqueryRewriter::new(opt_ctx.clone(), None).rewrite(&s_expr)?);
98+
Box::new(SubqueryRewriter::new(opt_ctx.clone(), None).optimize(&s_expr)?);
10199
}
102100
Ok(Plan::Explain {
103101
kind,
@@ -260,77 +258,53 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, mut s_expr: SExpr) -
260258
}
261259

262260
// 2. Eliminate subqueries by rewriting them into more efficient forms
263-
s_expr = SubqueryRewriter::new(opt_ctx.clone(), None).rewrite(&s_expr)?;
261+
s_expr = SubqueryRewriter::new(opt_ctx.clone(), None).optimize(&s_expr)?;
264262

265263
// 3. Apply statistics aggregation to gather and propagate statistics
266264
s_expr = RuleStatsAggregateOptimizer::new(opt_ctx.clone())
267-
.run(&s_expr)
265+
.optimize(&s_expr)
268266
.await?;
269267

270268
// 4. Collect statistics for SExpr nodes to support cost estimation
271269
s_expr = CollectStatisticsOptimizer::new(opt_ctx.clone())
272-
.run(&s_expr)
270+
.optimize(&s_expr)
273271
.await?;
274272

275273
// 5. Normalize aggregate, it should be executed before RuleSplitAggregate.
276-
s_expr = RuleNormalizeAggregateOptimizer::new().run(&s_expr)?;
274+
s_expr = RuleNormalizeAggregateOptimizer::new().optimize(&s_expr)?;
277275

278276
// 6. Pull up and infer filter.
279-
s_expr = PullUpFilterOptimizer::new(opt_ctx.clone()).run(&s_expr)?;
277+
s_expr = PullUpFilterOptimizer::new(opt_ctx.clone()).optimize(&s_expr)?;
280278

281279
// 7. Run default rewrite rules
282-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).run(&s_expr)?;
280+
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).optimize(&s_expr)?;
283281

284282
// 8. Run post rewrite rules
285-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::SplitAggregate]).run(&s_expr)?;
283+
s_expr =
284+
RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::SplitAggregate]).optimize(&s_expr)?;
286285

287286
// 9. Apply DPhyp algorithm for cost-based join reordering
288-
if opt_ctx.get_enable_dphyp() && opt_ctx.get_enable_join_reorder() {
289-
s_expr = DPhpy::new(opt_ctx.clone()).optimize(&s_expr).await?;
290-
}
287+
s_expr = DPhpy::new(opt_ctx.clone()).optimize(&s_expr).await?;
291288

292289
// 10. After join reorder, Convert some single join to inner join.
293-
s_expr = SingleToInnerOptimizer::new().run(&s_expr)?;
290+
s_expr = SingleToInnerOptimizer::new().optimize(&s_expr)?;
294291

295292
// 11. Deduplicate join conditions.
296-
s_expr = DeduplicateJoinConditionOptimizer::new().run(&s_expr)?;
293+
s_expr = DeduplicateJoinConditionOptimizer::new().optimize(&s_expr)?;
297294

298295
// 12. Apply join commutativity to further optimize join ordering
299296
if opt_ctx.get_enable_join_reorder() {
300297
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), [RuleID::CommuteJoin].as_slice())
301-
.run(&s_expr)?;
298+
.optimize(&s_expr)?;
302299
}
303300

304301
// 13. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
305-
let mut cascades = CascadesOptimizer::new(opt_ctx.clone())?;
306-
s_expr = match cascades.optimize(s_expr.clone()) {
307-
Ok(mut s_expr) => {
308-
// 14. Push down sort and limit operations for distributed execution
309-
// TODO(leiysky): do this optimization in cascades optimizer
310-
if opt_ctx.get_enable_distributed_optimization() {
311-
let sort_and_limit_optimizer = SortAndLimitPushDownOptimizer::create();
312-
s_expr = sort_and_limit_optimizer.optimize(&s_expr)?;
313-
}
314-
s_expr
315-
}
316-
317-
Err(e) => {
318-
info!(
319-
"CascadesOptimizer failed, fallback to heuristic optimizer: {}",
320-
e
321-
);
322-
if opt_ctx.get_enable_distributed_optimization() {
323-
s_expr = optimize_distributed_query(opt_ctx.get_table_ctx().clone(), &s_expr)?;
324-
}
325-
326-
s_expr
327-
}
328-
};
302+
s_expr = CascadesOptimizer::new(opt_ctx.clone())?.optimize(s_expr)?;
329303

330-
// 16. Eliminate unnecessary scalar calculations to clean up the final plan
304+
// 14. Eliminate unnecessary scalar calculations to clean up the final plan
331305
if !opt_ctx.get_planning_agg_index() {
332306
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice())
333-
.run(&s_expr)?;
307+
.optimize(&s_expr)?;
334308
}
335309

336310
Ok(s_expr)
@@ -355,24 +329,25 @@ async fn get_optimized_memo(opt_ctx: Arc<OptimizerContext>, mut s_expr: SExpr) -
355329

356330
// Decorrelate subqueries, after this step, there should be no subquery in the expression.
357331
if s_expr.contain_subquery() {
358-
s_expr = SubqueryRewriter::new(opt_ctx.clone(), None).rewrite(&s_expr)?;
332+
s_expr = SubqueryRewriter::new(opt_ctx.clone(), None).optimize(&s_expr)?;
359333
}
360334

361335
s_expr = RuleStatsAggregateOptimizer::new(opt_ctx.clone())
362-
.run(&s_expr)
336+
.optimize(&s_expr)
363337
.await?;
364338

365339
// Collect statistics for each leaf node in SExpr.
366340
s_expr = CollectStatisticsOptimizer::new(opt_ctx.clone())
367-
.run(&s_expr)
341+
.optimize(&s_expr)
368342
.await?;
369343

370344
// Pull up and infer filter.
371-
s_expr = PullUpFilterOptimizer::new(opt_ctx.clone()).run(&s_expr)?;
345+
s_expr = PullUpFilterOptimizer::new(opt_ctx.clone()).optimize(&s_expr)?;
372346
// Run default rewrite rules
373-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).run(&s_expr)?;
347+
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &DEFAULT_REWRITE_RULES).optimize(&s_expr)?;
374348
// Run post rewrite rules
375-
s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::SplitAggregate]).run(&s_expr)?;
349+
s_expr =
350+
RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::SplitAggregate]).optimize(&s_expr)?;
376351

377352
// Cost based optimization
378353
if opt_ctx.get_enable_dphyp() && opt_ctx.get_enable_join_reorder() {
@@ -388,7 +363,7 @@ async fn optimize_mutation(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Res
388363
// Optimize the input plan.
389364
let mut input_s_expr = optimize_query(opt_ctx.clone(), s_expr.child(0)?.clone()).await?;
390365
input_s_expr = RecursiveOptimizer::new(opt_ctx.clone(), &[RuleID::MergeFilterIntoMutation])
391-
.run(&input_s_expr)?;
366+
.optimize(&input_s_expr)?;
392367

393368
// For distributed query optimization, we need to remove the Exchange operator at the top of the plan.
394369
if let &RelOperator::Exchange(_) = input_s_expr.plan() {

0 commit comments

Comments
 (0)