@@ -470,16 +470,45 @@ impl KafkaPostProcessPlanner {
470470 }
471471
472472 fn get_source_unique_column ( & self , expr : & Expr ) -> Result < Column , CubeError > {
473+ fn find_column_name ( expr : & Expr ) -> Result < Option < String > , CubeError > {
474+ match expr {
475+ Expr :: Column ( c) => Ok ( Some ( c. name . clone ( ) ) ) ,
476+ Expr :: Alias ( e, _) => find_column_name ( & * * e) ,
477+ Expr :: ScalarUDF { args, .. } => {
478+ let mut column_name: Option < String > = None ;
479+ for arg in args {
480+ if let Some ( name) = find_column_name ( arg) ? {
481+ if let Some ( existing_name) = & column_name {
482+ if existing_name != & name {
483+ return Err ( CubeError :: user (
484+ format ! ( "Scalar function can only use a single column, expression: {:?}" , expr) ,
485+ ) ) ;
486+ }
487+ } else {
488+ column_name = Some ( name) ;
489+ }
490+ }
491+ }
492+ Ok ( column_name)
493+ }
494+ _ => Ok ( None ) ,
495+ }
496+ }
497+
473498 let source_name = match expr {
474499 Expr :: Column ( c) => Ok ( c. name . clone ( ) ) ,
475500 Expr :: Alias ( e, _) => match & * * e {
476501 Expr :: Column ( c) => Ok ( c. name . clone ( ) ) ,
502+ Expr :: ScalarUDF { .. } => find_column_name ( expr) ?. ok_or_else ( || {
503+ CubeError :: user ( format ! ( "Scalar function must contain at least one column, expression: {:?}" , expr) )
504+ } ) ,
477505 _ => Err ( CubeError :: user ( format ! (
478- "Unique key can't be an expression in kafka streaming queries"
506+ "Unique key can't be an expression in kafka streaming queries, expression: {:?}" ,
507+ expr
479508 ) ) ) ,
480509 } ,
481510 _ => Err ( CubeError :: user (
482- "All expressions must have aliases in kafka streaming queries" . to_string ( ) ,
511+ format ! ( "All expressions must have aliases in kafka streaming queries, expression: {:?}" , expr ) ,
483512 ) ) ,
484513 } ?;
485514
0 commit comments