From 83da0a9daef922ccd3eb7760638f4abf51e835a6 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Sun, 5 Oct 2025 15:47:29 +0800 Subject: [PATCH 1/2] init --- .../sql/src/planner/optimizer/optimizer.rs | 19 +- .../common_subexpression/analyze.rs | 83 +++++++ .../optimizers/common_subexpression/mod.rs | 20 ++ .../common_subexpression/optimizer.rs | 48 ++++ .../common_subexpression/rewrite.rs | 231 ++++++++++++++++++ .../common_subexpression/table_signature.rs | 59 +++++ .../src/planner/optimizer/optimizers/mod.rs | 2 + 7 files changed, 454 insertions(+), 8 deletions(-) create mode 100644 src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs create mode 100644 src/query/sql/src/planner/optimizer/optimizers/common_subexpression/mod.rs create mode 100644 src/query/sql/src/planner/optimizer/optimizers/common_subexpression/optimizer.rs create mode 100644 src/query/sql/src/planner/optimizer/optimizers/common_subexpression/rewrite.rs create mode 100644 src/query/sql/src/planner/optimizer/optimizers/common_subexpression/table_signature.rs diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 1d91b48fe03fd..d68514789dbf7 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -38,6 +38,7 @@ use crate::optimizer::optimizers::rule::RuleID; use crate::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES; use crate::optimizer::optimizers::CTEFilterPushdownOptimizer; use crate::optimizer::optimizers::CascadesOptimizer; +use crate::optimizer::optimizers::CommonSubexpressionOptimizer; use crate::optimizer::optimizers::DPhpyOptimizer; use crate::optimizer::pipeline::OptimizerPipeline; use crate::optimizer::statistics::CollectStatisticsOptimizer; @@ -261,28 +262,30 @@ pub async fn optimize_query(opt_ctx: Arc, s_expr: SExpr) -> Re .add(RecursiveRuleOptimizer::new(opt_ctx.clone(), &[ RuleID::SplitAggregate, ])) - // 10. Apply DPhyp algorithm for cost-based join reordering + // 10. Apply CSE optimization to reduce redundant computations + .add(CommonSubexpressionOptimizer::new(opt_ctx.clone())) + // 11. Apply DPhyp algorithm for cost-based join reordering .add(DPhpyOptimizer::new(opt_ctx.clone())) - // 11. After join reorder, Convert some single join to inner join. + // 12. After join reorder, Convert some single join to inner join. .add(SingleToInnerOptimizer::new()) - // 12. Deduplicate join conditions. + // 13. Deduplicate join conditions. .add(DeduplicateJoinConditionOptimizer::new()) - // 13. Apply join commutativity to further optimize join ordering + // 14. Apply join commutativity to further optimize join ordering .add_if( opt_ctx.get_enable_join_reorder(), RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::CommuteJoin].as_slice()), ) - // 14. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case. + // 15. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case. .add(CascadesOptimizer::new(opt_ctx.clone())?) - // 15. Eliminate unnecessary scalar calculations to clean up the final plan + // 16. Eliminate unnecessary scalar calculations to clean up the final plan .add_if( !opt_ctx.get_planning_agg_index(), RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice()), ) - // 16. Clean up unused CTEs + // 17. Clean up unused CTEs .add(CleanupUnusedCTEOptimizer); - // 17. Execute the pipeline + // 18. Execute the pipeline let s_expr = pipeline.execute().await?; Ok(s_expr) diff --git a/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs new file mode 100644 index 0000000000000..de035c03072f9 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs @@ -0,0 +1,83 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use databend_common_exception::Result; + +use crate::optimizer::ir::SExpr; +use crate::optimizer::optimizers::common_subexpression::rewrite::SExprReplacement; +use crate::optimizer::optimizers::common_subexpression::table_signature::collect_table_signatures; +use crate::planner::metadata::Metadata; +use crate::plans::MaterializedCTE; +use crate::plans::MaterializedCTERef; +use crate::plans::RelOperator; +pub fn analyze_common_subexpression( + s_expr: &SExpr, + metadata: &Metadata, +) -> Result<(Vec, Vec)> { + let signature_to_exprs = collect_table_signatures(s_expr, metadata); + let mut replacements = vec![]; + let mut materialized_ctes = vec![]; + for exprs in signature_to_exprs.values() { + process_candidate_expressions(exprs, &mut replacements, &mut materialized_ctes, metadata)?; + } + Ok((replacements, materialized_ctes)) +} + +fn process_candidate_expressions( + candidates: &[(Vec, SExpr)], + replacements: &mut Vec, + materialized_ctes: &mut Vec, + _metadata: &Metadata, +) -> Result<()> { + if candidates.len() < 2 { + return Ok(()); + } + + let cte_def = &candidates[0].1; + let cte_def_columns = cte_def.derive_relational_prop()?.output_columns.clone(); + let cte_name = format!("cte_cse_{}", materialized_ctes.len()); + + let cte_plan = MaterializedCTE::new(cte_name.clone(), None, None); + let cte_expr = SExpr::create_unary( + Arc::new(RelOperator::MaterializedCTE(cte_plan)), + Arc::new(cte_def.clone()), + ); + materialized_ctes.push(cte_expr); + + for (path, expr) in candidates { + let cte_ref_columns = expr.derive_relational_prop()?.output_columns.clone(); + let column_mapping = cte_def_columns + .iter() + .copied() + .zip(cte_ref_columns.iter().copied()) + .collect::>(); + let cte_ref = MaterializedCTERef { + cte_name: cte_name.clone(), + output_columns: cte_ref_columns.iter().copied().collect(), + def: expr.clone(), + column_mapping, + }; + let cte_ref_expr = Arc::new(SExpr::create_leaf(Arc::new( + RelOperator::MaterializedCTERef(cte_ref), + ))); + replacements.push(SExprReplacement { + path: path.clone(), + new_expr: cte_ref_expr.clone(), + }); + } + Ok(()) +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/mod.rs new file mode 100644 index 0000000000000..df12662a66e3d --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/mod.rs @@ -0,0 +1,20 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod analyze; +mod optimizer; +mod rewrite; +mod table_signature; + +pub use optimizer::CommonSubexpressionOptimizer; diff --git a/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/optimizer.rs new file mode 100644 index 0000000000000..bf48532e60a94 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/optimizer.rs @@ -0,0 +1,48 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use databend_common_exception::Result; + +use crate::optimizer::ir::SExpr; +use crate::optimizer::optimizers::common_subexpression::analyze::analyze_common_subexpression; +use crate::optimizer::optimizers::common_subexpression::rewrite::rewrite_sexpr; +use crate::optimizer::Optimizer; +use crate::optimizer::OptimizerContext; + +pub struct CommonSubexpressionOptimizer { + pub(crate) _opt_ctx: Arc, +} + +#[async_trait] +impl Optimizer for CommonSubexpressionOptimizer { + async fn optimize(&mut self, s_expr: &SExpr) -> Result { + let metadata = self._opt_ctx.get_metadata(); + let metadata = metadata.read(); + let (replacements, materialized_ctes) = analyze_common_subexpression(s_expr, &metadata)?; + rewrite_sexpr(s_expr, replacements, materialized_ctes) + } + + fn name(&self) -> String { + "CommonSubexpressionOptimizer".to_string() + } +} + +impl CommonSubexpressionOptimizer { + pub fn new(opt_ctx: Arc) -> Self { + Self { _opt_ctx: opt_ctx } + } +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/rewrite.rs b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/rewrite.rs new file mode 100644 index 0000000000000..fda51381a89ba --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/rewrite.rs @@ -0,0 +1,231 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; + +use crate::optimizer::ir::SExpr; +use crate::plans::RelOperator; +use crate::plans::Sequence; + +/// Replace a subtree at the specified path in the SExpr tree. +/// +/// # Arguments +/// * `root` - The root SExpr to perform replacement on +/// * `path` - A slice of child indices specifying the path to the replacement position +/// * `replacement` - The new SExpr to replace the subtree at the specified position +/// +/// # Returns +/// A new SExpr with the replacement performed, or an error if the path is invalid +/// +/// # Example +/// If path is [0, 1], this will replace the second child (index 1) of the first child (index 0) of root. +pub fn replace_at_path(root: &SExpr, path: &[usize], replacement: Arc) -> Result { + if path.is_empty() { + // Replace the root itself + return Ok((*replacement).clone()); + } + + let first_index = path[0]; + if first_index >= root.children.len() { + return Err(ErrorCode::Internal(format!( + "Invalid path in replace_at_path: path: {:?}, root: {:?}", + path, root + ))); + } + + // Recursively replace in the subtree + let remaining_path = &path[1..]; + let old_child = &root.children[first_index]; + let new_child = Arc::new(replace_at_path(old_child, remaining_path, replacement)?); + + // Create new children with the replaced child + let mut new_children = root.children.clone(); + new_children[first_index] = new_child; + + // Return a new SExpr with updated children + Ok(root.replace_children(new_children)) +} + +pub fn wrap_with_sequence(materialized_cte: SExpr, s_expr: SExpr) -> SExpr { + let sequence = Sequence; + SExpr::create_binary( + Arc::new(RelOperator::Sequence(sequence)), + Arc::new(materialized_cte), + Arc::new(s_expr), + ) +} + +pub fn rewrite_sexpr( + s_expr: &SExpr, + replacements: Vec, + materialized_ctes: Vec, +) -> Result { + let mut result = s_expr.clone(); + + for replacement in replacements { + result = replace_at_path(&result, &replacement.path, replacement.new_expr)?; + } + + for cte_expr in materialized_ctes { + result = wrap_with_sequence(cte_expr, result); + } + + Ok(result) +} + +/// Represents a single SExpr replacement operation +#[derive(Clone, Debug)] +pub struct SExprReplacement { + /// Path to the location where replacement should occur + pub path: Vec, + /// The new expression to replace with + pub new_expr: Arc, +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::replace_at_path; + use crate::optimizer::ir::SExpr; + use crate::plans::RelOperator; + use crate::plans::Scan; + + fn create_scan_expr(table_index: u32) -> SExpr { + let scan = Scan { + table_index: table_index as usize, + ..Default::default() + }; + SExpr::create_leaf(Arc::new(RelOperator::Scan(scan))) + } + + fn create_join_expr(left: Arc, right: Arc) -> SExpr { + use crate::plans::Join; + use crate::plans::JoinType; + + let join = Join { + equi_conditions: vec![], + non_equi_conditions: vec![], + join_type: JoinType::Cross, + marker_index: None, + from_correlated_subquery: false, + need_hold_hash_table: false, + is_lateral: false, + single_to_inner: None, + build_side_cache_info: None, + }; + SExpr::create_binary(Arc::new(RelOperator::Join(join)), left, right) + } + + #[test] + fn test_replace_at_root() { + let original = create_scan_expr(1); + let replacement = Arc::new(create_scan_expr(2)); + + let result = replace_at_path(&original, &[], replacement).unwrap(); + + if let RelOperator::Scan(scan) = result.plan.as_ref() { + assert_eq!(scan.table_index, 2); + } else { + panic!("Expected Scan operator"); + } + } + + #[test] + fn test_replace_first_child() { + let left = Arc::new(create_scan_expr(1)); + let right = Arc::new(create_scan_expr(2)); + let original = create_join_expr(left, right); + + let replacement = Arc::new(create_scan_expr(3)); + let result = replace_at_path(&original, &[0], replacement).unwrap(); + + // Check that the left child was replaced + let new_left = result.child(0).unwrap(); + if let RelOperator::Scan(scan) = new_left.plan.as_ref() { + assert_eq!(scan.table_index, 3); + } else { + panic!("Expected Scan operator"); + } + + // Check that the right child is unchanged + let new_right = result.child(1).unwrap(); + if let RelOperator::Scan(scan) = new_right.plan.as_ref() { + assert_eq!(scan.table_index, 2); + } else { + panic!("Expected Scan operator"); + } + } + + #[test] + fn test_replace_nested_path() { + // Create a nested structure: Join(Join(Scan1, Scan2), Scan3) + let scan1 = Arc::new(create_scan_expr(1)); + let scan2 = Arc::new(create_scan_expr(2)); + let inner_join = Arc::new(create_join_expr(scan1, scan2)); + let scan3 = Arc::new(create_scan_expr(3)); + let outer_join = create_join_expr(inner_join, scan3); + + // Replace the right child of the left child (path [0, 1]) + let replacement = Arc::new(create_scan_expr(4)); + let result = replace_at_path(&outer_join, &[0, 1], replacement).unwrap(); + + // Navigate to the replaced position + let left_child = result.child(0).unwrap(); + let replaced_child = left_child.child(1).unwrap(); + + if let RelOperator::Scan(scan) = replaced_child.plan.as_ref() { + assert_eq!(scan.table_index, 4); + } else { + panic!("Expected Scan operator"); + } + + // Check that other nodes are unchanged + let left_left_child = left_child.child(0).unwrap(); + if let RelOperator::Scan(scan) = left_left_child.plan.as_ref() { + assert_eq!(scan.table_index, 1); + } else { + panic!("Expected Scan operator"); + } + } + + #[test] + fn test_invalid_path_out_of_bounds() { + let original = create_scan_expr(1); + let replacement = Arc::new(create_scan_expr(2)); + + let result = replace_at_path(&original, &[0], replacement); + + assert!(result.is_err()); + let error = result.unwrap_err(); + assert!(error.to_string().contains("out of bounds")); + } + + #[test] + fn test_invalid_path_deep() { + let left = Arc::new(create_scan_expr(1)); + let right = Arc::new(create_scan_expr(2)); + let original = create_join_expr(left, right); + + let replacement = Arc::new(create_scan_expr(3)); + let result = replace_at_path(&original, &[0, 0], replacement); + + assert!(result.is_err()); + let error = result.unwrap_err(); + assert!(error.to_string().contains("out of bounds")); + } +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/table_signature.rs b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/table_signature.rs new file mode 100644 index 0000000000000..8fbfe1a94bd20 --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/table_signature.rs @@ -0,0 +1,59 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::collections::HashMap; + +use crate::optimizer::ir::SExpr; +use crate::planner::metadata::Metadata; +use crate::plans::RelOperator; +use crate::IndexType; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct TableSignature { + pub tables: BTreeSet, +} + +pub fn collect_table_signatures( + root: &SExpr, + metadata: &Metadata, +) -> HashMap, SExpr)>> { + let mut signature_to_exprs = HashMap::new(); + let mut path = Vec::new(); + collect_table_signatures_rec(root, &mut path, metadata, &mut signature_to_exprs); + signature_to_exprs +} + +fn collect_table_signatures_rec( + expr: &SExpr, + path: &mut Vec, + metadata: &Metadata, + signature_to_exprs: &mut HashMap, SExpr)>>, +) { + for (child_index, child) in expr.children().enumerate() { + path.push(child_index); + collect_table_signatures_rec(child, path, metadata, signature_to_exprs); + path.pop(); + } + + if let RelOperator::Scan(scan) = expr.plan.as_ref() { + let mut tables = BTreeSet::new(); + let table_entry = metadata.table(scan.table_index); + tables.insert(table_entry.table().get_id() as IndexType); + signature_to_exprs + .entry(TableSignature { tables }) + .or_default() + .push((path.clone(), expr.clone())); + } +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/mod.rs index 6fa380d8d997b..a4aeac54cac30 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/mod.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod cascades; +mod common_subexpression; pub mod cte_filter_pushdown; pub mod distributed; mod hyper_dp; @@ -21,6 +22,7 @@ pub mod recursive; pub mod rule; pub use cascades::CascadesOptimizer; +pub use common_subexpression::CommonSubexpressionOptimizer; pub use cte_filter_pushdown::CTEFilterPushdownOptimizer; pub use hyper_dp::DPhpyOptimizer; pub use operator::CleanupUnusedCTEOptimizer; From 556d3e2f2b661db82e2069a3ab6ca5c9baea553b Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 10 Oct 2025 16:12:49 +0800 Subject: [PATCH 2/2] prune columns for cte --- Cargo.lock | 1 - .../physical_aggregate_final.rs | 13 +- .../src/physical_plans/physical_async_func.rs | 11 +- .../physical_plans/physical_cte_consumer.rs | 17 +- .../physical_plans/physical_eval_scalar.rs | 15 +- .../src/physical_plans/physical_exchange.rs | 12 +- .../physical_expression_scan.rs | 5 +- .../src/physical_plans/physical_filter.rs | 8 +- .../src/physical_plans/physical_join.rs | 26 +-- .../src/physical_plans/physical_limit.rs | 5 +- .../physical_materialized_cte.rs | 45 ++-- .../src/physical_plans/physical_mutation.rs | 5 +- .../physical_plans/physical_plan_builder.rs | 218 +++++++++++++++++- .../physical_plans/physical_project_set.rs | 10 +- .../physical_plans/physical_secure_filter.rs | 11 +- .../src/physical_plans/physical_sort.rs | 14 +- .../src/physical_plans/physical_udf.rs | 11 +- .../src/physical_plans/physical_window.rs | 19 +- .../common_subexpression/analyze.rs | 4 +- 19 files changed, 333 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cd83593b9352..77021c24d3bc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5089,7 +5089,6 @@ dependencies = [ "logcall", "map-api", "maplit", - "pin-project", "poem", "pretty_assertions", "prometheus-client 0.22.3", diff --git a/src/query/service/src/physical_plans/physical_aggregate_final.rs b/src/query/service/src/physical_plans/physical_aggregate_final.rs index 216536ca92d15..02f6f66651b51 100644 --- a/src/query/service/src/physical_plans/physical_aggregate_final.rs +++ b/src/query/service/src/physical_plans/physical_aggregate_final.rs @@ -208,23 +208,20 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, agg: &Aggregate, - mut required: ColumnSet, + required: ColumnSet, stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. let mut used = vec![]; for item in &agg.aggregate_functions { if required.contains(&item.index) { - required.extend(item.scalar.used_columns()); used.push(item.clone()); } } - agg.group_items.iter().for_each(|i| { - // If the group item comes from a complex expression, we only include the final - // column index here. The used columns will be included in its EvalScalar child. - required.insert(i.index); - }); + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); // single key without aggregation if agg.group_items.is_empty() && used.is_empty() { @@ -245,7 +242,7 @@ impl PhysicalPlanBuilder { }; // 2. Build physical plan. - let input = self.build(s_expr.child(0)?, required).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; let input_schema = input.output_schema()?; let group_items = agg.group_items.iter().map(|v| v.index).collect::>(); diff --git a/src/query/service/src/physical_plans/physical_async_func.rs b/src/query/service/src/physical_plans/physical_async_func.rs index c7f7b8ed355d1..4858ffc35adae 100644 --- a/src/query/service/src/physical_plans/physical_async_func.rs +++ b/src/query/service/src/physical_plans/physical_async_func.rs @@ -132,23 +132,26 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, async_func_plan: &databend_common_sql::plans::AsyncFunction, - mut required: ColumnSet, + required: ColumnSet, stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. let mut used = vec![]; for item in async_func_plan.items.iter() { if required.contains(&item.index) { - required.extend(item.scalar.used_columns()); used.push(item.clone()); } } + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); + // 2. Build physical plan. if used.is_empty() { - return self.build(s_expr.child(0)?, required).await; + return self.build(s_expr.child(0)?, child_required).await; } - let input = self.build(s_expr.child(0)?, required).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; let input_schema = input.output_schema()?; let async_func_descs = used diff --git a/src/query/service/src/physical_plans/physical_cte_consumer.rs b/src/query/service/src/physical_plans/physical_cte_consumer.rs index 602eb9e3d943e..11ab266c3487c 100644 --- a/src/query/service/src/physical_plans/physical_cte_consumer.rs +++ b/src/query/service/src/physical_plans/physical_cte_consumer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::collections::HashMap; use databend_common_exception::Result; use databend_common_expression::DataField; @@ -93,11 +94,23 @@ impl PhysicalPlanBuilder { cte_consumer: &databend_common_sql::plans::MaterializedCTERef, stat_info: PlanStatsInfo, ) -> Result { + let def_to_ref = cte_consumer + .column_mapping + .iter() + .map(|(k, v)| (*v, *k)) + .collect::>(); + let cte_output_columns: Vec<_> = self + .cte_required_columns + .get(&cte_consumer.cte_name) + .unwrap() + .iter() + .map(|c| def_to_ref.get(c).unwrap()) + .collect(); let mut fields = Vec::new(); let metadata = self.metadata.read(); - for index in &cte_consumer.output_columns { - let column = metadata.column(*index); + for index in cte_output_columns.iter() { + let column = metadata.column(**index); let data_type = column.data_type(); fields.push(DataField::new(&index.to_string(), data_type)); } diff --git a/src/query/service/src/physical_plans/physical_eval_scalar.rs b/src/query/service/src/physical_plans/physical_eval_scalar.rs index 4cf767a82cb2a..560985a7d1c4a 100644 --- a/src/query/service/src/physical_plans/physical_eval_scalar.rs +++ b/src/query/service/src/physical_plans/physical_eval_scalar.rs @@ -192,7 +192,7 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, eval_scalar: &databend_common_sql::plans::EvalScalar, - mut required: ColumnSet, + required: ColumnSet, stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. @@ -204,19 +204,20 @@ impl PhysicalPlanBuilder { continue; } used.push(s.clone()); - s.scalar.used_columns().iter().for_each(|c| { - required.insert(*c); - }) } + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); + // 2. Build physical plan. if used.is_empty() { - self.build(s_expr.child(0)?, required).await + self.build(s_expr.child(0)?, child_required).await } else { let child = s_expr.child(0)?; let input = if let Some(new_child) = self.try_eliminate_flatten_columns(&used, child)? { - self.build(&new_child, required).await? + self.build(&new_child, child_required.clone()).await? } else { - self.build(child, required).await? + self.build(child, child_required).await? }; let column_projections: HashSet = column_projections diff --git a/src/query/service/src/physical_plans/physical_exchange.rs b/src/query/service/src/physical_plans/physical_exchange.rs index a794ba7ce44a3..dab1a1aebff83 100644 --- a/src/query/service/src/physical_plans/physical_exchange.rs +++ b/src/query/service/src/physical_plans/physical_exchange.rs @@ -97,17 +97,15 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, exchange: &databend_common_sql::plans::Exchange, - mut required: ColumnSet, + required: ColumnSet, ) -> Result { // 1. Prune unused Columns. - if let databend_common_sql::plans::Exchange::Hash(exprs) = exchange { - for expr in exprs { - required.extend(expr.used_columns()); - } - } + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); // 2. Build physical plan. - let input = self.build(s_expr.child(0)?, required).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; let input_schema = input.output_schema()?; let mut keys = vec![]; let mut allow_adjust_parallelism = true; diff --git a/src/query/service/src/physical_plans/physical_expression_scan.rs b/src/query/service/src/physical_plans/physical_expression_scan.rs index 8e463c114deb3..20ecf66a77d3f 100644 --- a/src/query/service/src/physical_plans/physical_expression_scan.rs +++ b/src/query/service/src/physical_plans/physical_expression_scan.rs @@ -117,7 +117,10 @@ impl PhysicalPlanBuilder { scan: &databend_common_sql::plans::ExpressionScan, required: ColumnSet, ) -> Result { - let input = self.build(s_expr.child(0)?, required).await?; + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); + let input = self.build(s_expr.child(0)?, child_required).await?; let input_schema = input.output_schema()?; let values = scan diff --git a/src/query/service/src/physical_plans/physical_filter.rs b/src/query/service/src/physical_plans/physical_filter.rs index 8efa752e7d23b..d72f59b8c6d4f 100644 --- a/src/query/service/src/physical_plans/physical_filter.rs +++ b/src/query/service/src/physical_plans/physical_filter.rs @@ -137,12 +137,12 @@ impl PhysicalPlanBuilder { stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. - let used = filter.predicates.iter().fold(required.clone(), |acc, v| { - acc.union(&v.used_columns()).cloned().collect() - }); + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); // 2. Build physical plan. - let input = self.build(s_expr.child(0)?, used).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; required = required .union(self.metadata.read().get_retained_column()) .cloned() diff --git a/src/query/service/src/physical_plans/physical_join.rs b/src/query/service/src/physical_plans/physical_join.rs index 85e0830565602..55b3d16df516c 100644 --- a/src/query/service/src/physical_plans/physical_join.rs +++ b/src/query/service/src/physical_plans/physical_join.rs @@ -144,28 +144,10 @@ impl PhysicalPlanBuilder { others_required.insert(*column); } } - - // Include columns referenced in left conditions and right conditions. - let left_required: ColumnSet = join - .equi_conditions - .iter() - .fold(required.clone(), |acc, v| { - acc.union(&v.left.used_columns()).cloned().collect() - }) - .union(&others_required) - .cloned() - .collect(); - let right_required: ColumnSet = join - .equi_conditions - .iter() - .fold(required.clone(), |acc, v| { - acc.union(&v.right.used_columns()).cloned().collect() - }) - .union(&others_required) - .cloned() - .collect(); - let left_required = left_required.union(&others_required).cloned().collect(); - let right_required = right_required.union(&others_required).cloned().collect(); + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let left_required = child_required.remove(0); + let right_required = child_required.remove(0); // 2. Build physical plan. // Choose physical join type by join conditions diff --git a/src/query/service/src/physical_plans/physical_limit.rs b/src/query/service/src/physical_plans/physical_limit.rs index e20ce418eb79e..94448215a3896 100644 --- a/src/query/service/src/physical_plans/physical_limit.rs +++ b/src/query/service/src/physical_plans/physical_limit.rs @@ -149,7 +149,10 @@ impl PhysicalPlanBuilder { } // 2. Build physical plan. - let input_plan = self.build(s_expr.child(0)?, required).await?; + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); + let input_plan = self.build(s_expr.child(0)?, child_required).await?; if limit.before_exchange || limit.lazy_columns.is_empty() || !support_lazy_materialize { return Ok(PhysicalPlan::new(Limit { input: input_plan, diff --git a/src/query/service/src/physical_plans/physical_materialized_cte.rs b/src/query/service/src/physical_plans/physical_materialized_cte.rs index 22f9c2ae7a8f4..37066449307e1 100644 --- a/src/query/service/src/physical_plans/physical_materialized_cte.rs +++ b/src/query/service/src/physical_plans/physical_materialized_cte.rs @@ -16,9 +16,10 @@ use std::any::Any; use databend_common_exception::Result; use databend_common_expression::DataSchemaRef; -use databend_common_sql::optimizer::ir::RelExpr; +use databend_common_pipeline_transforms::TransformPipelineHelper; +use databend_common_sql::evaluator::BlockOperator; +use databend_common_sql::evaluator::CompoundBlockOperator; use databend_common_sql::optimizer::ir::SExpr; -use databend_common_sql::ColumnBinding; use crate::physical_plans::explain::PlanStatsInfo; use crate::physical_plans::format::MaterializedCTEFormatter; @@ -38,7 +39,7 @@ pub struct MaterializedCTE { pub stat_info: Option, pub input: PhysicalPlan, pub cte_name: String, - pub cte_output_columns: Option>, + pub cte_output_columns: Option>, pub ref_count: usize, pub channel_size: Option, pub meta: PhysicalPlanMeta, @@ -95,13 +96,20 @@ impl IPhysicalPlan for MaterializedCTE { let input_schema = self.input.output_schema()?; if let Some(output_columns) = &self.cte_output_columns { - PipelineBuilder::build_result_projection( - &builder.func_ctx, - input_schema, - output_columns, - &mut builder.main_pipeline, - false, - )?; + let mut projections = Vec::with_capacity(output_columns.len()); + for index in output_columns { + projections.push(input_schema.index_of(index.to_string().as_str())?); + } + let num_input_columns = input_schema.num_fields(); + builder.main_pipeline.add_transformer(|| { + CompoundBlockOperator::new( + vec![BlockOperator::Project { + projection: projections.clone(), + }], + builder.func_ctx.clone(), + num_input_columns, + ) + }); } builder.main_pipeline.try_resize(1)?; @@ -123,20 +131,19 @@ impl PhysicalPlanBuilder { materialized_cte: &databend_common_sql::plans::MaterializedCTE, stat_info: PlanStatsInfo, ) -> Result { - let required = match &materialized_cte.cte_output_columns { - Some(o) => o.iter().map(|c| c.index).collect(), - None => RelExpr::with_s_expr(s_expr.child(0)?) - .derive_relational_prop()? - .output_columns - .clone(), - }; - let input = self.build(s_expr.child(0)?, required).await?; + let required = self + .cte_required_columns + .get(&materialized_cte.cte_name) + .unwrap() + .clone(); + let cte_output_columns = Some(required.iter().copied().collect()); + let input = self.build_physical_plan(s_expr.child(0)?, required).await?; Ok(PhysicalPlan::new(MaterializedCTE { plan_id: 0, stat_info: Some(stat_info), input, cte_name: materialized_cte.cte_name.clone(), - cte_output_columns: materialized_cte.cte_output_columns.clone(), + cte_output_columns, ref_count: materialized_cte.ref_count, channel_size: materialized_cte.channel_size, meta: PhysicalPlanMeta::new("MaterializedCTE"), diff --git a/src/query/service/src/physical_plans/physical_mutation.rs b/src/query/service/src/physical_plans/physical_mutation.rs index d61ba6fa9e175..e85b714f3d318 100644 --- a/src/query/service/src/physical_plans/physical_mutation.rs +++ b/src/query/service/src/physical_plans/physical_mutation.rs @@ -297,7 +297,10 @@ impl PhysicalPlanBuilder { let udf_col_num = required_udf_ids.len(); required.extend(required_udf_ids); - let mut plan = self.build(s_expr.child(0)?, required).await?; + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); + let mut plan = self.build(s_expr.child(0)?, child_required).await?; if *no_effect { return Ok(plan); } diff --git a/src/query/service/src/physical_plans/physical_plan_builder.rs b/src/query/service/src/physical_plans/physical_plan_builder.rs index 5190a683affe3..987678687bf7a 100644 --- a/src/query/service/src/physical_plans/physical_plan_builder.rs +++ b/src/query/service/src/physical_plans/physical_plan_builder.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use databend_common_catalog::plan::PartStatistics; @@ -31,14 +32,14 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::physical_plans::explain::PlanStatsInfo; use crate::physical_plans::physical_plan::PhysicalPlan; - pub struct PhysicalPlanBuilder { pub metadata: MetadataRef, pub ctx: Arc, pub func_ctx: FunctionContext, pub dry_run: bool, - // DataMutation info, used to build MergeInto physical plan pub mutation_build_info: Option, + pub cte_required_columns: HashMap, + pub is_cte_required_columns_collected: bool, } impl PhysicalPlanBuilder { @@ -50,6 +51,8 @@ impl PhysicalPlanBuilder { func_ctx, dry_run, mutation_build_info: None, + cte_required_columns: HashMap::new(), + is_cte_required_columns_collected: false, } } @@ -63,6 +66,11 @@ impl PhysicalPlanBuilder { } pub async fn build(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result { + if !self.is_cte_required_columns_collected { + self.collect_cte_required_columns(s_expr, required.clone())?; + self.is_cte_required_columns_collected = true; + } + let mut plan = self.build_physical_plan(s_expr, required).await?; plan.adjust_plan_id(&mut 0); @@ -154,6 +162,212 @@ impl PhysicalPlanBuilder { pub fn set_metadata(&mut self, metadata: MetadataRef) { self.metadata = metadata; } + + pub(crate) fn derive_child_required_columns( + &self, + s_expr: &SExpr, + parent_required: &ColumnSet, + ) -> Result> { + let arity = s_expr.arity(); + if arity == 0 { + return Ok(vec![]); + } + + let mut child_required: Vec = + (0..arity).map(|_| parent_required.clone()).collect(); + + match s_expr.plan() { + RelOperator::MaterializedCTE(cte) => { + let output_columns = if let Some(columns) = &cte.cte_output_columns { + columns.iter().map(|c| c.index).collect::() + } else { + RelExpr::with_s_expr(s_expr.child(0)?) + .derive_relational_prop()? + .output_columns + .clone() + }; + child_required[0] = output_columns; + } + RelOperator::EvalScalar(eval_scalar) => { + let req = &mut child_required[0]; + for item in &eval_scalar.items { + if parent_required.contains(&item.index) { + for col in item.scalar.used_columns() { + req.insert(col); + } + } + } + } + RelOperator::Filter(filter) => { + let req = &mut child_required[0]; + for predicate in &filter.predicates { + req.extend(predicate.used_columns()); + } + } + RelOperator::SecureFilter(filter) => { + let req = &mut child_required[0]; + for predicate in &filter.predicates { + req.extend(predicate.used_columns()); + } + } + RelOperator::Aggregate(agg) => { + let req = &mut child_required[0]; + for item in &agg.group_items { + req.insert(item.index); + for col in item.scalar.used_columns() { + req.insert(col); + } + } + for item in &agg.aggregate_functions { + if parent_required.contains(&item.index) { + for col in item.scalar.used_columns() { + req.insert(col); + } + } + } + } + RelOperator::Window(window) => { + let req = &mut child_required[0]; + for item in &window.arguments { + req.extend(item.scalar.used_columns()); + req.insert(item.index); + } + for item in &window.partition_by { + req.extend(item.scalar.used_columns()); + req.insert(item.index); + } + for item in &window.order_by { + req.extend(item.order_by_item.scalar.used_columns()); + req.insert(item.order_by_item.index); + } + } + RelOperator::Sort(sort) => { + let req = &mut child_required[0]; + for item in &sort.items { + req.insert(item.index); + } + } + RelOperator::Limit(_) => { + // no extra columns needed beyond parent_required + } + RelOperator::Join(join) => { + let mut others_required = join + .non_equi_conditions + .iter() + .fold(parent_required.clone(), |acc, v| { + acc.union(&v.used_columns()).cloned().collect() + }); + if let Some(cache_info) = &join.build_side_cache_info { + for column in &cache_info.columns { + others_required.insert(*column); + } + } + + let left_required: ColumnSet = join + .equi_conditions + .iter() + .fold(parent_required.clone(), |acc, v| { + acc.union(&v.left.used_columns()).cloned().collect() + }) + .union(&others_required) + .cloned() + .collect(); + let right_required: ColumnSet = join + .equi_conditions + .iter() + .fold(parent_required.clone(), |acc, v| { + acc.union(&v.right.used_columns()).cloned().collect() + }) + .union(&others_required) + .cloned() + .collect(); + + child_required[0] = left_required.union(&others_required).cloned().collect(); + child_required[1] = right_required.union(&others_required).cloned().collect(); + } + RelOperator::UnionAll(_) => { + // already initialised with parent_required clone + } + RelOperator::Exchange(databend_common_sql::plans::Exchange::Hash(exprs)) => { + let req = &mut child_required[0]; + for expr in exprs { + req.extend(expr.used_columns()); + } + } + RelOperator::Exchange(_) => {} + RelOperator::ProjectSet(project_set) => { + let req = &mut child_required[0]; + for item in &project_set.srfs { + if parent_required.contains(&item.index) { + for col in item.scalar.used_columns() { + req.insert(col); + } + } + } + } + RelOperator::Udf(udf) => { + let req = &mut child_required[0]; + for item in &udf.items { + if parent_required.contains(&item.index) { + for col in item.scalar.used_columns() { + req.insert(col); + } + } + } + } + RelOperator::AsyncFunction(async_func) => { + let req = &mut child_required[0]; + for item in &async_func.items { + if parent_required.contains(&item.index) { + for col in item.scalar.used_columns() { + req.insert(col); + } + } + } + } + RelOperator::Mutation(_) => { + // same as parent_required + } + RelOperator::Sequence(_) => { + // same as parent_required for each child + } + RelOperator::ExpressionScan(_) => { + // same as parent_required for single child + } + _ => { + // default: keep parent_required for all children + } + } + + Ok(child_required) + } + + fn collect_cte_required_columns(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result<()> { + match s_expr.plan() { + RelOperator::MaterializedCTERef(cte_ref) => { + let mut required_mapped = ColumnSet::new(); + for col in required { + if let Some(mapped) = cte_ref.column_mapping.get(&col) { + required_mapped.insert(*mapped); + } + } + self.cte_required_columns + .entry(cte_ref.cte_name.clone()) + .and_modify(|cols| { + *cols = cols.union(&required_mapped).cloned().collect(); + }) + .or_insert(required_mapped); + Ok(()) + } + _ => { + let child_required = self.derive_child_required_columns(s_expr, &required)?; + for (idx, columns) in child_required.into_iter().enumerate() { + self.collect_cte_required_columns(s_expr.child(idx)?, columns)?; + } + Ok(()) + } + } + } } #[derive(Clone)] diff --git a/src/query/service/src/physical_plans/physical_project_set.rs b/src/query/service/src/physical_plans/physical_project_set.rs index ea54f83364cf5..b016ca15571d2 100644 --- a/src/query/service/src/physical_plans/physical_project_set.rs +++ b/src/query/service/src/physical_plans/physical_project_set.rs @@ -148,17 +148,17 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, project_set: &databend_common_sql::plans::ProjectSet, - mut required: ColumnSet, + required: ColumnSet, stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. let column_projections = required.clone().into_iter().collect::>(); - for s in project_set.srfs.iter() { - required.extend(s.scalar.used_columns().iter().copied()); - } + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); // 2. Build physical plan. - let input = self.build(s_expr.child(0)?, required).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; let input_schema = input.output_schema()?; let srf_exprs = project_set .srfs diff --git a/src/query/service/src/physical_plans/physical_secure_filter.rs b/src/query/service/src/physical_plans/physical_secure_filter.rs index 9e8e19230c430..3cef39a3c38d2 100644 --- a/src/query/service/src/physical_plans/physical_secure_filter.rs +++ b/src/query/service/src/physical_plans/physical_secure_filter.rs @@ -143,15 +143,12 @@ impl PhysicalPlanBuilder { stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. - let used = secure_filter - .predicates - .iter() - .fold(required.clone(), |acc, v| { - acc.union(&v.used_columns()).cloned().collect() - }); + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); // 2. Build physical plan. - let input = self.build(s_expr.child(0)?, used).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; required = required .union(self.metadata.read().get_retained_column()) .cloned() diff --git a/src/query/service/src/physical_plans/physical_sort.rs b/src/query/service/src/physical_plans/physical_sort.rs index 020c67837b04a..d3eb8875b59d6 100644 --- a/src/query/service/src/physical_plans/physical_sort.rs +++ b/src/query/service/src/physical_plans/physical_sort.rs @@ -378,13 +378,13 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, sort: &databend_common_sql::plans::Sort, - mut required: ColumnSet, + required: ColumnSet, stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. - sort.items.iter().for_each(|s| { - required.insert(s.index); - }); + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); // If the query will be optimized by lazy reading, we don't need to do pre-projection. let pre_projection: Option> = if self.metadata.read().lazy_columns().is_empty() { @@ -418,7 +418,9 @@ impl PhysicalPlanBuilder { None => SortStep::Single, }; - let input_plan = self.build(s_expr.unary_child(), required).await?; + let input_plan = self + .build(s_expr.unary_child(), child_required.clone()) + .await?; return Ok(PhysicalPlan::new(WindowPartition { meta: PhysicalPlanMeta::new("WindowPartition"), @@ -444,7 +446,7 @@ impl PhysicalPlanBuilder { let enable_fixed_rows = settings.get_enable_fixed_rows_sort()?; let Some(after_exchange) = sort.after_exchange else { - let input_plan = self.build(s_expr.unary_child(), required).await?; + let input_plan = self.build(s_expr.unary_child(), child_required).await?; return Ok(PhysicalPlan::new(Sort { input: input_plan, order_by, diff --git a/src/query/service/src/physical_plans/physical_udf.rs b/src/query/service/src/physical_plans/physical_udf.rs index 1dbe22efb8682..3bcf6c2eff211 100644 --- a/src/query/service/src/physical_plans/physical_udf.rs +++ b/src/query/service/src/physical_plans/physical_udf.rs @@ -160,23 +160,26 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, udf_plan: &databend_common_sql::plans::Udf, - mut required: ColumnSet, + required: ColumnSet, stat_info: PlanStatsInfo, ) -> Result { // 1. Prune unused Columns. let mut used = vec![]; for item in udf_plan.items.iter() { if required.contains(&item.index) { - required.extend(item.scalar.used_columns()); used.push(item.clone()); } } + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); + // 2. Build physical plan. if used.is_empty() { - return self.build(s_expr.child(0)?, required).await; + return self.build(s_expr.child(0)?, child_required).await; } - let input = self.build(s_expr.child(0)?, required).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; let input_schema = input.output_schema()?; let udf_funcs = used diff --git a/src/query/service/src/physical_plans/physical_window.rs b/src/query/service/src/physical_plans/physical_window.rs index 090997e0e9407..b38b46333dfd5 100644 --- a/src/query/service/src/physical_plans/physical_window.rs +++ b/src/query/service/src/physical_plans/physical_window.rs @@ -323,7 +323,7 @@ impl PhysicalPlanBuilder { &mut self, s_expr: &SExpr, window: &databend_common_sql::plans::Window, - mut required: ColumnSet, + required: ColumnSet, _stat_info: PlanStatsInfo, ) -> Result { // 1. DO NOT Prune unused Columns cause window may not in required, eg: @@ -334,21 +334,12 @@ impl PhysicalPlanBuilder { // The scalar items in window function is not replaced yet. // The will be replaced in physical plan builder. - window.arguments.iter().for_each(|item| { - required.extend(item.scalar.used_columns()); - required.insert(item.index); - }); - window.partition_by.iter().for_each(|item| { - required.extend(item.scalar.used_columns()); - required.insert(item.index); - }); - window.order_by.iter().for_each(|item| { - required.extend(item.order_by_item.scalar.used_columns()); - required.insert(item.order_by_item.index); - }); + let mut child_required = self.derive_child_required_columns(s_expr, &required)?; + debug_assert_eq!(child_required.len(), s_expr.arity()); + let child_required = child_required.remove(0); // 2. Build physical plan. - let input = self.build(s_expr.child(0)?, required).await?; + let input = self.build(s_expr.child(0)?, child_required).await?; let mut w = window.clone(); let input_schema = input.output_schema()?; diff --git a/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs index de035c03072f9..848a712b96b0a 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/common_subexpression/analyze.rs @@ -60,10 +60,10 @@ fn process_candidate_expressions( for (path, expr) in candidates { let cte_ref_columns = expr.derive_relational_prop()?.output_columns.clone(); - let column_mapping = cte_def_columns + let column_mapping = cte_ref_columns .iter() .copied() - .zip(cte_ref_columns.iter().copied()) + .zip(cte_def_columns.iter().copied()) .collect::>(); let cte_ref = MaterializedCTERef { cte_name: cte_name.clone(),