Skip to content

Commit 51482b8

Browse files
committed
fix(cubestore): Allowing window functions in cluster send planning
1 parent f8e9b5a commit 51482b8

File tree

1 file changed

+31
-12
lines changed

1 file changed

+31
-12
lines changed

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ use datafusion::arrow::datatypes::Field;
2525
use datafusion::error::DataFusionError;
2626
use datafusion::physical_plan::empty::EmptyExec;
2727
use datafusion::physical_plan::{
28-
DisplayAs, DisplayFormatType, ExecutionPlan,
29-
PlanProperties, SendableRecordBatchStream,
28+
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
3029
};
3130
use flatbuffers::bitflags::_core::any::Any;
3231
use flatbuffers::bitflags::_core::fmt::Formatter;
@@ -47,12 +46,12 @@ use crate::queryplanner::providers::InfoSchemaQueryCacheTableProvider;
4746
use crate::queryplanner::query_executor::{ClusterSendExec, CubeTable, InlineTableProvider};
4847
use crate::queryplanner::rolling::RollingWindowAggregateSerialized;
4948
use crate::queryplanner::serialized_plan::PreSerializedPlan;
50-
use crate::queryplanner::serialized_plan::{
51-
IndexSnapshot, InlineSnapshot, PartitionSnapshot,
49+
use crate::queryplanner::serialized_plan::{IndexSnapshot, InlineSnapshot, PartitionSnapshot};
50+
use crate::queryplanner::topk::{
51+
materialize_topk, ClusterAggregateTopKLowerSerialized, ClusterAggregateTopKUpperSerialized,
5252
};
5353
use crate::queryplanner::topk::{plan_topk, DummyTopKLowerExec};
54-
use crate::queryplanner::topk::{ClusterAggregateTopKUpper, ClusterAggregateTopKLower};
55-
use crate::queryplanner::topk::{materialize_topk, ClusterAggregateTopKUpperSerialized, ClusterAggregateTopKLowerSerialized};
54+
use crate::queryplanner::topk::{ClusterAggregateTopKLower, ClusterAggregateTopKUpper};
5655
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
5756
use crate::table::{cmp_same_types, Row};
5857
use crate::CubeError;
@@ -64,7 +63,9 @@ use datafusion::execution::{SessionState, TaskContext};
6463
use datafusion::logical_expr::expr::Alias;
6564
use datafusion::logical_expr::utils::expr_to_columns;
6665
use datafusion::logical_expr::{
67-
expr, logical_plan, Aggregate, BinaryExpr, Expr, Extension, FetchType, Filter, InvariantLevel, Join, Limit, LogicalPlan, Operator, Projection, SkipType, Sort, SortExpr, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode
66+
expr, logical_plan, Aggregate, BinaryExpr, Expr, Extension, FetchType, Filter, InvariantLevel,
67+
Join, Limit, LogicalPlan, Operator, Projection, SkipType, Sort, SortExpr, SubqueryAlias,
68+
TableScan, Union, Unnest, UserDefinedLogicalNode,
6869
};
6970
use datafusion::physical_expr::{Distribution, LexRequirement};
7071
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
@@ -1027,7 +1028,10 @@ fn check_aggregates_expr(table: &IdRow<Table>, aggregates: &Vec<Expr>) -> bool {
10271028

10281029
for aggr in aggregates.iter() {
10291030
match aggr {
1030-
Expr::AggregateFunction(expr::AggregateFunction { func, params: expr::AggregateFunctionParams { args, .. } }) => {
1031+
Expr::AggregateFunction(expr::AggregateFunction {
1032+
func,
1033+
params: expr::AggregateFunctionParams { args, .. },
1034+
}) => {
10311035
if args.len() != 1 {
10321036
return false;
10331037
}
@@ -1529,6 +1533,7 @@ fn pull_up_cluster_send(mut p: LogicalPlan) -> Result<LogicalPlan, DataFusionErr
15291533
LogicalPlan::Extension { .. } => return Ok(p),
15301534
// These nodes collect results from multiple partitions, return unchanged.
15311535
LogicalPlan::Aggregate { .. }
1536+
| LogicalPlan::Window { .. }
15321537
| LogicalPlan::Sort { .. }
15331538
| LogicalPlan::Limit { .. }
15341539
| LogicalPlan::Repartition { .. } => return Ok(p),
@@ -1710,7 +1715,8 @@ impl ExtensionPlanner for CubeExtensionPlanner {
17101715
} else if let Some(topk_upper) = node.as_any().downcast_ref::<ClusterAggregateTopKUpper>() {
17111716
assert_eq!(inputs.len(), 1);
17121717
assert_eq!(logical_inputs.len(), 1);
1713-
let msg: &'static str = "ClusterAggregateTopKUpper expects its child to be a ClusterAggregateTopKLower";
1718+
let msg: &'static str =
1719+
"ClusterAggregateTopKUpper expects its child to be a ClusterAggregateTopKLower";
17141720
let LogicalPlan::Extension(Extension { node }) = logical_inputs[0] else {
17151721
return Err(DataFusionError::Internal(msg.to_owned()));
17161722
};
@@ -1719,11 +1725,22 @@ impl ExtensionPlanner for CubeExtensionPlanner {
17191725
};
17201726

17211727
// The input should be (and must be) a DummyTopKLowerExec node.
1722-
let Some(DummyTopKLowerExec { schema: _, input: lower_input }) = inputs[0].as_any().downcast_ref::<DummyTopKLowerExec>() else {
1728+
let Some(DummyTopKLowerExec {
1729+
schema: _,
1730+
input: lower_input,
1731+
}) = inputs[0].as_any().downcast_ref::<DummyTopKLowerExec>()
1732+
else {
17231733
return Err(DataFusionError::Internal("ClusterAggregateTopKUpper expects its physical input to be a DummyTopKLowerExec".to_owned()));
17241734
};
17251735

1726-
Ok(Some(plan_topk(planner, self, topk_upper, lower_node, lower_input.clone(), state)?))
1736+
Ok(Some(plan_topk(
1737+
planner,
1738+
self,
1739+
topk_upper,
1740+
lower_node,
1741+
lower_input.clone(),
1742+
state,
1743+
)?))
17271744
} else if let Some(_) = node.as_any().downcast_ref::<PanicWorkerNode>() {
17281745
assert_eq!(inputs.len(), 0);
17291746
Ok(Some(plan_panic_worker()?))
@@ -1916,7 +1933,9 @@ pub mod tests {
19161933
use crate::queryplanner::pretty_printers::PPOptions;
19171934
use crate::queryplanner::query_executor::ClusterSendExec;
19181935
use crate::queryplanner::serialized_plan::RowRange;
1919-
use crate::queryplanner::{pretty_printers, sql_to_rel_options, CubeTableLogical, QueryPlannerImpl};
1936+
use crate::queryplanner::{
1937+
pretty_printers, sql_to_rel_options, CubeTableLogical, QueryPlannerImpl,
1938+
};
19201939
use crate::sql::parser::{CubeStoreParser, Statement};
19211940
use crate::table::{Row, TableValue};
19221941
use crate::CubeError;

0 commit comments

Comments
 (0)