Skip to content

Commit daf7088

Browse files
committed
dev
1 parent f8ed9a6 commit daf7088

File tree

1 file changed

+11
-31
lines changed

1 file changed

+11
-31
lines changed

rust/cubestore/cubestore/src/streaming/kafka_post_processing.rs

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -470,58 +470,38 @@ impl KafkaPostProcessPlanner {
470470
}
471471

472472
fn get_source_unique_column(&self, expr: &Expr) -> Result<Column, CubeError> {
473-
fn find_column_name(expr: &Expr) -> Option<String> {
473+
fn find_column_name(expr: &Expr) -> Result<Option<String>, CubeError> {
474474
match expr {
475-
Expr::Column(c) => Some(c.name.clone()),
475+
Expr::Column(c) => Ok(Some(c.name.clone())),
476476
Expr::Alias(e, _) => find_column_name(&**e),
477477
Expr::ScalarUDF { args, .. } => {
478478
let mut column_name: Option<String> = None;
479479
for arg in args {
480-
if let Some(name) = find_column_name(arg) {
480+
if let Some(name) = find_column_name(arg)? {
481481
if let Some(existing_name) = &column_name {
482482
if existing_name != &name {
483-
return None;
483+
return Err(CubeError::user(
484+
"Scalar function can only use a single column".to_string(),
485+
));
484486
}
485487
} else {
486488
column_name = Some(name);
487489
}
488490
}
489491
}
490-
column_name
492+
Ok(column_name)
491493
}
492-
_ => None,
494+
_ => Ok(None),
493495
}
494496
}
495497

496498
let source_name = match expr {
497499
Expr::Column(c) => Ok(c.name.clone()),
498500
Expr::Alias(e, _) => match &**e {
499501
Expr::Column(c) => Ok(c.name.clone()),
500-
Expr::ScalarUDF { args, .. } => {
501-
let mut column_name: Option<String> = None;
502-
for arg in args {
503-
match find_column_name(arg) {
504-
Some(name) => {
505-
if let Some(existing_name) = &column_name {
506-
if existing_name != &name {
507-
return Err(CubeError::user(
508-
"Scalar function can only use a single column"
509-
.to_string(),
510-
));
511-
}
512-
} else {
513-
column_name = Some(name);
514-
}
515-
}
516-
_ => {}
517-
}
518-
}
519-
column_name.ok_or_else(|| {
520-
CubeError::user(
521-
"Scalar function must contain at least one column".to_string(),
522-
)
523-
})
524-
}
502+
Expr::ScalarUDF { .. } => find_column_name(expr)?.ok_or_else(|| {
503+
CubeError::user("Scalar function must contain at least one column".to_string())
504+
}),
525505
_ => {
526506
println!("unknown expr: {:?}", e);
527507
Err(CubeError::user(format!(

0 commit comments

Comments
 (0)