Skip to content

Commit d10bf5b

Browse files
committed
WIP: Make extract_projection_and_having handle multiple projections/havings.
1 parent d5959f3 commit d10bf5b

File tree

1 file changed

+147
-43
lines changed
  • rust/cubestore/cubestore/src/queryplanner/topk

1 file changed

+147
-43
lines changed

rust/cubestore/cubestore/src/queryplanner/topk/plan.rs

Lines changed: 147 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::queryplanner::udfs::{
88
};
99
use datafusion::arrow::compute::SortOptions;
1010
use datafusion::arrow::datatypes::{DataType, Field, Schema};
11+
use datafusion::common::tree_node::{Transformed, TreeNode};
1112
use datafusion::error::DataFusionError;
1213
use datafusion::execution::SessionState;
1314
use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction};
@@ -78,7 +79,7 @@ pub fn materialize_topk(p: LogicalPlan) -> Result<LogicalPlan, DataFusionError>
7879

7980
/// Returns Ok(None) when materialization failed (without error) and the original plan should be returned.
8081
fn materialize_topk_under_limit_sort(fetch: usize, sort_expr: &Vec<SortExpr>, sort_input: &Arc<LogicalPlan>) -> Result<Option<LogicalPlan>, DataFusionError> {
81-
let projection = extract_projection_and_having(&sort_input);
82+
let projection = extract_projections_and_havings(&sort_input)?;
8283

8384
let aggregate = projection.as_ref().map(|p| p.input).unwrap_or(sort_input);
8485
match aggregate.as_ref() {
@@ -238,79 +239,182 @@ fn extract_aggregate_fun(e: &Expr) -> Option<(TopKAggregateFunction, &Vec<Expr>)
238239

239240
#[derive(Debug)]
240241
struct ColumnProjection<'a> {
242+
// For each post_projection expr, the (sole) column index within `input.schema()` that the expr uses.
241243
input_columns: Vec<usize>,
242244
input: &'a Arc<LogicalPlan>,
245+
// Output schema (after applying `having_expr` and then `post_projection` and then aliases). In
246+
// other words, this saves the top level projection's aliases.
243247
schema: &'a DFSchemaRef,
248+
// Defined on `input` schema. Excludes the `Expr::Alias` wrappers needed to produce the output schema, `schema`.
244249
post_projection: Vec<Expr>,
250+
// Defined on `input` schema
245251
having_expr: Option<Expr>,
246252
}
247253

248-
fn extract_having(p: &Arc<LogicalPlan>) -> (Option<Expr>, &Arc<LogicalPlan>) {
249-
// Filter's "having" flag is not relevant to us. It is used by DF to get the proper wildcard
250-
// expansion behavior in the analysis pass (before LogicalPlan optimizations, and before we
251-
// materialize the topk node here).
252-
match p.as_ref() {
253-
LogicalPlan::Filter(Filter { predicate, input, having: _, .. }) => (Some(predicate.clone()), input),
254-
_ => (None, p),
255-
}
256-
}
257254

258-
fn extract_projection_and_having(p: &LogicalPlan) -> Option<ColumnProjection> {
259-
match p {
255+
fn extract_projections_and_havings(p: &Arc<LogicalPlan>) -> Result<Option<ColumnProjection>, DataFusionError> {
256+
// Goal: Deal with an arbitrary series of Projection and Filter nodes, where the Projections are column
257+
// projections (or cardinality(column)), on top of an underlying node.
258+
//
259+
// Real world example: p = Projection > Filter > Projection > Aggregation
260+
//
261+
// Because the Sort node above p is defined in terms of the projection outputs, it needs those
262+
// outputs remapped to projection inputs.
263+
264+
match p.as_ref() {
260265
LogicalPlan::Projection(Projection {
261266
expr,
262267
input,
263268
schema,
264269
..
265270
}) => {
266271
let in_schema = input.schema();
267-
let mut input_columns = Vec::with_capacity(expr.len());
268-
let mut post_projection = Vec::with_capacity(expr.len());
272+
let mut input_columns: Vec<usize> = Vec::with_capacity(expr.len());
273+
274+
// Check that this projection is a column (or cardinality(column)) projection first.
269275
for e in expr {
270-
fn make_column(in_field_qualifier: Option<&TableReference>, in_field: &Field) -> datafusion::common::Column {
271-
datafusion::common::Column {
272-
relation: in_field_qualifier.map(|tr| tr.clone()),
273-
name: in_field.name().clone(),
274-
}
275-
}
276276
match e {
277277
Expr::Alias(Alias { expr: box Expr::Column(c), relation: _, name: _ }) | Expr::Column(c) => {
278-
let fi = field_index(in_schema, c.relation.as_ref(), &c.name)?;
278+
// TODO upgrade DF: These field_index errors should return Err (right?) as
279+
// we do in the Filter case (which returns Err only because it's inside a transform_up with transform_data).
280+
let Some(fi) = field_index(in_schema, c.relation.as_ref(), &c.name) else {
281+
return Ok(None);
282+
};
279283
input_columns.push(fi);
280-
let (in_field_qualifier, in_field) = in_schema.qualified_field(fi);
281-
post_projection.push(Expr::Column(make_column(in_field_qualifier, in_field)));
282-
}
283-
Expr::Alias(Alias { expr: box Expr::ScalarFunction(ScalarFunction { func, args }), relation: _, name: _ })
284+
},
285+
Expr::Alias(Alias { expr: box Expr::ScalarFunction(ScalarFunction { func, args }), relation: _, name: _})
284286
| Expr::ScalarFunction(ScalarFunction { func, args }) => match func.name() {
285-
// TODO upgrade DF: use as_any() or something
287+
// TODO upgrade DF: Use as_any() or something
286288
"cardinality" => match &args[0] {
287289
Expr::Column(c) => {
288-
let fi = field_index(in_schema, c.relation.as_ref(), &c.name)?;
290+
let Some(fi) = field_index(in_schema, c.relation.as_ref(), &c.name) else {
291+
return Ok(None);
292+
};
289293
input_columns.push(fi);
290-
let (in_field_qualifier, in_field) = in_schema.qualified_field(fi);
291-
post_projection.push(Expr::ScalarFunction(ScalarFunction {
292-
func: scalar_udf_by_kind(CubeScalarUDFKind::HllCardinality),
293-
args: vec![Expr::Column(make_column(in_field_qualifier, in_field))],
294-
}));
295294
}
296-
_ => return None,
295+
_ => return Ok(None),
297296
},
298-
_ => return None,
297+
_ => return Ok(None),
299298
},
299+
_ => return Ok(None),
300+
};
301+
}
300302

301-
_ => return None,
302-
}
303+
// Now recurse.
304+
let inner_column_projection = extract_projections_and_havings(input)?;
305+
let Some(inner_column_projection) = inner_column_projection else {
306+
return Ok(None);
307+
};
308+
309+
// Now apply our projection on top of the recursion
310+
311+
// input_columns[i] is the (sole) column number of `input.schema()` used by expr[i].
312+
// inner_column_projection.input_columns[j] is the (sole) column number of the presumed underlying `aggregate.schema()` used by inner expr j.
313+
// So inner_column_projection.input_columns[input_columns[i]] is the column number of the presumed underlying `aggregate.schema()` used by expr[i].
314+
315+
let mut deep_input_columns = Vec::with_capacity(expr.len());
316+
for i in 0..expr.len() {
317+
let j = input_columns[i];
318+
deep_input_columns.push(inner_column_projection.input_columns[j]);
303319
}
304-
let (having_expr, input) = extract_having(input);
305-
Some(ColumnProjection {
306-
input_columns,
307-
input,
320+
321+
let mut new_post_projection = Vec::with_capacity(expr.len());
322+
323+
// And our projection's Column expressions need to be replaced with the inner post_projection expressions.
324+
for (i, e) in expr.iter().enumerate() {
325+
let new_e = e.clone().transform_up(|node| {
326+
node.unalias_nested().transform_data(|node|
327+
match node {
328+
Expr::Column(c) => {
329+
let replacement: Expr = inner_column_projection.post_projection[input_columns[i]].clone();
330+
// Transformed::yes/no doesn't matter here.
331+
// let unequal = &replacement != &node;
332+
Ok(Transformed::yes(replacement))
333+
},
334+
_ => Ok(Transformed::no(node))
335+
}
336+
)
337+
})?;
338+
new_post_projection.push(new_e.data);
339+
}
340+
341+
let column_projection = ColumnProjection {
342+
input_columns: deep_input_columns,
343+
input: inner_column_projection.input,
308344
schema,
345+
post_projection: new_post_projection,
346+
having_expr: inner_column_projection.having_expr,
347+
};
348+
349+
return Ok(Some(column_projection));
350+
},
351+
LogicalPlan::Filter(Filter {
352+
predicate,
353+
input,
354+
having: _,
355+
..
356+
}) => {
357+
// Filter's "having" flag is not relevant to us. It is used by DF to get the proper wildcard
358+
// expansion behavior in the analysis pass (before LogicalPlan optimizations, and before we
359+
// materialize the topk node here).
360+
361+
// First, recurse.
362+
let inner_column_projection = extract_projections_and_havings(input)?;
363+
let Some(inner_column_projection) = inner_column_projection else {
364+
return Ok(None);
365+
};
366+
367+
let in_schema = input.schema();
368+
369+
// Our filter's columns, defined in terms of in_schema, need to be mapped to inner_column_projection.input.schema().
370+
let transformed_predicate = predicate.clone().transform_up(|node| {
371+
node.unalias_nested().transform_data(|node|
372+
match node {
373+
Expr::Column(c) => {
374+
let Some(fi) = field_index(in_schema, c.relation.as_ref(), &c.name) else {
375+
return Err(DataFusionError::Plan(format!("Field '{}' not found", &c.name))); // TODO upgrade DF: spend brain cycles on error message
376+
};
377+
let replacement = inner_column_projection.post_projection[fi].clone();
378+
// Transformed::yes/no doesn't matter here.
379+
// let unequal = &replacement != &node;
380+
Ok(Transformed::yes(replacement))
381+
},
382+
_ => Ok(Transformed::no(node))
383+
}
384+
)
385+
})?.data;
386+
387+
let column_projection = ColumnProjection {
388+
input_columns: inner_column_projection.input_columns,
389+
input: inner_column_projection.input,
390+
schema: inner_column_projection.schema,
391+
post_projection: inner_column_projection.post_projection,
392+
having_expr: Some(if let Some(previous_predicate) = inner_column_projection.having_expr {
393+
previous_predicate.and(transformed_predicate)
394+
} else {
395+
transformed_predicate
396+
}),
397+
};
398+
399+
return Ok(Some(column_projection));
400+
}
401+
_ => {
402+
let in_schema = p.schema();
403+
let post_projection: Vec<Expr> = in_schema.iter().map(|(in_field_qualifier, in_field)|
404+
Expr::Column(datafusion::common::Column {
405+
relation: in_field_qualifier.cloned(),
406+
name: in_field.name().clone(),
407+
})
408+
).collect();
409+
let column_projection = ColumnProjection {
410+
input_columns: (0..post_projection.len()).collect(),
411+
input: p,
412+
schema: in_schema,
309413
post_projection,
310-
having_expr,
311-
})
414+
having_expr: None,
415+
};
416+
return Ok(Some(column_projection));
312417
}
313-
_ => None,
314418
}
315419
}
316420

0 commit comments

Comments
 (0)