Skip to content

Commit 96308be

Browse files
committed
WIP: Make field_index return Result
1 parent 8898235 commit 96308be

File tree

1 file changed

+13
-22
lines changed
  • rust/cubestore/cubestore/src/queryplanner/topk

1 file changed

+13
-22
lines changed

rust/cubestore/cubestore/src/queryplanner/topk/plan.rs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ fn materialize_topk_under_limit_sort(fetch: usize, sort_expr: &Vec<SortExpr>, so
107107
&sort_expr,
108108
sort_input.schema(),
109109
projection.as_ref().map(|c| c.input_columns.as_slice()),
110-
) {
110+
)? {
111111
sort_columns = sc;
112112
} else {
113113
return Ok(None);
@@ -276,21 +276,15 @@ fn extract_projections_and_havings(p: &Arc<LogicalPlan>) -> Result<Option<Column
276276
for e in expr {
277277
match e {
278278
Expr::Alias(Alias { expr: box Expr::Column(c), relation: _, name: _ }) | Expr::Column(c) => {
279-
// TODO upgrade DF: These field_index errors should return Err (right?) as
280-
// we do in the Filter case only because it's in a transform_up with map_data.
281-
let Some(fi) = field_index(in_schema, c.relation.as_ref(), &c.name) else {
282-
return Ok(None);
283-
};
279+
let fi = field_index(in_schema, c.relation.as_ref(), &c.name)?;
284280
input_columns.push(fi);
285281
},
286282
Expr::Alias(Alias { expr: box Expr::ScalarFunction(ScalarFunction { func, args }), relation: _, name: _})
287283
| Expr::ScalarFunction(ScalarFunction { func, args }) => match func.name() {
288284
// TODO upgrade DF: Use as_any() or something
289285
"cardinality" => match &args[0] {
290286
Expr::Column(c) => {
291-
let Some(fi) = field_index(in_schema, c.relation.as_ref(), &c.name) else {
292-
return Ok(None);
293-
};
287+
let fi = field_index(in_schema, c.relation.as_ref(), &c.name)?;
294288
input_columns.push(fi);
295289
}
296290
_ => return Ok(None),
@@ -372,9 +366,7 @@ fn extract_projections_and_havings(p: &Arc<LogicalPlan>) -> Result<Option<Column
372366
node.unalias_nested().transform_data(|node|
373367
match node {
374368
Expr::Column(c) => {
375-
let Some(fi) = field_index(in_schema, c.relation.as_ref(), &c.name) else {
376-
return Err(DataFusionError::Plan(format!("Field '{}' not found", &c.name))); // TODO upgrade DF: spend brain cycles on error message
377-
};
369+
let fi = field_index(in_schema, c.relation.as_ref(), &c.name)?;
378370
let replacement = inner_column_projection.post_projection[fi].clone();
379371
// Transformed::yes/no doesn't matter here.
380372
// let unequal = &replacement != &node;
@@ -424,7 +416,7 @@ fn extract_sort_columns(
424416
sort_expr: &[SortExpr],
425417
schema: &DFSchema,
426418
projection: Option<&[usize]>,
427-
) -> Option<Vec<SortColumn>> {
419+
) -> Result<Option<Vec<SortColumn>>, DataFusionError> {
428420
let mut sort_columns = Vec::with_capacity(sort_expr.len());
429421
for e in sort_expr {
430422
let SortExpr {expr, asc, nulls_first} = e;
@@ -435,27 +427,26 @@ fn extract_sort_columns(
435427
index = p[index];
436428
}
437429
if index < group_key_len {
438-
return None;
430+
return Ok(None);
439431
}
440432
sort_columns.push(SortColumn {
441433
agg_index: index - group_key_len,
442434
asc: *asc,
443435
nulls_first: *nulls_first,
444436
})
445437
}
446-
_ => return None,
438+
_ => return Ok(None),
447439
}
448440
}
449-
Some(sort_columns)
441+
Ok(Some(sort_columns))
450442
}
451443

452-
fn field_index(schema: &DFSchema, qualifier: Option<&TableReference>, name: &str) -> Option<usize> {
444+
// It is actually an error if expressions are nonsense expressions that don't evaluate on the given
445+
// schema. So we return Result (instead of Option<_>) now.
446+
fn field_index(schema: &DFSchema, qualifier: Option<&TableReference>, name: &str) -> Result<usize, DataFusionError> {
447+
// Calling field_not_found is exactly `schema.index_of_column(col: &Column)` behavior.
453448
schema.index_of_column_by_name(qualifier, name)
454-
455-
// TODO upgrade DF: Reconsider.
456-
// schema
457-
// .iter()
458-
// .position(|f| f.qualifier().map(|s| s.as_str()) == qualifier && f.name() == name)
449+
.ok_or_else(|| datafusion::common::field_not_found(qualifier.cloned(), name, schema))
459450
}
460451

461452
pub fn plan_topk(

0 commit comments

Comments
 (0)