Skip to content

Commit 8295069

Browse files
committed
WIP: Use with_upper_expressions for topk
1 parent fb71a4c commit 8295069

File tree

1 file changed

+39
-5
lines changed
  • rust/cubestore/cubestore/src/queryplanner/topk

1 file changed

+39
-5
lines changed

rust/cubestore/cubestore/src/queryplanner/topk/mod.rs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod execute;
22
mod plan;
33
mod util;
44

5+
use datafusion::error::DataFusionError;
56
use datafusion::execution::FunctionRegistry;
67
use datafusion_proto::bytes::Serializeable;
78
pub use execute::AggregateTopKExec;
@@ -168,14 +169,43 @@ impl UserDefinedLogicalNode for ClusterAggregateTopK {
168169
.collect_vec();
169170
// TODO upgrade DF: DF's type_coercion analysis pass doesn't like these exprs (which are
170171
// defined on the aggregate's output schema instead of the input schema). Maybe we should
171-
// split ClusterAggregateTopK into separate logical nodes. Omitting having_expr is a bit of
172-
// a hack...
172+
// split ClusterAggregateTopK into separate logical nodes. Instead we (hackishly) use
173+
// upper_expressions.
173174
if false && self.having_expr.is_some() {
174175
res.push(self.having_expr.clone().unwrap());
175176
}
176177
res
177178
}
178179

180+
// Cube extension.
181+
/// Returns the optional `having_expr` as a vector holding zero or one expression.
///
/// The expression is cloned; an absent `having_expr` yields an empty vector.
fn upper_expressions(&self) -> Vec<Expr> {
    self.having_expr.iter().cloned().collect()
}
188+
189+
// Cube extension.
190+
fn with_upper_expressions(&self, upper_exprs: Vec<Expr>) -> Result<Option<Arc<dyn UserDefinedLogicalNode>>, DataFusionError> {
191+
assert_eq!(usize::from(self.having_expr.is_some()), upper_exprs.len());
192+
if self.having_expr.is_some() {
193+
let having_expr = Some(upper_exprs.into_iter().next().unwrap());
194+
Ok(Some(Arc::new(ClusterAggregateTopK {
195+
limit: self.limit,
196+
input: self.input.clone(),
197+
group_expr: self.group_expr.clone(),
198+
aggregate_expr: self.aggregate_expr.clone(),
199+
order_by: self.order_by.clone(),
200+
having_expr,
201+
schema: self.schema.clone(),
202+
snapshots: self.snapshots.clone(),
203+
})))
204+
} else {
205+
Ok(None)
206+
}
207+
}
208+
179209
fn fmt_for_explain<'a>(&self, f: &mut Formatter<'a>) -> std::fmt::Result {
180210
write!(
181211
f,
@@ -188,10 +218,14 @@ impl UserDefinedLogicalNode for ClusterAggregateTopK {
188218
&self,
189219
exprs: Vec<Expr>,
190220
inputs: Vec<LogicalPlan>,
191-
) -> datafusion::common::Result<Arc<dyn UserDefinedLogicalNode>> {
221+
) -> Result<Arc<dyn UserDefinedLogicalNode>, DataFusionError> {
192222
let num_groups = self.group_expr.len();
193223
let num_aggs = self.aggregate_expr.len();
194-
// TODO upgrade DF: See expressions() comment -- we make the having expressions be "invisible" because they're defined on the output schema.
224+
225+
// TODO upgrade DF: See expressions() comment; having_expr is part of the
226+
// upper_expressions() -- we make the having expressions be "invisible" because they're
227+
// defined on the output schema.
228+
195229
// let num_having = if self.having_expr.is_some() { 1 } else { 0 };
196230
assert_eq!(inputs.len(), 1);
197231
assert_eq!(exprs.len(), num_groups + num_aggs /* + num_having */); // TODO upgrade DF
@@ -200,7 +234,7 @@ impl UserDefinedLogicalNode for ClusterAggregateTopK {
200234
// } else {
201235
// None
202236
// };
203-
let having_expr = self.having_expr.clone(); // TODO upgrade DF
237+
let having_expr = self.having_expr.clone();
204238
Ok(Arc::new(ClusterAggregateTopK {
205239
limit: self.limit,
206240
input: Arc::new(inputs[0].clone()),

0 commit comments

Comments
 (0)