diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index e1e6bfb632e6a..02d362466750b 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -34,7 +34,7 @@ use serde::{Deserialize, Serialize}; use std::{ any::Any, cmp::min, - collections::HashMap, + collections::{HashMap, HashSet}, convert::TryInto, fmt, future::Future, @@ -252,7 +252,7 @@ impl CubeScanWrapperNode { } } -fn expr_name(e: &Expr, schema: &Arc) -> Result { +fn expr_name(e: &Expr, schema: &DFSchema) -> Result { match e { Expr::Column(col) => Ok(col.name.clone()), Expr::Sort { expr, .. } => expr_name(expr, schema), @@ -260,10 +260,164 @@ fn expr_name(e: &Expr, schema: &Arc) -> Result { } } +/// Holds column remapping for generated SQL +/// Can be used to remap expression in logical plans on top, +/// and to generate mapping between schema and Cube load query in wrapper +pub struct ColumnRemapping { + column_remapping: HashMap, +} + +impl ColumnRemapping { + /// Generate member_fields for CubeScanExecutionPlan, which contains SQL with this remapping. + /// Cube will respond with aliases after remapping, which we must use to read response. + /// Schema in DF will stay the same as before remapping. + /// So result would have all aliases after remapping in order derived from `schema`. + pub fn member_fields(&self, schema: &DFSchema) -> Vec { + schema + .fields() + .iter() + .map(|f| { + MemberField::Member( + self.column_remapping + .get(&Column::from_name(f.name().to_string())) + .map(|x| x.name.to_string()) + .unwrap_or(f.name().to_string()), + ) + }) + .collect() + } + + /// Replace every column expression in `expr` according to this remapping. Column expressions + /// not present in `self` will stay the same. 
+ pub fn remap(&self, expr: &Expr) -> result::Result { + replace_col( + expr.clone(), + &self.column_remapping.iter().map(|(k, v)| (k, v)).collect(), + ) + .map_err(|_| CubeError::internal(format!("Can't rename columns for expr: {expr:?}",))) + } +} + +/// Builds new column mapping +/// One remapper for one context: all unqualified columns with same name are assumed the same column +struct Remapper { + from_alias: Option, + can_rename_columns: bool, + remapping: HashMap, + used_targets: HashSet, +} + +impl Remapper { + /// Constructs new Remapper + /// `from_alias` would be used as qualifier after remapping + /// When `can_rename_columns` is enabled, column names will be generated. + /// When it's disabled, column names must stay the same. + /// Column qualifiers can change in both cases. + pub fn new(from_alias: Option, can_rename_columns: bool) -> Self { + Remapper { + from_alias, + can_rename_columns, + + remapping: HashMap::new(), + used_targets: HashSet::new(), + } + } + + /// Generate new alias for expression + /// `original_expr` is the one we are generating alias for + /// `expr` can be same or modified, i.e. when previous column remapping is applied. + /// `expr` would be used to generate new alias when `can_rename_columns` is enabled. 
+ /// When `original_expr` is a column, it would remap both unqualified and qualified columns to the new alias + pub fn add_expr( + &mut self, + schema: &DFSchema, + original_expr: &Expr, + expr: &Expr, + ) -> result::Result { + static NON_ID_REGEX: LazyLock = + LazyLock::new(|| Regex::new(r"[^a-zA-Z0-9_]").unwrap()); + + let original_alias = expr_name(original_expr, schema)?; + let original_alias_key = Column::from_name(&original_alias); + if let Some(alias_column) = self.remapping.get(&original_alias_key) { + return Ok(alias_column.name.clone()); + } + + let alias = if self.can_rename_columns { + let alias = expr_name(&expr, &schema)?; + let mut truncated_alias = NON_ID_REGEX + .replace_all(&alias, "_") + .trim_start_matches("_") + .to_lowercase(); + truncated_alias.truncate(16); + let mut alias = truncated_alias.clone(); + for i in 1..10000 { + if !self.used_targets.contains(&alias) { + break; + } + alias = format!("{}_{}", truncated_alias, i); + } + alias + } else { + original_alias.clone() + }; + + if self.used_targets.contains(&alias) { + return Err(CubeError::internal(format!( + "Can't generate SQL for column expr: duplicate alias {alias}" + ))); + } + + self.used_targets.insert(alias.clone()); + self.remapping + .insert(original_alias_key, Column::from_name(&alias)); + if let Some(from_alias) = &self.from_alias { + self.remapping.insert( + Column { + name: original_alias.clone(), + relation: Some(from_alias.clone()), + }, + Column { + name: alias.clone(), + relation: Some(from_alias.clone()), + }, + ); + if let Expr::Column(column) = &original_expr { + if let Some(original_relation) = &column.relation { + if original_relation != from_alias { + self.remapping.insert( + Column { + name: original_alias.clone(), + relation: Some(original_relation.clone()), + }, + Column { + name: alias.clone(), + relation: Some(from_alias.clone()), + }, + ); + } + } + } + } + + Ok(alias) + } + + pub fn into_remapping(self) -> Option { + if self.remapping.len() > 0 { + 
Some(ColumnRemapping { + column_remapping: self.remapping, + }) + } else { + None + } + } +} + pub struct SqlGenerationResult { pub data_source: Option, pub from_alias: Option, - pub column_remapping: Option>, + pub column_remapping: Option, pub sql: SqlQuery, pub request: TransportLoadRequestQuery, } @@ -332,11 +486,7 @@ impl CubeScanWrapperNode { .await .and_then(|SqlGenerationResult { data_source, mut sql, request, column_remapping, .. }| -> result::Result<_, CubeError> { let member_fields = if let Some(column_remapping) = column_remapping { - schema - .fields() - .iter() - .map(|f| MemberField::Member(column_remapping.get(&Column::from_name(f.name().to_string())).map(|x| x.name.to_string()).unwrap_or(f.name().to_string()))) - .collect() + column_remapping.member_fields(schema) } else { schema .fields() @@ -600,6 +750,8 @@ impl CubeScanWrapperNode { .await? }; + let column_remapping = column_remapping.as_ref(); + let mut subqueries_sql = HashMap::new(); for subquery in subqueries.iter() { let SqlGenerationResult { @@ -625,8 +777,8 @@ impl CubeScanWrapperNode { subqueries_sql.insert(field.qualified_name(), sql_string); } let subqueries_sql = Arc::new(subqueries_sql); - let mut next_remapping = HashMap::new(); let alias = alias.or(from_alias.clone()); + let mut next_remapper = Remapper::new(alias.clone(), can_rename_columns); if let Some(data_source) = data_source { let generator = plan .meta @@ -645,9 +797,8 @@ impl CubeScanWrapperNode { projection_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -660,9 +811,8 @@ impl CubeScanWrapperNode { flat_group_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -675,9 +825,8 @@ impl 
CubeScanWrapperNode { aggr_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -690,9 +839,8 @@ impl CubeScanWrapperNode { filter_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -705,9 +853,8 @@ impl CubeScanWrapperNode { window_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -720,9 +867,8 @@ impl CubeScanWrapperNode { order_expr.clone(), sql, generator.clone(), - &column_remapping, - &mut next_remapping, - alias.clone(), + column_remapping, + &mut next_remapper, can_rename_columns, ungrouped_scan_node.clone(), subqueries_sql.clone(), @@ -870,11 +1016,7 @@ impl CubeScanWrapperNode { data_source: Some(data_source), from_alias: alias, sql: sql_response.sql, - column_remapping: if next_remapping.len() > 0 { - Some(next_remapping) - } else { - None - }, + column_remapping: next_remapper.into_remapping(), request: load_request.clone(), }) } else { @@ -915,11 +1057,7 @@ impl CubeScanWrapperNode { data_source: Some(data_source), from_alias: alias, sql, - column_remapping: if next_remapping.len() > 0 { - Some(next_remapping) - } else { - None - }, + column_remapping: next_remapper.into_remapping(), request, }) } @@ -960,29 +1098,16 @@ impl CubeScanWrapperNode { exprs: Vec, mut sql: SqlQuery, generator: Arc, - column_remapping: &Option>, - next_remapping: &mut HashMap, - from_alias: Option, + column_remapping: Option<&ColumnRemapping>, + next_remapper: &mut Remapper, can_rename_columns: bool, ungrouped_scan_node: Option>, subqueries: Arc>, ) -> result::Result<(Vec, SqlQuery), 
CubeError> { - static NON_ID_REGEX: LazyLock = - LazyLock::new(|| Regex::new(r"[^a-zA-Z0-9_]").unwrap()); - let mut aliased_columns = Vec::new(); for original_expr in exprs { - let expr = if let Some(column_remapping) = column_remapping.as_ref() { - let mut expr = replace_col( - original_expr.clone(), - &column_remapping.iter().map(|(k, v)| (k, v)).collect(), - ) - .map_err(|_| { - CubeError::internal(format!( - "Can't rename columns for expr: {:?}", - original_expr - )) - })?; + let expr = if let Some(column_remapping) = column_remapping { + let mut expr = column_remapping.remap(&original_expr)?; if !can_rename_columns { let original_alias = expr_name(&original_expr, &schema)?; if original_alias != expr_name(&expr, &schema)? { @@ -1006,75 +1131,7 @@ impl CubeScanWrapperNode { Self::escape_interpolation_quotes(expr_sql, ungrouped_scan_node.is_some()); sql = new_sql_query; - let original_alias = expr_name(&original_expr, &schema)?; - let original_alias_key = Column::from_name(&original_alias); - if let Some(alias_column) = next_remapping.get(&original_alias_key) { - let alias = alias_column.name.clone(); - aliased_columns.push(AliasedColumn { - expr: expr_sql, - alias, - }); - continue; - } - - let alias = if can_rename_columns { - let alias = expr_name(&expr, &schema)?; - let mut truncated_alias = NON_ID_REGEX - .replace_all(&alias, "_") - .trim_start_matches("_") - .to_lowercase(); - truncated_alias.truncate(16); - let mut alias = truncated_alias.clone(); - for i in 1..10000 { - if !next_remapping - .iter() - .any(|(_, v)| v == &Column::from_name(&alias)) - { - break; - } - alias = format!("{}_{}", truncated_alias, i); - } - alias - } else { - original_alias.clone() - }; - if !next_remapping.contains_key(&Column::from_name(&alias)) { - next_remapping.insert(original_alias_key, Column::from_name(&alias)); - if let Some(from_alias) = &from_alias { - next_remapping.insert( - Column { - name: original_alias.clone(), - relation: Some(from_alias.clone()), - }, - Column 
{ - name: alias.clone(), - relation: Some(from_alias.clone()), - }, - ); - if let Expr::Column(column) = &original_expr { - if let Some(original_relation) = &column.relation { - if original_relation != from_alias { - next_remapping.insert( - Column { - name: original_alias.clone(), - relation: Some(original_relation.clone()), - }, - Column { - name: alias.clone(), - relation: Some(from_alias.clone()), - }, - ); - } - } - } - } - } else { - return Err(CubeError::internal(format!( - "Can't generate SQL for column expr: duplicate alias {}", - alias - ))); - } - + let alias = next_remapper.add_expr(&schema, &original_expr, &expr)?; aliased_columns.push(AliasedColumn { expr: expr_sql, alias,