Skip to content

Commit 16be1c5

Browse files
committed
dev
1 parent 7ccd607 commit 16be1c5

File tree

1 file changed

+51
-3
lines changed

1 file changed

+51
-3
lines changed

rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,13 +470,61 @@ impl KafkaPostProcessPlanner {
470470
}
471471

472472
fn get_source_unique_column(&self, expr: &Expr) -> Result<Column, CubeError> {
473+
fn find_column_name(expr: &Expr) -> Option<String> {
474+
match expr {
475+
Expr::Column(c) => Some(c.name.clone()),
476+
Expr::Alias(e, _) => find_column_name(&**e),
477+
Expr::ScalarUDF { args, .. } => {
478+
let mut column_name: Option<String> = None;
479+
for arg in args {
480+
if let Some(name) = find_column_name(arg) {
481+
if let Some(existing_name) = &column_name {
482+
if existing_name != &name {
483+
return None;
484+
}
485+
} else {
486+
column_name = Some(name);
487+
}
488+
}
489+
}
490+
column_name
491+
}
492+
_ => None,
493+
}
494+
}
495+
473496
let source_name = match expr {
474497
Expr::Column(c) => Ok(c.name.clone()),
475498
Expr::Alias(e, _) => match &**e {
476499
Expr::Column(c) => Ok(c.name.clone()),
477-
_ => Err(CubeError::user(format!(
478-
"Unique key can't be an expression in kafka streaming queries"
479-
))),
500+
Expr::ScalarUDF { args, .. } => {
501+
let mut column_name: Option<String> = None;
502+
for arg in args {
503+
match find_column_name(arg) {
504+
Some(name) => {
505+
if let Some(existing_name) = &column_name {
506+
if existing_name != &name {
507+
return Err(CubeError::user(
508+
"Scalar function can only use a single column".to_string(),
509+
));
510+
}
511+
} else {
512+
column_name = Some(name);
513+
}
514+
}
515+
_ => {},
516+
}
517+
}
518+
column_name.ok_or_else(|| {
519+
CubeError::user("Scalar function must contain at least one column".to_string())
520+
})
521+
},
522+
_ => {
523+
println!("unknown expr: {:?}", e);
524+
Err(CubeError::user(format!(
525+
"Unique key can't be an expression in kafka streaming queries"
526+
)))
527+
},
480528
},
481529
_ => Err(CubeError::user(
482530
"All expressions must have aliases in kafka streaming queries".to_string(),

0 commit comments

Comments
 (0)