Skip to content

feat: support push down filter to materialized cte #18493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 12, 2025
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl PhysicalPlanBuilder {
self.build_materialized_cte(s_expr, materialized_cte, stat_info)
.await
}
RelOperator::MaterializeCTERef(cte_consumer) => {
RelOperator::MaterializedCTERef(cte_consumer) => {
self.build_cte_consumer(cte_consumer, stat_info).await
}
RelOperator::Sequence(sequence) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl MaterializeCTERef {
impl PhysicalPlanBuilder {
pub(crate) async fn build_cte_consumer(
&mut self,
cte_consumer: &crate::plans::MaterializeCTERef,
cte_consumer: &crate::plans::MaterializedCTERef,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
let mut fields = Vec::new();
Expand Down
9 changes: 8 additions & 1 deletion src/query/sql/src/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::binder::project_set::SetReturningInfo;
use crate::binder::window::WindowInfo;
use crate::binder::ColumnBindingBuilder;
use crate::normalize_identifier;
use crate::optimizer::ir::SExpr;
use crate::plans::ScalarExpr;
use crate::ColumnSet;
use crate::IndexType;
Expand Down Expand Up @@ -197,9 +198,15 @@ impl CteContext {
pub struct CteInfo {
pub columns_alias: Vec<String>,
pub query: Query,
pub materialized: bool,
pub recursive: bool,
pub columns: Vec<ColumnBinding>,
pub materialized_cte_info: Option<MaterializedCTEInfo>,
}

#[derive(Clone, Debug)]
pub struct MaterializedCTEInfo {
pub bound_s_expr: SExpr,
pub bound_context: BindContext,
}

impl BindContext {
Expand Down
54 changes: 43 additions & 11 deletions src/query/sql/src/planner/binder/bind_table_reference/bind_cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use databend_common_ast::ast::Query;
Expand All @@ -25,18 +26,21 @@ use crate::binder::BindContext;
use crate::binder::Binder;
use crate::binder::CteContext;
use crate::binder::CteInfo;
use crate::binder::MaterializedCTEInfo;
use crate::normalize_identifier;
use crate::optimizer::ir::SExpr;
use crate::plans::MaterializeCTERef;
use crate::plans::MaterializedCTE;
use crate::plans::MaterializedCTERef;
use crate::plans::RelOperator;
use crate::plans::Sequence;
use crate::ColumnBinding;

impl Binder {
pub fn init_cte(&mut self, bind_context: &mut BindContext, with: &Option<With>) -> Result<()> {
let Some(with) = with else {
return Ok(());
};

for cte in with.ctes.iter() {
let cte_name = self.normalize_identifier(&cte.alias.name).name;
if bind_context.cte_context.cte_map.contains_key(&cte_name) {
Expand All @@ -45,6 +49,21 @@ impl Binder {
)));
}

let materialized_cte_info = if cte.materialized {
let (s_expr, cte_bind_context) = self.bind_cte_definition(
&cte_name,
&bind_context.cte_context.cte_map,
&cte.query,
)?;
let materialized_cte_info = MaterializedCTEInfo {
bound_s_expr: s_expr,
bound_context: cte_bind_context,
};
Some(materialized_cte_info)
} else {
None
};

let column_name = cte
.alias
.columns
Expand All @@ -56,8 +75,8 @@ impl Binder {
columns_alias: column_name,
query: *cte.query.clone(),
recursive: with.recursive,
materialized: cte.materialized,
columns: vec![],
materialized_cte_info,
};
bind_context.cte_context.cte_map.insert(cte_name, cte_info);
}
Expand All @@ -70,6 +89,7 @@ impl Binder {
table_name: &str,
alias: &Option<TableAlias>,
cte_info: &CteInfo,
producer_column_bindings: &[ColumnBinding],
) -> Result<(SExpr, BindContext)> {
let (s_expr, cte_bind_context) = self.bind_cte_definition(
table_name,
Expand Down Expand Up @@ -117,15 +137,24 @@ impl Binder {
let output_columns = cte_output_columns.iter().map(|c| c.index).collect();

let mut new_bind_context = bind_context.clone();
for column in cte_output_columns {
new_bind_context.add_column_binding(column);
for column in cte_output_columns.iter() {
new_bind_context.add_column_binding(column.clone());
}

let s_expr = SExpr::create_leaf(Arc::new(RelOperator::MaterializeCTERef(
MaterializeCTERef {
let mut column_mapping = HashMap::new();
for (index_in_ref, index_in_producer) in cte_output_columns
.iter()
.zip(producer_column_bindings.iter())
{
column_mapping.insert(index_in_ref.index, index_in_producer.index);
}

let s_expr = SExpr::create_leaf(Arc::new(RelOperator::MaterializedCTERef(
MaterializedCTERef {
cte_name: table_name.to_string(),
output_columns,
def: s_expr,
column_mapping,
},
)));
Ok((s_expr, new_bind_context))
Expand Down Expand Up @@ -164,13 +193,16 @@ impl Binder {
let mut current_expr = main_query_expr;

for cte in with.ctes.iter().rev() {
if cte.materialized {
let cte_name = self.normalize_identifier(&cte.alias.name).name;
let (s_expr, bind_context) =
self.bind_cte_definition(&cte_name, &cte_context.cte_map, &cte.query)?;
let cte_name = self.normalize_identifier(&cte.alias.name).name;
let cte_info = cte_context.cte_map.get(&cte_name).ok_or_else(|| {
ErrorCode::Internal(format!("CTE '{}' not found in context", cte_name))
})?;
if let Some(materialized_cte_info) = &cte_info.materialized_cte_info {
let s_expr = materialized_cte_info.bound_s_expr.clone();
let bind_context = materialized_cte_info.bound_context.clone();

let materialized_cte =
MaterializedCTE::new(cte_name, Some(bind_context.columns), None);
MaterializedCTE::new(cte_name, Some(bind_context.columns.clone()), None);
let materialized_cte = SExpr::create_unary(materialized_cte, s_expr);
let sequence = Sequence {};
current_expr = SExpr::create_binary(sequence, materialized_cte, current_expr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,14 @@ impl Binder {

let cte_map = bind_context.cte_context.cte_map.clone();
if let Some(cte_info) = cte_map.get(&table_name) {
if cte_info.materialized {
return self.bind_cte_consumer(bind_context, &table_name, alias, cte_info);
if let Some(materialized_cte_info) = &cte_info.materialized_cte_info {
return self.bind_cte_consumer(
bind_context,
&table_name,
alias,
cte_info,
&materialized_cte_info.bound_context.columns,
);
} else {
if self
.metadata
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/optimizer/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn display_rel_op(rel_op: &RelOperator) -> String {
RelOperator::MutationSource(_) => "MutationSource".to_string(),
RelOperator::CompactBlock(_) => "CompactBlock".to_string(),
RelOperator::MaterializedCTE(_) => "MaterializedCTE".to_string(),
RelOperator::MaterializeCTERef(_) => "MaterializeCTERef".to_string(),
RelOperator::MaterializedCTERef(_) => "MaterializeCTERef".to_string(),
RelOperator::Sequence(_) => "Sequence".to_string(),
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::optimizer::optimizers::operator::SubqueryDecorrelatorOptimizer;
use crate::optimizer::optimizers::recursive::RecursiveRuleOptimizer;
use crate::optimizer::optimizers::rule::RuleID;
use crate::optimizer::optimizers::rule::DEFAULT_REWRITE_RULES;
use crate::optimizer::optimizers::CTEFilterPushdownOptimizer;
use crate::optimizer::optimizers::CascadesOptimizer;
use crate::optimizer::optimizers::DPhpyOptimizer;
use crate::optimizer::pipeline::OptimizerPipeline;
Expand Down Expand Up @@ -254,32 +255,34 @@ pub async fn optimize_query(opt_ctx: Arc<OptimizerContext>, s_expr: SExpr) -> Re
opt_ctx.clone(),
&DEFAULT_REWRITE_RULES,
))
// 8. Run post rewrite rules
// 8. CTE filter pushdown optimization
.add(CTEFilterPushdownOptimizer::new(opt_ctx.clone()))
// 9. Run post rewrite rules
.add(RecursiveRuleOptimizer::new(opt_ctx.clone(), &[
RuleID::SplitAggregate,
]))
// 9. Apply DPhyp algorithm for cost-based join reordering
// 10. Apply DPhyp algorithm for cost-based join reordering
.add(DPhpyOptimizer::new(opt_ctx.clone()))
// 10. After join reorder, Convert some single join to inner join.
// 11. After join reorder, Convert some single join to inner join.
.add(SingleToInnerOptimizer::new())
// 11. Deduplicate join conditions.
// 12. Deduplicate join conditions.
.add(DeduplicateJoinConditionOptimizer::new())
// 12. Apply join commutativity to further optimize join ordering
// 13. Apply join commutativity to further optimize join ordering
.add_if(
opt_ctx.get_enable_join_reorder(),
RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::CommuteJoin].as_slice()),
)
// 13. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
// 14. Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
.add(CascadesOptimizer::new(opt_ctx.clone())?)
// 14. Eliminate unnecessary scalar calculations to clean up the final plan
// 15. Eliminate unnecessary scalar calculations to clean up the final plan
.add_if(
!opt_ctx.get_planning_agg_index(),
RecursiveRuleOptimizer::new(opt_ctx.clone(), [RuleID::EliminateEvalScalar].as_slice()),
)
// 15. Clean up unused CTEs
// 16. Clean up unused CTEs
.add(CleanupUnusedCTEOptimizer);

// 15. Execute the pipeline
// 17. Execute the pipeline
let s_expr = pipeline.execute().await?;

Ok(s_expr)
Expand Down
Loading