diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index fda4b7808b0ea..4051cf8b9922b 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -133,7 +133,7 @@ impl PhysicalPlanBuilder { self.build_materialized_cte(s_expr, materialized_cte, stat_info) .await } - RelOperator::MaterializeCTERef(cte_consumer) => { + RelOperator::MaterializedCTERef(cte_consumer) => { self.build_cte_consumer(cte_consumer, stat_info).await } RelOperator::Sequence(sequence) => { diff --git a/src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs b/src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs index 18a205a9f8809..66f55f579b7fb 100644 --- a/src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs +++ b/src/query/sql/src/executor/physical_plans/physical_cte_consumer.rs @@ -41,7 +41,7 @@ impl MaterializeCTERef { impl PhysicalPlanBuilder { pub(crate) async fn build_cte_consumer( &mut self, - cte_consumer: &crate::plans::MaterializeCTERef, + cte_consumer: &crate::plans::MaterializedCTERef, stat_info: PlanStatsInfo, ) -> Result<PhysicalPlan> { let mut fields = Vec::new(); diff --git a/src/query/sql/src/planner/binder/bind_context.rs b/src/query/sql/src/planner/binder/bind_context.rs index 9a20026dc631d..71c0f5829061a 100644 --- a/src/query/sql/src/planner/binder/bind_context.rs +++ b/src/query/sql/src/planner/binder/bind_context.rs @@ -43,6 +43,7 @@ use crate::binder::project_set::SetReturningInfo; use crate::binder::window::WindowInfo; use crate::binder::ColumnBindingBuilder; use crate::normalize_identifier; +use crate::optimizer::ir::SExpr; use crate::plans::ScalarExpr; use crate::ColumnSet; use crate::IndexType; @@ -197,9 +198,15 @@ impl CteContext { pub struct CteInfo { pub columns_alias: Vec<String>, pub query: Query, - pub materialized: bool, pub recursive: bool, pub columns: Vec<ColumnBinding>, + pub materialized_cte_info: Option<MaterializedCTEInfo>, +} + +#[derive(Clone, Debug)] +pub struct MaterializedCTEInfo { + pub bound_s_expr: SExpr, + pub bound_context: BindContext, } impl BindContext { diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs index 224bcb5dcf72a..34b335a39209c 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
+use std::collections::HashMap; use std::sync::Arc; use databend_common_ast::ast::Query; @@ -25,18 +26,21 @@ use crate::binder::BindContext; use crate::binder::Binder; use crate::binder::CteContext; use crate::binder::CteInfo; +use crate::binder::MaterializedCTEInfo; use crate::normalize_identifier; use crate::optimizer::ir::SExpr; -use crate::plans::MaterializeCTERef; use crate::plans::MaterializedCTE; +use crate::plans::MaterializedCTERef; use crate::plans::RelOperator; use crate::plans::Sequence; +use crate::ColumnBinding; impl Binder { pub fn init_cte(&mut self, bind_context: &mut BindContext, with: &Option) -> Result<()> { let Some(with) = with else { return Ok(()); }; + for cte in with.ctes.iter() { let cte_name = self.normalize_identifier(&cte.alias.name).name; if bind_context.cte_context.cte_map.contains_key(&cte_name) { @@ -45,6 +49,21 @@ impl Binder { ))); } + let materialized_cte_info = if cte.materialized { + let (s_expr, cte_bind_context) = self.bind_cte_definition( + &cte_name, + &bind_context.cte_context.cte_map, + &cte.query, + )?; + let materialized_cte_info = MaterializedCTEInfo { + bound_s_expr: s_expr, + bound_context: cte_bind_context, + }; + Some(materialized_cte_info) + } else { + None + }; + let column_name = cte .alias .columns @@ -56,8 +75,8 @@ impl Binder { columns_alias: column_name, query: *cte.query.clone(), recursive: with.recursive, - materialized: cte.materialized, columns: vec![], + materialized_cte_info, }; bind_context.cte_context.cte_map.insert(cte_name, cte_info); } @@ -70,6 +89,7 @@ impl Binder { table_name: &str, alias: &Option, cte_info: &CteInfo, + producer_column_bindings: &[ColumnBinding], ) -> Result<(SExpr, BindContext)> { let (s_expr, cte_bind_context) = self.bind_cte_definition( table_name, @@ -117,15 +137,24 @@ impl Binder { let output_columns = cte_output_columns.iter().map(|c| c.index).collect(); let mut new_bind_context = bind_context.clone(); - for column in cte_output_columns { - new_bind_context.add_column_binding(column); + for column in cte_output_columns.iter() { + new_bind_context.add_column_binding(column.clone()); } - let s_expr = SExpr::create_leaf(Arc::new(RelOperator::MaterializeCTERef( - MaterializeCTERef { + let mut column_mapping = HashMap::new(); + for (index_in_ref, index_in_producer) in cte_output_columns + .iter() + .zip(producer_column_bindings.iter()) + { + column_mapping.insert(index_in_ref.index, index_in_producer.index); + } + + let s_expr = SExpr::create_leaf(Arc::new(RelOperator::MaterializedCTERef( + MaterializedCTERef { cte_name: table_name.to_string(), output_columns, def: s_expr, + column_mapping, }, ))); Ok((s_expr, new_bind_context)) @@ -164,13 +193,16 @@ impl Binder { let mut current_expr = main_query_expr; for cte in with.ctes.iter().rev() { - if cte.materialized { - let cte_name = self.normalize_identifier(&cte.alias.name).name; - let (s_expr, bind_context) = - self.bind_cte_definition(&cte_name, &cte_context.cte_map, &cte.query)?; + let cte_name = self.normalize_identifier(&cte.alias.name).name; + let cte_info = cte_context.cte_map.get(&cte_name).ok_or_else(|| { + ErrorCode::Internal(format!("CTE '{}' not found in context", cte_name)) + })?; + if let Some(materialized_cte_info) = &cte_info.materialized_cte_info { + let s_expr = materialized_cte_info.bound_s_expr.clone(); + let bind_context = materialized_cte_info.bound_context.clone(); let materialized_cte = - MaterializedCTE::new(cte_name, Some(bind_context.columns), None); + MaterializedCTE::new(cte_name, Some(bind_context.columns.clone()), None); let 
materialized_cte = SExpr::create_unary(materialized_cte, s_expr); let sequence = Sequence {}; current_expr = SExpr::create_binary(sequence, materialized_cte, current_expr); diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs index d43bc3f7dfbba..c13415e2baee9 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs @@ -79,8 +79,14 @@ impl Binder { let cte_map = bind_context.cte_context.cte_map.clone(); if let Some(cte_info) = cte_map.get(&table_name) { - if cte_info.materialized { - return self.bind_cte_consumer(bind_context, &table_name, alias, cte_info); + if let Some(materialized_cte_info) = &cte_info.materialized_cte_info { + return self.bind_cte_consumer( + bind_context, + &table_name, + alias, + cte_info, + &materialized_cte_info.bound_context.columns, + ); } else { if self .metadata diff --git a/src/query/sql/src/planner/optimizer/ir/format.rs b/src/query/sql/src/planner/optimizer/ir/format.rs index 957e4c77c35b7..6e2163b78eb9e 100644 --- a/src/query/sql/src/planner/optimizer/ir/format.rs +++ b/src/query/sql/src/planner/optimizer/ir/format.rs @@ -81,7 +81,7 @@ fn display_rel_op(rel_op: &RelOperator) -> String { RelOperator::MutationSource(_) => "MutationSource".to_string(), RelOperator::CompactBlock(_) => "CompactBlock".to_string(), RelOperator::MaterializedCTE(_) => "MaterializedCTE".to_string(), - RelOperator::MaterializeCTERef(_) => "MaterializeCTERef".to_string(), + RelOperator::MaterializedCTERef(_) => "MaterializeCTERef".to_string(), RelOperator::Sequence(_) => "Sequence".to_string(), } } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 56dd2240017ca..1d91b48fe03fd 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -36,6 +36,7 @@ use crate::optimizer::optimizers::operator::SubqueryDecorrelatorOptimizer; use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer; use crate::optimizer::optimizers::rule::RuleID; use crate::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES; +use crate::optimizer::optimizers::CTEFilterPushdownOptimizer; use crate::optimizer::optimizers::CascadesOptimizer; use crate::optimizer::optimizers::DPhpyOptimizer; use crate::optimizer::pipeline::OptimizerPipeline; @@ -254,32 +255,34 @@ pub async fn optimize_query(opt_ctx: Arc, s_expr: SExpr) -> Re opt_ctx.clone(), &DEFAULT_REWRITE_RULES, )) - // 8. Run post rewrite rules + // 8. CTE filter pushdown optimization + .add(CTEFilterPushdownOptimizer::new(opt_ctx.clone())) + // 9. Run post rewrite rules .add(RecursiveRuleOptimizer::new(opt_ctx.clone(), &[ RuleID::SplitAggregate, ])) - // 9. Apply DPhyp algorithm for cost-based join reordering + // 10. Apply DPhyp algorithm for cost-based join reordering .add(DPhpyOptimizer::new(opt_ctx.clone())) - // 10. After join reorder, Convert some single join to inner join. + // 11. After join reorder, Convert some single join to inner join. .add(SingleToInnerOptimizer::new()) - // 11. Deduplicate join conditions. + // 12. Deduplicate join conditions. .add(DeduplicateJoinConditionOptimizer::new()) - // 12. Apply join commutativity to further optimize join ordering + // 13. 
Apply join commutativity to further optimize join ordering .add_if( opt_ctx.get_enable_join_reorder(), RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::CommuteJoin].as_slice()), ) - // 13. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case. + // 14. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case. .add(CascadesOptimizer::new(opt_ctx.clone())?) - // 14. Eliminate unnecessary scalar calculations to clean up the final plan + // 15. Eliminate unnecessary scalar calculations to clean up the final plan .add_if( !opt_ctx.get_planning_agg_index(), RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice()), ) - // 15. Clean up unused CTEs + // 16. Clean up unused CTEs .add(CleanupUnusedCTEOptimizer); - // 15. Execute the pipeline + // 17. Execute the pipeline let s_expr = pipeline.execute().await?; Ok(s_expr) diff --git a/src/query/sql/src/planner/optimizer/optimizers/cte_filter_pushdown.rs b/src/query/sql/src/planner/optimizer/optimizers/cte_filter_pushdown.rs new file mode 100644 index 0000000000000..da3e34dc15c9d --- /dev/null +++ b/src/query/sql/src/planner/optimizer/optimizers/cte_filter_pushdown.rs @@ -0,0 +1,200 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use databend_common_ast::Span; +use databend_common_exception::Result; + +use crate::optimizer::ir::SExpr; +use crate::optimizer::optimizers::operator::PullUpFilterOptimizer; +use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer; +use crate::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES; +use crate::optimizer::Optimizer; +use crate::optimizer::OptimizerContext; +use crate::plans::BoundColumnRef; +use crate::plans::Filter; +use crate::plans::FunctionCall; +use crate::plans::RelOperator; +use crate::plans::ScalarExpr; +use crate::plans::VisitorMut; + +pub struct CTEFilterPushdownOptimizer { + cte_filters: HashMap<String, Option<Vec<ScalarExpr>>>, + pull_up_filter_optimizer: PullUpFilterOptimizer, + rule_optimizer: RecursiveRuleOptimizer, +} + +struct ColumnMappingRewriter { + mapping: HashMap<usize, usize>, +} + +impl VisitorMut<'_> for ColumnMappingRewriter { + fn visit_bound_column_ref(&mut self, col: &mut BoundColumnRef) -> Result<()> { + if let Some(&new_index) = self.mapping.get(&col.column.index) { + col.column.index = new_index; + } + Ok(()) + } +} + +impl CTEFilterPushdownOptimizer { + pub fn new(ctx: Arc<OptimizerContext>) -> Self { + let pull_up_filter_optimizer = PullUpFilterOptimizer::new(ctx.clone()); + let inner_optimizer = RecursiveRuleOptimizer::new(ctx.clone(), &DEFAULT_REWRITE_RULES); + Self { + cte_filters: HashMap::new(), + pull_up_filter_optimizer, + rule_optimizer: inner_optimizer, + } + } + + #[recursive::recursive] + fn collect_filters(&mut self, s_expr: &SExpr) -> Result<()> { + if let RelOperator::Filter(filter) = s_expr.plan() { + let child = s_expr.child(0)?; + if let RelOperator::MaterializedCTERef(cte) = child.plan() { + let mut and_predicate = if filter.predicates.len() == 1 { + filter.predicates[0].clone() + } else { + filter.predicates.iter().skip(1).fold( + filter.predicates[0].clone(), + |acc, pred| { + ScalarExpr::FunctionCall(FunctionCall { + span: Span::None, + func_name: "and".to_string(), + params: vec![], + arguments: vec![acc, pred.clone()], + }) + }, + ) + }; + + let mut rewriter = ColumnMappingRewriter { + mapping: cte.column_mapping.clone(), + }; + rewriter.visit(&mut and_predicate)?; + + match self.cte_filters.get_mut(&cte.cte_name) { + Some(Some(predicates)) => { + predicates.push(and_predicate); + } + Some(None) => { + // Already marked as None, do nothing + } + None => { + self.cte_filters + .insert(cte.cte_name.clone(), Some(vec![and_predicate])); + } + } + } + } else { + for child in s_expr.children() { + if let RelOperator::MaterializedCTERef(cte) = child.plan() { + self.cte_filters.insert(cte.cte_name.clone(), None); + } + } + } + + for child in s_expr.children() { + self.collect_filters(child)?; + } + + Ok(()) + } + + #[recursive::recursive] + fn add_filters_to_ctes(&self, s_expr: &SExpr) -> Result<SExpr> { + let new_children = s_expr + .children() + .map(|child| self.add_filters_to_ctes(child)) + .collect::<Result<Vec<_>>>()?; + + let mut result = if new_children + .iter() + .zip(s_expr.children()) + .any(|(new, old)| !new.eq(old)) + { + s_expr.replace_children(new_children.into_iter().map(Arc::new)) + } else { + s_expr.clone() + }; + + if let RelOperator::MaterializedCTE(cte) = s_expr.plan() { + if let Some(Some(predicates)) = self.cte_filters.get(&cte.cte_name) { + if !predicates.is_empty() { + log::info!("Pushing predicates to CTE {}", cte.cte_name); + let or_predicate = if predicates.len() == 1 { + predicates[0].clone() + } else { + predicates + .iter() + .skip(1) + .fold(predicates[0].clone(), |acc, pred| {
+ ScalarExpr::FunctionCall(FunctionCall { + span: Span::None, + func_name: "or".to_string(), + params: vec![], + arguments: vec![acc, pred.clone()], + }) + }) + }; + + let filter = Filter { + predicates: vec![or_predicate], + }; + + let filter_expr = SExpr::create_unary( + Arc::new(RelOperator::Filter(filter)), + Arc::new(result.child(0)?.clone()), + ); + + result = result.replace_children(vec![Arc::new(filter_expr)]); + } + } + } + + Ok(result) + } +} + +#[async_trait] +impl Optimizer for CTEFilterPushdownOptimizer { + async fn optimize(&mut self, s_expr: &SExpr) -> Result<SExpr> { + self.cte_filters.clear(); + + self.collect_filters(s_expr)?; + + if self.cte_filters.iter().all(|(_, v)| v.is_none()) { + return Ok(s_expr.clone()); + } + + let expr_with_filters = self.add_filters_to_ctes(s_expr)?; + + let expr_with_pulled_up_filters = self + .pull_up_filter_optimizer + .optimize(&expr_with_filters) + .await?; + + self.rule_optimizer + .optimize(&expr_with_pulled_up_filters) + .await + } + + fn name(&self) -> String { + "CTEFilterPushdownOptimizer".to_string() + } +} diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs index fd90d73eaa962..b726fde014e8d 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs @@ -297,7 +297,7 @@ impl DPhpyOptimizer { join_relation: Option<&SExpr>, ) -> Result<(Arc<SExpr>, bool)> { let cte_consumer = match s_expr.plan() { - RelOperator::MaterializeCTERef(consumer) => consumer, + RelOperator::MaterializedCTERef(consumer) => consumer, _ => unreachable!(), }; @@ -332,7 +332,7 @@ impl DPhpyOptimizer { table_indexes.push(scan.table_index); } - if let RelOperator::MaterializeCTERef(cte_consumer) = s_expr.plan() { + if let RelOperator::MaterializedCTERef(cte_consumer) = s_expr.plan() { Self::collect_table_indexes_recursive(&cte_consumer.def, table_indexes); } @@ -404,7 +404,7 @@ impl DPhpyOptimizer { RelOperator::Join(_) => self.process_join_node(s_expr, join_conditions).await, RelOperator::Sequence(_) => self.process_sequence_node(s_expr).await, - RelOperator::MaterializeCTERef(_) => { + RelOperator::MaterializedCTERef(_) => { self.process_cte_consumer_node(s_expr, join_relation).await } diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs index be3032562a04b..d035d4f6248f3 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs @@ -93,7 +93,7 @@ pub async fn dynamic_sample( | RelOperator::Mutation(_) | RelOperator::CompactBlock(_) | RelOperator::MaterializedCTE(_) - | RelOperator::MaterializeCTERef(_) + | RelOperator::MaterializedCTERef(_) | RelOperator::Sequence(_) | RelOperator::MutationSource(_) => { s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)) } diff --git a/src/query/sql/src/planner/optimizer/optimizers/mod.rs b/src/query/sql/src/planner/optimizer/optimizers/mod.rs index aa38f80db04fc..6fa380d8d997b 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/mod.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/mod.rs @@ -13,6 +13,7 @@ // limitations under the License.
mod cascades; +pub mod cte_filter_pushdown; pub mod distributed; mod hyper_dp; pub mod operator; @@ -20,5 +21,6 @@ pub mod recursive; pub mod rule; pub use cascades::CascadesOptimizer; +pub use cte_filter_pushdown::CTEFilterPushdownOptimizer; pub use hyper_dp::DPhpyOptimizer; pub use operator::CleanupUnusedCTEOptimizer; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs index 958859d155c53..7f6274efc445f 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/cte/cleanup_unused_cte.rs @@ -41,7 +41,7 @@ impl CleanupUnusedCTEOptimizer { referenced_ctes: &mut HashMap, ) -> Result<()> { // Check if current node is a MaterializeCTERef - if let RelOperator::MaterializeCTERef(consumer) = s_expr.plan() { + if let RelOperator::MaterializedCTERef(consumer) = s_expr.plan() { *referenced_ctes .entry(consumer.cte_name.clone()) .or_insert(0) += 1; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs index f94cc0cb2ba30..e2197fb50a80e 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs @@ -335,7 +335,7 @@ impl SubqueryDecorrelatorOptimizer { | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) - | RelOperator::MaterializeCTERef(_) + | RelOperator::MaterializedCTERef(_) | RelOperator::CompactBlock(_) => Ok(s_expr.clone()), } } diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_grouping_sets_to_union.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_grouping_sets_to_union.rs index d00e40203f015..7d6c82ba27e76 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_grouping_sets_to_union.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_grouping_sets_to_union.rs @@ -34,8 +34,8 @@ use crate::plans::AggregateMode; use crate::plans::CastExpr; use crate::plans::ConstantExpr; use crate::plans::EvalScalar; -use crate::plans::MaterializeCTERef; use crate::plans::MaterializedCTE; +use crate::plans::MaterializedCTERef; use crate::plans::RelOp; use crate::plans::Sequence; use crate::plans::UnionAll; @@ -106,6 +106,11 @@ impl Rule for RuleGroupingSetsToUnion { .cloned() .collect(); + let column_mapping = agg_input_columns + .iter() + .map(|index| (*index, *index)) + .collect(); + if let Some(grouping_sets) = &agg.grouping_sets { if !grouping_sets.sets.is_empty() { let mut children = Vec::with_capacity(grouping_sets.sets.len()); @@ -127,10 +132,11 @@ impl Rule for RuleGroupingSetsToUnion { agg_input.clone(), ); - let cte_consumer = SExpr::create_leaf(MaterializeCTERef { + let cte_consumer = SExpr::create_leaf(MaterializedCTERef { cte_name: temp_cte_name, output_columns: agg_input_columns.clone(), def: agg_input.clone(), + column_mapping, }); let mask = (1 << grouping_sets.dup_group_items.len()) - 1; diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs index 00449c81fb277..06b8ec88ac9c7 100644 --- 
a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs @@ -153,7 +153,7 @@ fn find_group_by_keys( | RelOperator::Mutation(_) | RelOperator::MutationSource(_) | RelOperator::MaterializedCTE(_) - | RelOperator::MaterializeCTERef(_) + | RelOperator::MaterializedCTERef(_) | RelOperator::CompactBlock(_) | RelOperator::Sequence(_) => {} } diff --git a/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs b/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs index 8f6ebd8ebda47..f992ebadaeeef 100644 --- a/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs +++ b/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs @@ -142,12 +142,12 @@ impl CollectStatisticsOptimizer { } Ok(s_expr) } - RelOperator::MaterializeCTERef(cte_ref) => { + RelOperator::MaterializedCTERef(cte_ref) => { let def_with_stats = self.collect(&cte_ref.def).await?; let mut new_cte_ref = cte_ref.clone(); new_cte_ref.def = def_with_stats; - Ok(s_expr.replace_plan(Arc::new(RelOperator::MaterializeCTERef(new_cte_ref)))) + Ok(s_expr.replace_plan(Arc::new(RelOperator::MaterializedCTERef(new_cte_ref)))) } _ => { let mut children = Vec::with_capacity(s_expr.arity()); diff --git a/src/query/sql/src/planner/plans/cte_consumer.rs b/src/query/sql/src/planner/plans/cte_consumer.rs index 7b2f68db4c2ef..95d80abc0ad17 100644 --- a/src/query/sql/src/planner/plans/cte_consumer.rs +++ b/src/query/sql/src/planner/plans/cte_consumer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::hash::Hash; use std::hash::Hasher; use std::sync::Arc; @@ -31,19 +32,20 @@ use crate::plans::Operator; use crate::plans::RelOp; #[derive(Clone, Debug, PartialEq, Eq)] -pub struct MaterializeCTERef { +pub struct MaterializedCTERef { pub cte_name: String, pub output_columns: Vec<IndexType>, pub def: SExpr, + pub column_mapping: HashMap<IndexType, IndexType>, } -impl Hash for MaterializeCTERef { +impl Hash for MaterializedCTERef { fn hash<H: Hasher>(&self, state: &mut H) { self.cte_name.hash(state); } } -impl Operator for MaterializeCTERef { +impl Operator for MaterializedCTERef { fn rel_op(&self) -> RelOp { RelOp::MaterializeCTERef } } diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs index 38e249052d85b..5e3e70664b8c7 100644 --- a/src/query/sql/src/planner/plans/operator.rs +++ b/src/query/sql/src/planner/plans/operator.rs @@ -43,8 +43,8 @@ use crate::plans::ExpressionScan; use crate::plans::Filter; use crate::plans::Join; use crate::plans::Limit; -use crate::plans::MaterializeCTERef; use crate::plans::MaterializedCTE; +use crate::plans::MaterializedCTERef; use crate::plans::Mutation; use crate::plans::OptimizeCompactBlock as CompactBlock; use crate::plans::ProjectSet; @@ -167,7 +167,7 @@ pub enum RelOperator { CompactBlock(CompactBlock), MutationSource(MutationSource), MaterializedCTE(MaterializedCTE), - MaterializeCTERef(MaterializeCTERef), + MaterializedCTERef(MaterializedCTERef), Sequence(Sequence), } @@ -272,6 +272,6 @@ impl_try_from_rel_operator!
{ CompactBlock, MutationSource, MaterializedCTE, - MaterializeCTERef, + MaterializedCTERef, Sequence } diff --git a/src/query/sql/src/planner/plans/operator_macros.rs b/src/query/sql/src/planner/plans/operator_macros.rs index d3a2f176f33a4..6cf5f7c9a2135 100644 --- a/src/query/sql/src/planner/plans/operator_macros.rs +++ b/src/query/sql/src/planner/plans/operator_macros.rs @@ -111,7 +111,7 @@ macro_rules! impl_match_rel_op { RelOperator::CompactBlock($rel_op) => $rel_op.$method($($arg),*), RelOperator::MutationSource($rel_op) => $rel_op.$method($($arg),*), RelOperator::MaterializedCTE($rel_op) => $rel_op.$method($($arg),*), - RelOperator::MaterializeCTERef($rel_op) => $rel_op.$method($($arg),*), + RelOperator::MaterializedCTERef($rel_op) => $rel_op.$method($($arg),*), RelOperator::Sequence($rel_op) => $rel_op.$method($($arg),*), } } diff --git a/tests/sqllogictests/suites/mode/standalone/explain/cte_filter_pushdown.test b/tests/sqllogictests/suites/mode/standalone/explain/cte_filter_pushdown.test new file mode 100644 index 0000000000000..256a17d1b0967 --- /dev/null +++ b/tests/sqllogictests/suites/mode/standalone/explain/cte_filter_pushdown.test @@ -0,0 +1,653 @@ +# Test cases for CTE filter pushdown optimization +# This test verifies that filters can be correctly pushed down into materialized CTEs + +statement ok +create table t (a int, b int, c int); + +statement ok +insert into t values (1, 10, 100), (1, 20, 200), (2, 30, 300), (2, 40, 400), (3, 50, 500); + +# Test 1: Basic CTE filter pushdown without aggregation +# Filter on base columns should be pushed down +query T +explain with cte1 as materialized ( + select a, b from t +) +select * from cte1 where a = 1; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.a (#0) = 1)] +│ ├── estimated rows: 1.67 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true(t.a (#0) = 1)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#3), t.b (#4)] + ├── filters: [is_true(cte1.a (#3) = 1)] + ├── estimated rows: 1.67 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#3), b (#4)] + └── estimated rows: 5.00 + +# Test 2: CTE with aggregation - filter on GROUP BY column should be pushed down +query T +explain with cte1 as materialized ( + select a, count(*) as cnt, sum(b) as sum_b + from t + group by a +) +select * from cte1 where a = 1; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 1.00 +│ └── AggregatePartial +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 1.00 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.a (#0) = 1)] +│ ├── estimated rows: 1.67 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true(t.a (#0) = 1)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#5), COUNT(*) (#8), sum(b) (#9)] + ├── filters: 
[is_true(cte1.a (#5) = 1)] + ├── estimated rows: 1.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#5), COUNT(*) (#8), sum(b) (#9)] + └── estimated rows: 3.00 + +# Test 3: CTE with aggregation - filter on aggregate result should NOT be pushed down +query T +explain with cte1 as materialized ( + select a, count(*) as cnt, sum(b) as sum_b + from t + group by a +) +select * from cte1 where sum_b > 25; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── Filter +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── filters: [is_true(sum(b) (#4) > 25)] +│ ├── estimated rows: 0.00 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 3.00 +│ └── AggregatePartial +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 3.00 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#5), COUNT(*) (#8), sum(b) (#9)] + ├── filters: [is_true(cte1.sum_b (#9) > 25)] + ├── estimated rows: 0.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#5), COUNT(*) (#8), sum(b) (#9)] + └── estimated rows: 3.00 + +# Test 4: Mixed filters - GROUP BY column filter should be pushed down, aggregate filter should remain +query T +explain with cte1 as materialized ( + select a, count(*) as cnt, sum(b) as sum_b + from t + group by a +) +select * from cte1 where a > 1 and sum_b > 25; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── Filter +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── filters: [is_true(sum(b) (#4) > 25)] +│ ├── estimated rows: 0.00 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 2.00 +│ └── AggregatePartial +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 2.00 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.a (#0) > 1)] +│ ├── estimated rows: 3.33 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true(t.a (#0) > 1)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#5), COUNT(*) (#8), sum(b) (#9)] + ├── filters: [is_true(cte1.a (#5) > 1), is_true(cte1.sum_b (#9) > 25)] + ├── estimated rows: 0.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#5), COUNT(*) (#8), sum(b) (#9)] + └── estimated rows: 3.00 + +# Test 5: Multiple CTE references with different filters +# Should create OR condition in CTE definition +query T +explain with cte1 as materialized ( + select a, count(*) as cnt + from t + group by a +) +select * from cte1 as c1 where c1.a = 1 +union all +select * from cte1 as c2 where c2.a = 2; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 2.00 +│ └── AggregatePartial +│ ├── group 
by: [a] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 2.00 +│ └── Filter +│ ├── output columns: [t.a (#0)] +│ ├── filters: [is_true((t.a (#0) = 1 OR t.a (#0) = 2))] +│ ├── estimated rows: 2.78 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true((t.a (#0) = 1 OR t.a (#0) = 2))], limit: NONE] +│ └── estimated rows: 5.00 +└── UnionAll + ├── output columns: [a (#12), cnt (#13)] + ├── estimated rows: 2.00 + ├── Filter + │ ├── output columns: [t.a (#4), COUNT(*) (#7)] + │ ├── filters: [is_true(c1.a (#4) = 1)] + │ ├── estimated rows: 1.00 + │ └── MaterializeCTERef + │ ├── cte_name: cte1 + │ ├── cte_schema: [a (#4), COUNT(*) (#7)] + │ └── estimated rows: 3.00 + └── Filter + ├── output columns: [t.a (#8), COUNT(*) (#11)] + ├── filters: [is_true(c2.a (#8) = 2)] + ├── estimated rows: 1.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#8), COUNT(*) (#11)] + └── estimated rows: 3.00 + +# Test 6: Multiple CTE references with three different filters +# Should create complex OR condition in CTE definition +query T +explain with cte1 as materialized ( + select a, count(*) as cnt + from t + group by a +) +select * from cte1 where a = 1 +union all +select * from cte1 where a = 2 +union all +select * from cte1 where a = 3; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 3.00 +│ └── AggregatePartial +│ ├── group by: [a] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 3.00 +│ └── Filter +│ ├── output columns: [t.a (#0)] +│ ├── filters: [is_true(((t.a (#0) = 1 OR t.a (#0) = 2) OR t.a (#0) = 3))] +│ ├── estimated rows: 3.52 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true(((t.a (#0) = 1 OR t.a (#0) = 2) OR t.a (#0) = 3))], limit: NONE] +│ └── estimated rows: 5.00 +└── UnionAll + ├── output columns: [a (#18), cnt (#19)] + ├── estimated rows: 3.00 + ├── UnionAll + │ ├── output columns: [a (#12), cnt (#13)] + │ ├── estimated rows: 2.00 + │ ├── Filter + │ │ ├── output columns: [t.a (#4), COUNT(*) (#7)] + │ │ ├── filters: [is_true(cte1.a (#4) = 1)] + │ │ ├── estimated rows: 1.00 + │ │ └── MaterializeCTERef + │ │ ├── cte_name: cte1 + │ │ ├── cte_schema: [a (#4), COUNT(*) (#7)] + │ │ └── estimated rows: 3.00 + │ └── Filter + │ ├── output columns: [t.a (#8), COUNT(*) (#11)] + │ ├── filters: [is_true(cte1.a (#8) = 2)] + │ ├── estimated rows: 1.00 + │ └── MaterializeCTERef + │ ├── cte_name: cte1 + │ ├── cte_schema: [a (#8), COUNT(*) (#11)] + │ └── estimated rows: 3.00 + └── Filter + ├── output columns: [t.a (#14), COUNT(*) (#17)] + ├── filters: [is_true(cte1.a (#14) = 3)] + ├── estimated rows: 1.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#14), COUNT(*) (#17)] + └── estimated rows: 3.00 + +# Test 8: CTE with existing filter condition and additional pushdown filter +query T +explain with cte1 as materialized ( + select a, b from t where a > 0 +) +select * from cte1 where a = 1 and b < 50; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b 
(#1)] +│ ├── filters: [is_true(t.a (#0) > 0), is_true(t.a (#0) = 1), is_true(t.b (#1) < 50)] +│ ├── estimated rows: 1.67 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [and_filters(and_filters(t.a (#0) > 0, t.a (#0) = 1), t.b (#1) < 50)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#3), t.b (#4)] + ├── filters: [is_true(cte1.a (#3) = 1), is_true(cte1.b (#4) < 50)] + ├── estimated rows: 1.67 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#3), b (#4)] + └── estimated rows: 5.00 + +# Test 9: CTE with aggregation and HAVING clause +query T +explain with cte1 as materialized ( + select a, count(*) as cnt, sum(b) as sum_b + from t + group by a + having count(*) > 0 +) +select * from cte1 where a > 1; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── Filter +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── filters: [COUNT(*) (#3) > 0] +│ ├── estimated rows: 0.00 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 2.00 +│ └── AggregatePartial +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 2.00 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.a (#0) > 1)] +│ ├── estimated rows: 3.33 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true(t.a (#0) > 1)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#5), COUNT(*) (#8), sum(b) (#9)] + ├── filters: [is_true(cte1.a (#5) > 1)] + ├── estimated rows: 0.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#5), COUNT(*) (#8), sum(b) (#9)] + └── estimated rows: 0.00 + +# Test 10: Complex aggregation with multiple GROUP BY columns +query T +explain with cte1 as materialized ( + select a, b, count(*) as cnt + from t + group by a, b +) +select * from cte1 where a = 1 and b > 15; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), t.a (#0), t.b (#1)] +│ ├── group by: [a, b] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 1.67 +│ └── AggregatePartial +│ ├── group by: [a, b] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 1.67 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.a (#0) = 1), is_true(t.b (#1) > 15)] +│ ├── estimated rows: 1.67 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [and_filters(t.a (#0) = 1, t.b (#1) > 15)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#4), t.b (#5), COUNT(*) (#7)] + ├── filters: [is_true(cte1.a (#4) = 1), is_true(cte1.b (#5) > 15)] + ├── estimated rows: 1.67 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#4), b (#5), COUNT(*) (#7)] + └── estimated rows: 5.00 + +# Test 11: Test case that 
demonstrates nested Filter issue +# This should show merged filters, not nested Filter nodes +query T +explain with cte1 as materialized ( + select a, count(*) as cnt, sum(b) as sum_b + from t + group by a +) +select * from cte1 +where a = 1 -- Should be pushed down to before GROUP BY +and sum_b > 25; -- Should remain after aggregation +---- +Sequence +├── MaterializedCTE: cte1 +│ └── Filter +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── filters: [is_true(sum(b) (#4) > 25)] +│ ├── estimated rows: 0.00 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), sum(b) (#4), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 1.00 +│ └── AggregatePartial +│ ├── group by: [a] +│ ├── aggregate functions: [count(), sum(b)] +│ ├── estimated rows: 1.00 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.a (#0) = 1)] +│ ├── estimated rows: 1.67 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true(t.a (#0) = 1)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#5), COUNT(*) (#8), sum(b) (#9)] + ├── filters: [is_true(cte1.a (#5) = 1), is_true(cte1.sum_b (#9) > 25)] + ├── estimated rows: 0.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#5), COUNT(*) (#8), sum(b) (#9)] + └── estimated rows: 3.00 + +# Test 12: Test filter pushdown with IN clause +query T +explain with cte1 as materialized ( + select a, count(*) as cnt + from t + group by a +) +select * from cte1 where a in (1, 2); +---- +Sequence +├── MaterializedCTE: cte1 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), t.a (#0)] +│ ├── group by: [a] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 2.00 +│ └── AggregatePartial +│ ├── group by: [a] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 2.00 +│ └── Filter +│ ├── output columns: [t.a (#0)] +│ ├── filters: [is_true((t.a (#0) = 1 OR t.a (#0) = 2))] +│ ├── estimated rows: 2.78 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true((t.a (#0) = 1 OR t.a (#0) = 2))], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#4), COUNT(*) (#7)] + ├── filters: [is_true((cte1.a (#4) = 1 OR cte1.a (#4) = 2))] + ├── estimated rows: 1.67 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#4), COUNT(*) (#7)] + └── estimated rows: 3.00 + +# Test 13: Test filter pushdown with complex conditions +query T +explain with cte1 as materialized ( + select a, b, count(*) as cnt + from t + group by a, b +) +select * from cte1 where (a = 1 or a = 2) and b > 10; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── AggregateFinal +│ ├── output columns: [COUNT(*) (#3), t.a (#0), t.b (#1)] +│ ├── group by: [a, b] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 2.78 +│ └── AggregatePartial +│ ├── group by: [a, b] +│ ├── aggregate functions: [count()] +│ ├── estimated rows: 2.78 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.b (#1) > 10), is_true((t.a (#0) = 1 OR t.a (#0) = 2))] +│ ├── estimated rows: 2.78 +│ └── TableScan 
+│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [and_filters(t.b (#1) > 10, (t.a (#0) = 1 OR t.a (#0) = 2))], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#4), t.b (#5), COUNT(*) (#7)] + ├── filters: [is_true(cte1.b (#5) > 10), is_true((cte1.a (#4) = 1 OR cte1.a (#4) = 2))] + ├── estimated rows: 2.78 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#4), b (#5), COUNT(*) (#7)] + └── estimated rows: 5.00 + +# Test 15: Test filter pushdown with subquery +query T +explain select * from ( + with cte1 as materialized ( + select a, count(*) as cnt from t group by a + ) + select * from cte1 where a > 1 +) where cnt > 0; +---- +Filter +├── output columns: [t.a (#4), COUNT(*) (#7)] +├── filters: [cte1.cnt (#7) > 0] +├── estimated rows: 0.00 +└── Sequence + ├── MaterializedCTE: cte1 + │ └── AggregateFinal + │ ├── output columns: [COUNT(*) (#3), t.a (#0)] + │ ├── group by: [a] + │ ├── aggregate functions: [count()] + │ ├── estimated rows: 2.00 + │ └── AggregatePartial + │ ├── group by: [a] + │ ├── aggregate functions: [count()] + │ ├── estimated rows: 2.00 + │ └── Filter + │ ├── output columns: [t.a (#0)] + │ ├── filters: [is_true(t.a (#0) > 1)] + │ ├── estimated rows: 3.33 + │ └── TableScan + │ ├── table: default.default.t + │ ├── output columns: [a (#0)] + │ ├── read rows: 5 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── pruning stats: [segments: , blocks: ] + │ ├── push downs: [filters: [is_true(t.a (#0) > 1)], limit: NONE] + │ └── estimated rows: 5.00 + └── Filter + ├── output columns: [t.a (#4), COUNT(*) (#7)] + ├── filters: [is_true(cte1.a (#4) > 1)] + ├── estimated rows: 2.00 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#4), COUNT(*) (#7)] + └── estimated rows: 3.00 + +# Test 17: CTE with window function +query T +explain with cte1 as materialized ( + select a, b, row_number() over (partition by a order by b) as rn + from t +) +select * from cte1 where a = 1; +---- +Sequence +├── MaterializedCTE: cte1 +│ └── Window +│ ├── output columns: [t.a (#0), t.b (#1), row_number() OVER (PARTITION BY a ORDER BY b) (#3)] +│ ├── aggregate function: [row_number] +│ ├── partition by: [a] +│ ├── order by: [b] +│ ├── frame: [Range: Preceding(None) ~ CurrentRow] +│ └── WindowPartition +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── hash keys: [a] +│ ├── estimated rows: 1.67 +│ └── Filter +│ ├── output columns: [t.a (#0), t.b (#1)] +│ ├── filters: [is_true(t.a (#0) = 1)] +│ ├── estimated rows: 1.67 +│ └── TableScan +│ ├── table: default.default.t +│ ├── output columns: [a (#0), b (#1)] +│ ├── read rows: 5 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [is_true(t.a (#0) = 1)], limit: NONE] +│ └── estimated rows: 5.00 +└── Filter + ├── output columns: [t.a (#4), t.b (#5), row_number() OVER (PARTITION BY a ORDER BY b) (#7)] + ├── filters: [is_true(cte1.a (#4) = 1)] + ├── estimated rows: 1.67 + └── MaterializeCTERef + ├── cte_name: cte1 + ├── cte_schema: [a (#4), b (#5), row_number() OVER (PARTITION BY a ORDER BY b) (#7)] + └── estimated rows: 5.00 + +statement ok +drop table t; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/materialized_cte.test 
b/tests/sqllogictests/suites/mode/standalone/explain/materialized_cte.test index 6776a69cfc4dd..ed7fe4e085699 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/materialized_cte.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/materialized_cte.test @@ -5,7 +5,7 @@ Sequence ├── MaterializedCTE: t1 │ └── TableScan │ ├── table: default.system.numbers -│ ├── output columns: [number (#2)] +│ ├── output columns: [number (#0)] │ ├── read rows: 10 │ ├── read size: < 1 KiB │ ├── partitions total: 1 @@ -13,22 +13,22 @@ Sequence │ ├── push downs: [filters: [], limit: NONE] │ └── estimated rows: 10.00 └── HashJoin - ├── output columns: [numbers.number (#0)] + ├── output columns: [numbers.number (#1)] ├── join type: INNER - ├── build keys: [t2.b (#1)] - ├── probe keys: [t1.a (#0)] + ├── build keys: [t2.b (#2)] + ├── probe keys: [t1.a (#1)] ├── keys is null equal: [false] ├── filters: [] ├── build join filters: - │ └── filter id:0, build key:t2.b (#1), probe key:t1.a (#0), filter type:inlist,min_max + │ └── filter id:0, build key:t2.b (#2), probe key:t1.a (#1), filter type:inlist,min_max ├── estimated rows: 100.00 ├── MaterializeCTERef(Build) │ ├── cte_name: t1 - │ ├── cte_schema: [number (#1)] + │ ├── cte_schema: [number (#2)] │ └── estimated rows: 10.00 └── MaterializeCTERef(Probe) ├── cte_name: t1 - ├── cte_schema: [number (#0)] + ├── cte_schema: [number (#1)] └── estimated rows: 10.00 query T @@ -38,7 +38,7 @@ Sequence ├── MaterializedCTE: t1 │ └── TableScan │ ├── table: default.system.numbers -│ ├── output columns: [number (#3)] +│ ├── output columns: [number (#0)] │ ├── read rows: 10 │ ├── read size: < 1 KiB │ ├── partitions total: 1 @@ -49,23 +49,23 @@ Sequence ├── MaterializedCTE: t2 │ └── MaterializeCTERef │ ├── cte_name: t1 - │ ├── cte_schema: [number (#2)] + │ ├── cte_schema: [number (#1)] │ └── estimated rows: 10.00 └── HashJoin - ├── output columns: [numbers.number (#0)] + ├── output columns: [numbers.number (#2)] ├── join type: INNER - ├── build keys: [t2.b (#1)] - ├── probe keys: [t1.a (#0)] + ├── build keys: [t2.b (#3)] + ├── probe keys: [t1.a (#2)] ├── keys is null equal: [false] ├── filters: [] ├── build join filters: - │ └── filter id:0, build key:t2.b (#1), probe key:t1.a (#0), filter type:inlist,min_max + │ └── filter id:0, build key:t2.b (#3), probe key:t1.a (#2), filter type:inlist,min_max ├── estimated rows: 100.00 ├── MaterializeCTERef(Build) │ ├── cte_name: t2 - │ ├── cte_schema: [number (#1)] + │ ├── cte_schema: [number (#3)] │ └── estimated rows: 10.00 └── MaterializeCTERef(Probe) ├── cte_name: t1 - ├── cte_schema: [number (#0)] + ├── cte_schema: [number (#2)] └── estimated rows: 10.00 diff --git a/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_self_join.test b/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_self_join.test index ddd365e14102c..0f1631d0b7fe2 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_self_join.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_join/push_down_filter_self_join.test @@ -14,11 +14,11 @@ select * from D; Sequence ├── MaterializedCTE: a │ └── UnionAll -│ ├── output columns: [a (#22), b (#23)] +│ ├── output columns: [a (#4), b (#5)] │ ├── estimated rows: 20.00 │ ├── TableScan │ │ ├── table: default.default.t1 -│ │ ├── output columns: [a (#18), b (#19)] +│ │ ├── output columns: [a (#0), b 
(#1)] │ │ ├── read rows: 10 │ │ ├── read size: < 1 KiB │ │ ├── partitions total: 1 @@ -28,7 +28,7 @@ Sequence │ │ └── estimated rows: 10.00 │ └── TableScan │ ├── table: default.default.t2 -│ ├── output columns: [a (#20), b (#21)] +│ ├── output columns: [a (#2), b (#3)] │ ├── read rows: 10 │ ├── read size: < 1 KiB │ ├── partitions total: 1 @@ -40,21 +40,21 @@ Sequence ├── MaterializedCTE: b │ └── MaterializeCTERef │ ├── cte_name: a - │ ├── cte_schema: [a (#16), b (#17)] + │ ├── cte_schema: [a (#10), b (#11)] │ └── estimated rows: 20.00 └── HashJoin - ├── output columns: [a (#4), b (#5), b (#11), a (#10)] + ├── output columns: [a (#16), b (#17), b (#23), a (#22)] ├── join type: INNER - ├── build keys: [b2.a (#10)] - ├── probe keys: [b1.a (#4)] + ├── build keys: [b2.a (#22)] + ├── probe keys: [b1.a (#16)] ├── keys is null equal: [false] - ├── filters: [d.b (#5) < d.b (#11)] + ├── filters: [d.b (#17) < d.b (#23)] ├── estimated rows: 400.00 ├── MaterializeCTERef(Build) │ ├── cte_name: b - │ ├── cte_schema: [a (#10), b (#11)] + │ ├── cte_schema: [a (#22), b (#23)] │ └── estimated rows: 20.00 └── MaterializeCTERef(Probe) ├── cte_name: b - ├── cte_schema: [a (#4), b (#5)] + ├── cte_schema: [a (#16), b (#17)] └── estimated rows: 20.00 diff --git a/tests/sqllogictests/suites/tpch/join_order.test b/tests/sqllogictests/suites/tpch/join_order.test index 2e7d4f8486bb2..5fff8d6c96702 100644 --- a/tests/sqllogictests/suites/tpch/join_order.test +++ b/tests/sqllogictests/suites/tpch/join_order.test @@ -728,20 +728,20 @@ Sequence ├── MaterializedCTE │ ├── cte_name: revenue │ ├── ref_count: 2 -│ └── Scan: default.tpch_test.lineitem (#3) (read rows: 6001215) +│ └── Scan: default.tpch_test.lineitem (#0) (read rows: 6001215) └── HashJoin: INNER ├── Build │ └── MaterializeCTERef │ ├── cte_name: revenue - │ └── cte_schema: [l_suppkey (#28), total_revenue (#44)] + │ └── cte_schema: [l_suppkey (#47), total_revenue (#63)] └── Probe └── HashJoin: INNER ├── Build │ └── MaterializeCTERef │ ├── cte_name: revenue - │ └── cte_schema: [l_suppkey (#9), total_revenue (#25)] + │ └── cte_schema: [l_suppkey (#28), total_revenue (#44)] └── Probe - └── Scan: default.tpch_test.supplier (#0) (read rows: 10000) + └── Scan: default.tpch_test.supplier (#1) (read rows: 10000) # Q16
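Illustrative sketch (SQL, not taken from the diff): the rewrite CTEFilterPushdownOptimizer performs on the query shape from Test 5 of the new cte_filter_pushdown.test, assuming its table t(a int, b int, c int). Filters sitting directly above each MaterializedCTERef are remapped through column_mapping, OR-combined per CTE, and injected above the MaterializedCTE's child, while each consumer keeps its own filter:

with cte1 as materialized (select a, count(*) as cnt from t group by a)
select * from cte1 where a = 1
union all
select * from cte1 where a = 2;

-- after the rewrite, the materialized producer behaves roughly like
--   select a, count(*) as cnt from t where a = 1 or a = 2 group by a
-- and each reference still applies its own predicate (a = 1, a = 2) on top.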