diff --git a/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs b/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs
index 4f47517e62e9e..79eb7f47d3592 100644
--- a/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs
+++ b/rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs
@@ -470,16 +470,45 @@ impl KafkaPostProcessPlanner {
     }
 
     fn get_source_unique_column(&self, expr: &Expr) -> Result<Column, CubeError> {
+        fn find_column_name(expr: &Expr) -> Result<Option<String>, CubeError> {
+            match expr {
+                Expr::Column(c) => Ok(Some(c.name.clone())),
+                Expr::Alias(e, _) => find_column_name(&**e),
+                Expr::ScalarUDF { args, .. } => {
+                    let mut column_name: Option<String> = None;
+                    for arg in args {
+                        if let Some(name) = find_column_name(arg)? {
+                            if let Some(existing_name) = &column_name {
+                                if existing_name != &name {
+                                    return Err(CubeError::user(
+                                        format!("Scalar function can only use a single column, expression: {:?}", expr),
+                                    ));
+                                }
+                            } else {
+                                column_name = Some(name);
+                            }
+                        }
+                    }
+                    Ok(column_name)
+                }
+                _ => Ok(None),
+            }
+        }
+
         let source_name = match expr {
             Expr::Column(c) => Ok(c.name.clone()),
             Expr::Alias(e, _) => match &**e {
                 Expr::Column(c) => Ok(c.name.clone()),
+                Expr::ScalarUDF { .. } => find_column_name(expr)?.ok_or_else(|| {
+                    CubeError::user(format!("Scalar function must contain at least one column, expression: {:?}", expr))
+                }),
                 _ => Err(CubeError::user(format!(
-                    "Unique key can't be an expression in kafka streaming queries"
+                    "Unique key can't be an expression in kafka streaming queries, expression: {:?}",
+                    expr
                 ))),
             },
             _ => Err(CubeError::user(
-                "All expressions must have aliases in kafka streaming queries".to_string(),
+                format!("All expressions must have aliases in kafka streaming queries, expression: {:?}", expr),
             )),
         }?;
 
diff --git a/rust/cubestore/cubestore/src/streaming/mod.rs b/rust/cubestore/cubestore/src/streaming/mod.rs
index f73426df12b33..90c90ba0d59d1 100644
--- a/rust/cubestore/cubestore/src/streaming/mod.rs
+++ b/rust/cubestore/cubestore/src/streaming/mod.rs
@@ -1710,6 +1710,24 @@ mod tests {
                 unique key (`message_id`, `an_id`) INDEX by_anonymous(`message_id`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'")
                 .await
                 .expect_err("Validation should fail");
+
+            service
+                .exec_query("CREATE TABLE test.events_by_type_6 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `TIMESTAMP` timestamp, `TIMESTAMP_SECOND` timestamp) \
+                    WITH (\
+                    stream_offset = 'earliest',
+                    select_statement = 'SELECT \
+                    ANONYMOUSID, MESSAGEID, FILTER_ID, TIMESTAMP, \
+                    PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(TIMESTAMP, \\'UTC\\', \\'UTC\\'), \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.000\\'), \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSS\\', \\'UTC\\') `TIMESTAMP_SECOND` \
+                    FROM EVENTS_BY_TYPE \
+                    WHERE PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') >= PARSE_TIMESTAMP(\\'1970-01-01T01:00:00.000Z\\', \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') \
+                    AND
+                    PARSE_TIMESTAMP(TIMESTAMP, \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') < PARSE_TIMESTAMP(\\'1970-01-01T01:10:00.000Z\\', \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') \
+                    \
+                    '\
+                    ) \
+                    unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`, `TIMESTAMP`, `TIMESTAMP_SECOND`) INDEX by_anonymous(`ANONYMOUSID`, `TIMESTAMP_SECOND`,`TIMESTAMP`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'")
+                .await
+                .unwrap();
         })
         .await;
     }
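
For review context, a standalone sketch of the column-resolution rule the first hunk introduces: find_column_name recursively unwraps aliases and scalar-function arguments down to the single source column a unique key expression refers to, and rejects scalar calls that mix two different columns. The Expr enum and plain String errors below are simplified stand-ins for DataFusion's Expr and cubestore's CubeError, so the sketch compiles on its own; it is not the cubestore code itself.

// Simplified stand-in for DataFusion's Expr: only the variants the new
// find_column_name logic cares about, plus a Literal fallback.
#[derive(Debug)]
enum Expr {
    Column(String),
    Alias(Box<Expr>, String),
    ScalarUDF { args: Vec<Expr> },
    Literal(String),
}

// Mirrors the recursion added in kafka_post_processing.rs: returns
// Ok(Some(name)) when the expression resolves to exactly one source column,
// Ok(None) when it references no column (e.g. a literal), and Err when a
// scalar function mixes two different columns.
fn find_column_name(expr: &Expr) -> Result<Option<String>, String> {
    match expr {
        Expr::Column(name) => Ok(Some(name.clone())),
        Expr::Alias(inner, _) => find_column_name(inner),
        Expr::ScalarUDF { args } => {
            let mut column_name: Option<String> = None;
            for arg in args {
                if let Some(name) = find_column_name(arg)? {
                    if let Some(existing_name) = &column_name {
                        if existing_name != &name {
                            return Err(format!(
                                "Scalar function can only use a single column, expression: {:?}",
                                expr
                            ));
                        }
                    } else {
                        column_name = Some(name);
                    }
                }
            }
            Ok(column_name)
        }
        Expr::Literal(_) => Ok(None),
    }
}

fn main() {
    // Nesting like PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(TIMESTAMP, ...), ...), ...)
    // aliased to TIMESTAMP_SECOND resolves to the single source column
    // TIMESTAMP, which is what lets it appear in a unique key.
    let nested = Expr::Alias(
        Box::new(Expr::ScalarUDF {
            args: vec![
                Expr::ScalarUDF {
                    args: vec![
                        Expr::Column("TIMESTAMP".to_string()),
                        Expr::Literal("UTC".to_string()),
                    ],
                },
                Expr::Literal("yyyy-MM-dd".to_string()),
            ],
        }),
        "TIMESTAMP_SECOND".to_string(),
    );
    assert_eq!(
        find_column_name(&nested).unwrap(),
        Some("TIMESTAMP".to_string())
    );

    // A scalar call that mixes two different columns is rejected, matching
    // the "single column" error path in the patch.
    let mixed = Expr::ScalarUDF {
        args: vec![
            Expr::Column("ANONYMOUSID".to_string()),
            Expr::Column("MESSAGEID".to_string()),
        ],
    };
    assert!(find_column_name(&mixed).is_err());
}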