diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/plan_normalize.rs b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/plan_normalize.rs index 8433285bde22f..04709d926cb7c 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/plan_normalize.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/plan_normalize.rs @@ -82,7 +82,13 @@ fn plan_normalize( let new_expr = expr .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; let alias = alias.clone(); @@ -112,7 +118,7 @@ fn plan_normalize( LogicalPlan::Filter(Filter { predicate, input }) => { let input = plan_normalize(optimizer, input, remapped_columns, optimizer_config)?; let schema = input.schema(); - let predicate = expr_normalize( + let predicate = expr_normalize_stacked( optimizer, predicate, schema, @@ -133,7 +139,13 @@ fn plan_normalize( let new_window_expr = window_expr .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; @@ -163,13 +175,25 @@ fn plan_normalize( let new_group_expr = group_expr .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; let new_aggr_expr = aggr_expr .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; @@ -204,7 +228,13 @@ fn plan_normalize( let expr = expr .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; @@ -304,7 +334,7 @@ fn plan_normalize( let exprs = exprs .iter() .map(|expr| { - expr_normalize( + expr_normalize_stacked( optimizer, expr, schema, @@ -313,6 +343,7 @@ fn plan_normalize( ) }) .collect::>>()?; + Partitioning::Hash(exprs, *n) } }; @@ -364,7 +395,7 @@ fn plan_normalize( let filters = filters .iter() .map(|expr| { - expr_normalize( + expr_normalize_stacked( optimizer, expr, &projected_schema, @@ -447,7 +478,7 @@ fn plan_normalize( .map(|row| { row.iter() .map(|expr| { - expr_normalize( + expr_normalize_stacked( optimizer, expr, schema, @@ -508,7 +539,13 @@ fn plan_normalize( let new_expr = expr .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; let new_schema = build_table_udf_schema(&input, &new_expr)?; @@ -547,6 +584,16 @@ fn plan_normalize( } } +fn expr_normalize_stacked( + optimizer: &PlanNormalize, + expr: &Expr, + schema: &DFSchema, + remapped_columns: &HashMap, + optimizer_config: &OptimizerConfig, +) -> Result { + expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config).map(|e| *e) +} + /// Recursively normalizes expressions. fn expr_normalize( optimizer: &PlanNormalize, @@ -554,34 +601,28 @@ fn expr_normalize( schema: &DFSchema, remapped_columns: &HashMap, optimizer_config: &OptimizerConfig, -) -> Result { +) -> Result> { match expr { Expr::Alias(expr, alias) => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; let alias = alias.clone(); - Ok(Expr::Alias(expr, alias)) + Ok(Box::new(Expr::Alias(expr, alias))) } Expr::OuterColumn(data_type, column) => { let data_type = data_type.clone(); let column = column_normalize(optimizer, column, remapped_columns, optimizer_config)?; - Ok(Expr::OuterColumn(data_type, column)) + Ok(Box::new(Expr::OuterColumn(data_type, column))) } Expr::Column(column) => { let column = column_normalize(optimizer, column, remapped_columns, optimizer_config)?; - Ok(Expr::Column(column)) + Ok(Box::new(Expr::Column(column))) } - e @ Expr::ScalarVariable(..) => Ok(e.clone()), + e @ Expr::ScalarVariable(..) => Ok(Box::new(e.clone())), - e @ Expr::Literal(..) => Ok(e.clone()), + e @ Expr::Literal(..) => Ok(Box::new(e.clone())), Expr::BinaryExpr { left, op, right } => binary_expr_normalize( optimizer, @@ -599,28 +640,17 @@ fn expr_normalize( right, all, } => { - let left = Box::new(expr_normalize( - optimizer, - left, - schema, - remapped_columns, - optimizer_config, - )?); + let left = expr_normalize(optimizer, left, schema, remapped_columns, optimizer_config)?; let op = *op; - let right = Box::new(expr_normalize( - optimizer, - right, - schema, - remapped_columns, - optimizer_config, - )?); + let right = + expr_normalize(optimizer, right, schema, remapped_columns, optimizer_config)?; let all = *all; - Ok(Expr::AnyExpr { + Ok(Box::new(Expr::AnyExpr { left, op, right, all, - }) + })) } Expr::Like(Like { @@ -630,27 +660,21 @@ fn expr_normalize( escape_char, }) => { let negated = *negated; - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - let pattern = Box::new(expr_normalize( + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + let pattern = expr_normalize( optimizer, pattern, schema, remapped_columns, optimizer_config, - )?); + )?; let escape_char = *escape_char; - Ok(Expr::Like(Like { + Ok(Box::new(Expr::Like(Like { negated, expr, pattern, escape_char, - })) + }))) } Expr::ILike(Like { @@ -660,27 +684,21 @@ fn expr_normalize( escape_char, }) => { let negated = *negated; - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - let pattern = Box::new(expr_normalize( + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + let pattern = expr_normalize( optimizer, pattern, schema, remapped_columns, optimizer_config, - )?); + )?; let escape_char = *escape_char; - Ok(Expr::ILike(Like { + Ok(Box::new(Expr::ILike(Like { negated, expr, pattern, escape_char, - })) + }))) } Expr::SimilarTo(Like { @@ -690,89 +708,47 @@ fn expr_normalize( escape_char, }) => { let negated = *negated; - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - let pattern = Box::new(expr_normalize( + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + let pattern = expr_normalize( optimizer, pattern, schema, remapped_columns, optimizer_config, - )?); + )?; let escape_char = *escape_char; - Ok(Expr::SimilarTo(Like { + Ok(Box::new(Expr::SimilarTo(Like { negated, expr, pattern, escape_char, - })) + }))) } Expr::Not(expr) => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - Ok(Expr::Not(expr)) + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + Ok(Box::new(Expr::Not(expr))) } Expr::IsNotNull(expr) => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - Ok(Expr::IsNotNull(expr)) + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + Ok(Box::new(Expr::IsNotNull(expr))) } Expr::IsNull(expr) => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - Ok(Expr::IsNull(expr)) + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + Ok(Box::new(Expr::IsNull(expr))) } Expr::Negative(expr) => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - Ok(Expr::Negative(expr)) + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + Ok(Box::new(Expr::Negative(expr))) } Expr::GetIndexedField { expr, key } => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - let key = Box::new(expr_normalize( - optimizer, - key, - schema, - remapped_columns, - optimizer_config, - )?); - Ok(Expr::GetIndexedField { expr, key }) + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + let key = expr_normalize(optimizer, key, schema, remapped_columns, optimizer_config)?; + Ok(Box::new(Expr::GetIndexedField { expr, key })) } Expr::Between { @@ -781,34 +757,16 @@ fn expr_normalize( low, high, } => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; let negated = *negated; - let low = Box::new(expr_normalize( - optimizer, - low, - schema, - remapped_columns, - optimizer_config, - )?); - let high = Box::new(expr_normalize( - optimizer, - high, - schema, - remapped_columns, - optimizer_config, - )?); - Ok(Expr::Between { + let low = expr_normalize(optimizer, low, schema, remapped_columns, optimizer_config)?; + let high = expr_normalize(optimizer, high, schema, remapped_columns, optimizer_config)?; + Ok(Box::new(Expr::Between { expr, negated, low, high, - }) + })) } Expr::Case { @@ -818,78 +776,50 @@ fn expr_normalize( } => { let expr = expr .as_ref() - .map(|e| { - Ok::<_, DataFusionError>(Box::new(expr_normalize( - optimizer, - e, - schema, - remapped_columns, - optimizer_config, - )?)) - }) + .map(|e| expr_normalize(optimizer, e, schema, remapped_columns, optimizer_config)) .transpose()?; let when_then_expr = when_then_expr .iter() .map(|(when, then)| { Ok(( - Box::new(expr_normalize( + expr_normalize( optimizer, when, schema, remapped_columns, optimizer_config, - )?), - Box::new(expr_normalize( + )?, + expr_normalize( optimizer, then, schema, remapped_columns, optimizer_config, - )?), + )?, )) }) .collect::>>()?; let else_expr = else_expr .as_ref() - .map(|e| { - Ok::<_, DataFusionError>(Box::new(expr_normalize( - optimizer, - e, - schema, - remapped_columns, - optimizer_config, - )?)) - }) + .map(|e| expr_normalize(optimizer, e, schema, remapped_columns, optimizer_config)) .transpose()?; - Ok(Expr::Case { + Ok(Box::new(Expr::Case { expr, when_then_expr, else_expr, - }) + })) } Expr::Cast { expr, data_type } => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; let data_type = data_type.clone(); - Ok(Expr::Cast { expr, data_type }) + Ok(Box::new(Expr::Cast { expr, data_type })) } Expr::TryCast { expr, data_type } => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; let data_type = data_type.clone(); - Ok(Expr::TryCast { expr, data_type }) + Ok(Box::new(Expr::TryCast { expr, data_type })) } Expr::Sort { @@ -897,20 +827,14 @@ fn expr_normalize( asc, nulls_first, } => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; let asc = *asc; let nulls_first = *nulls_first; - Ok(Expr::Sort { + Ok(Box::new(Expr::Sort { expr, asc, nulls_first, - }) + })) } Expr::ScalarFunction { fun, args } => { @@ -922,7 +846,8 @@ fn expr_normalize( remapped_columns, optimizer_config, )?; - Ok(Expr::ScalarFunction { fun, args }) + + Ok(Box::new(Expr::ScalarFunction { fun, args })) } Expr::ScalarUDF { fun, args } => { @@ -930,10 +855,17 @@ fn expr_normalize( let args = args .iter() .map(|arg| { - expr_normalize(optimizer, arg, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + arg, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; - Ok(Expr::ScalarUDF { fun, args }) + + Ok(Box::new(Expr::ScalarUDF { fun, args })) } Expr::TableUDF { fun, args } => { @@ -941,10 +873,17 @@ fn expr_normalize( let args = args .iter() .map(|arg| { - expr_normalize(optimizer, arg, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + arg, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; - Ok(Expr::TableUDF { fun, args }) + + Ok(Box::new(Expr::TableUDF { fun, args })) } Expr::AggregateFunction { @@ -957,7 +896,13 @@ fn expr_normalize( let args = args .iter() .map(|arg| { - expr_normalize(optimizer, arg, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + arg, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; let distinct = *distinct; @@ -966,17 +911,23 @@ fn expr_normalize( .map(|expr| { expr.iter() .map(|e| { - expr_normalize(optimizer, e, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + e, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>() }) .transpose()?; - Ok(Expr::AggregateFunction { + Ok(Box::new(Expr::AggregateFunction { fun, args, distinct, within_group, - }) + })) } Expr::WindowFunction { @@ -990,29 +941,47 @@ fn expr_normalize( let args = args .iter() .map(|arg| { - expr_normalize(optimizer, arg, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + arg, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; let partition_by = partition_by .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; let order_by = order_by .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; let window_frame = *window_frame; - Ok(Expr::WindowFunction { + Ok(Box::new(Expr::WindowFunction { fun, args, partition_by, order_by, window_frame, - }) + })) } Expr::AggregateUDF { fun, args } => { @@ -1020,10 +989,16 @@ fn expr_normalize( let args = args .iter() .map(|arg| { - expr_normalize(optimizer, arg, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + arg, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; - Ok(Expr::AggregateUDF { fun, args }) + Ok(Box::new(Expr::AggregateUDF { fun, args })) } Expr::InList { @@ -1045,42 +1020,33 @@ fn expr_normalize( subquery, negated, } => { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); - let subquery = Box::new(expr_normalize( + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; + let subquery = expr_normalize( optimizer, subquery, schema, remapped_columns, optimizer_config, - )?); + )?; let negated = *negated; - Ok(Expr::InSubquery { + Ok(Box::new(Expr::InSubquery { expr, subquery, negated, - }) + })) } - e @ Expr::Wildcard => Ok(e.clone()), + e @ Expr::Wildcard => Ok(Box::new(e.clone())), - e @ Expr::QualifiedWildcard { .. } => Ok(e.clone()), + e @ Expr::QualifiedWildcard { .. } => Ok(Box::new(e.clone())), - Expr::GroupingSet(grouping_set) => { - let grouping_set = grouping_set_normalize( - optimizer, - grouping_set, - schema, - remapped_columns, - optimizer_config, - )?; - Ok(Expr::GroupingSet(grouping_set)) - } + Expr::GroupingSet(grouping_set) => grouping_set_normalize( + optimizer, + grouping_set, + schema, + remapped_columns, + optimizer_config, + ), } } @@ -1111,7 +1077,9 @@ fn scalar_function_normalize( let fun = fun.clone(); let mut args = args .iter() - .map(|arg| expr_normalize(optimizer, arg, schema, remapped_columns, optimizer_config)) + .map(|arg| { + expr_normalize_stacked(optimizer, arg, schema, remapped_columns, optimizer_config) + }) .collect::>>()?; // If the function is `DatePart` or `DateTrunc` and the first argument is a literal string, @@ -1141,36 +1109,50 @@ fn grouping_set_normalize( schema: &DFSchema, remapped_columns: &HashMap, optimizer_config: &OptimizerConfig, -) -> Result { +) -> Result> { match grouping_set { GroupingSet::Rollup(exprs) => { let exprs = exprs .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; - Ok(GroupingSet::Rollup(exprs)) + + Ok(Box::new(Expr::GroupingSet(GroupingSet::Rollup(exprs)))) } GroupingSet::Cube(exprs) => { let exprs = exprs .iter() .map(|expr| { - expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config) + expr_normalize_stacked( + optimizer, + expr, + schema, + remapped_columns, + optimizer_config, + ) }) .collect::>>()?; - Ok(GroupingSet::Cube(exprs)) + + Ok(Box::new(Expr::GroupingSet(GroupingSet::Cube(exprs)))) } GroupingSet::GroupingSets(exprs) => { let exprs = exprs .iter() .map(|exprs| { - Ok(exprs + exprs .iter() .map(|expr| { - expr_normalize( + expr_normalize_stacked( optimizer, expr, schema, @@ -1178,10 +1160,13 @@ fn grouping_set_normalize( optimizer_config, ) }) - .collect::>>()?) + .collect::>>() }) .collect::>>()?; - Ok(GroupingSet::GroupingSets(exprs)) + + Ok(Box::new(Expr::GroupingSet(GroupingSet::GroupingSets( + exprs, + )))) } } } @@ -1200,22 +1185,10 @@ fn binary_expr_normalize( schema: &DFSchema, remapped_columns: &HashMap, optimizer_config: &OptimizerConfig, -) -> Result { - let left = Box::new(expr_normalize( - optimizer, - left, - schema, - remapped_columns, - optimizer_config, - )?); +) -> Result> { + let left = expr_normalize(optimizer, left, schema, remapped_columns, optimizer_config)?; let op = *op; - let right = Box::new(expr_normalize( - optimizer, - right, - schema, - remapped_columns, - optimizer_config, - )?); + let right = expr_normalize(optimizer, right, schema, remapped_columns, optimizer_config)?; // Check if the expression is `DATE - DATE` and replace it with `DATEDIFF` with same semantics. // Rationale to do this in optimizer than rewrites is that while the expression @@ -1238,27 +1211,25 @@ fn binary_expr_normalize( *right, *left, ]; - return Ok(Expr::ScalarUDF { fun, args }); + return Ok(Box::new(Expr::ScalarUDF { fun, args })); } // Check if the expression is `TIMESTAMP DATE` or `DATE TIMESTAMP` // and cast the `DATE` to `TIMESTAMP` to match the types. match (&left_type, &right_type) { (DataType::Timestamp(_, _), DataType::Date32) => { - let new_right = evaluate_expr(optimizer, right.cast_to(&left_type, schema)?)?; - return Ok(Expr::BinaryExpr { + return Ok(Box::new(Expr::BinaryExpr { left, op, - right: Box::new(new_right), - }); + right: evaluate_expr(optimizer, right.cast_to(&left_type, schema)?)?, + })); } (DataType::Date32, DataType::Timestamp(_, _)) => { - let new_left = evaluate_expr(optimizer, left.cast_to(&right_type, schema)?)?; - return Ok(Expr::BinaryExpr { - left: Box::new(new_left), + return Ok(Box::new(Expr::BinaryExpr { + left: evaluate_expr(optimizer, left.cast_to(&right_type, schema)?)?, op, right, - }); + })); } _ => (), }; @@ -1269,25 +1240,25 @@ fn binary_expr_normalize( let (other_type, literal_on_the_left) = match (left.as_ref(), right.as_ref()) { (_, Expr::Literal(ScalarValue::Utf8(Some(_)))) => (left_type, false), (Expr::Literal(ScalarValue::Utf8(Some(_))), _) => (right_type, true), - _ => return Ok(Expr::BinaryExpr { left, op, right }), + _ => return Ok(Box::new(Expr::BinaryExpr { left, op, right })), }; + let Some(cast_type) = binary_expr_cast_literal(&op, &other_type) else { - return Ok(Expr::BinaryExpr { left, op, right }); + return Ok(Box::new(Expr::BinaryExpr { left, op, right })); }; + if literal_on_the_left { - let new_left = evaluate_expr(optimizer, left.cast_to(&cast_type, schema)?)?; - Ok(Expr::BinaryExpr { - left: Box::new(new_left), + Ok(Box::new(Expr::BinaryExpr { + left: evaluate_expr(optimizer, left.cast_to(&cast_type, schema)?)?, op, right, - }) + })) } else { - let new_right = evaluate_expr(optimizer, right.cast_to(&cast_type, schema)?)?; - Ok(Expr::BinaryExpr { + Ok(Box::new(Expr::BinaryExpr { left, op, - right: Box::new(new_right), - }) + right: evaluate_expr(optimizer, right.cast_to(&cast_type, schema)?)?, + })) } } @@ -1350,20 +1321,14 @@ fn in_list_expr_normalize( schema: &DFSchema, remapped_columns: &HashMap, optimizer_config: &OptimizerConfig, -) -> Result { - let expr = Box::new(expr_normalize( - optimizer, - expr, - schema, - remapped_columns, - optimizer_config, - )?); +) -> Result> { + let expr = expr_normalize(optimizer, expr, schema, remapped_columns, optimizer_config)?; let expr_type = expr.get_type(schema)?; let expr_is_timestamp = matches!(expr_type, DataType::Timestamp(_, _)); let list = list .iter() .map(|list_expr| { - let list_expr_normalized = expr_normalize( + let list_expr_normalized = expr_normalize_stacked( optimizer, list_expr, schema, @@ -1373,23 +1338,95 @@ fn in_list_expr_normalize( if !expr_is_timestamp { return Ok(list_expr_normalized); } + let list_expr_type = list_expr_normalized.get_type(schema)?; if !matches!(list_expr_type, DataType::Date32) { return Ok(list_expr_normalized); } - evaluate_expr(optimizer, list_expr_normalized.cast_to(&expr_type, schema)?) + + evaluate_expr_stacked(optimizer, list_expr_normalized.cast_to(&expr_type, schema)?) }) .collect::>>()?; - Ok(Expr::InList { + + Ok(Box::new(Expr::InList { expr, list, negated, - }) + })) } -/// Evaluates an expression to a constant if possible. -fn evaluate_expr(optimizer: &PlanNormalize, expr: Expr) -> Result { +fn evaluate_expr_stacked(optimizer: &PlanNormalize, expr: Expr) -> Result { let execution_props = &optimizer.cube_ctx.state.execution_props; let mut const_evaluator = ConstEvaluator::new(execution_props); expr.rewrite(&mut const_evaluator) } + +/// Evaluates an expression to a constant if possible. +fn evaluate_expr(optimizer: &PlanNormalize, expr: Expr) -> Result> { + Ok(Box::new(evaluate_expr_stacked(optimizer, expr)?)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::compile::test::{ + get_test_tenant_ctx, rewrite_engine::create_test_postgresql_cube_context, run_async_test, + }; + use datafusion::{ + arrow::datatypes::{DataType, Field, Schema}, + logical_plan::{col, lit, LogicalPlanBuilder}, + }; + + /// Helper function to create a deeply nested OR expression. + /// This creates a chain like: col = 1 OR col = 2 OR col = 3 OR ... OR col = depth + fn create_deeply_nested_or_expr(column_name: &str, depth: usize) -> Expr { + if depth == 0 { + return col(column_name).eq(lit(0i32)); + } + + let mut expr = col(column_name).eq(lit(0i32)); + + for i in 1..depth { + expr = expr.or(col(column_name).eq(lit(i as i32))); + } + + expr + } + + // plan_normalize is recursive, at the same time ExprRewriter from DF is too + // let's guard it with test, that our code in dev profile is optimized to rewrite N nodes + #[test] + fn test_stack_overflow_deeply_nested_or() -> Result<()> { + run_async_test(async move { + let meta = get_test_tenant_ctx(); + let cube_ctx = create_test_postgresql_cube_context(meta) + .await + .expect("Failed to create cube context"); + + // Create a simple table + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + ]); + + let table_scan = LogicalPlanBuilder::scan_empty(Some("test_table"), &schema, None) + .expect("Failed to create table scan") + .build() + .expect("Failed to build plan"); + + // Create a deeply nested OR expression (should cause stack overflow) + let deeply_nested_filter = create_deeply_nested_or_expr("value", 200); + + let plan = LogicalPlanBuilder::from(table_scan) + .filter(deeply_nested_filter) + .expect("Failed to add filter") + .build() + .expect("Failed to build plan"); + + let optimizer = PlanNormalize::new(&cube_ctx); + optimizer.optimize(&plan, &OptimizerConfig::new()).unwrap(); + }); + + Ok(()) + } +} diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index df7a13227d6cb..9e78921bf910f 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -1,5 +1,3 @@ -use std::{collections::HashMap, env, ops::Deref, sync::Arc}; - use super::{convert_sql_to_cube_query, CompilationResult, QueryPlan}; use crate::{ compile::{ @@ -23,6 +21,8 @@ use crate::{ use async_trait::async_trait; use cubeclient::models::V1CubeMetaType; use datafusion::{arrow::datatypes::SchemaRef, dataframe::DataFrame as DFDataFrame}; +use std::future::Future; +use std::{collections::HashMap, env, ops::Deref, sync::Arc}; use uuid::Uuid; pub mod rewrite_engine; @@ -1201,3 +1201,45 @@ pub async fn convert_select_to_query_plan_with_meta( query.unwrap() } + +pub struct AsyncTestOptions { + stack_bytes: usize, +} + +impl Default for AsyncTestOptions { + fn default() -> Self { + Self { + // By default tokio stack size is aligned with OS default: + // 1mb for Windows, 2mb for macOS, 8mb for linux + // Let's align everything to 8mb + stack_bytes: 8 * 1024 * 1024, + } + } +} + +pub fn run_async_test(fut: F) +where + F: Future + Send + 'static, +{ + run_async_test_opt(AsyncTestOptions::default(), fut) +} + +pub fn run_async_test_opt(opts: AsyncTestOptions, fut: F) +where + F: Future + Send + 'static, +{ + std::thread::Builder::new() + .name("test-rt".into()) + .stack_size(opts.stack_bytes) + .spawn(|| { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(fut); + }) + .unwrap() + .join() + .unwrap(); +}