Skip to content

Commit 0f43fcb

Browse files
committed
chore(cubestore): Upgrade DF 46: Fix rolling window optimization
1 parent a6e60e5 commit 0f43fcb

File tree

1 file changed

+89
-42
lines changed

1 file changed

+89
-42
lines changed

rust/cubestore/cubestore/src/queryplanner/optimizations/rolling_optimizer.rs

Lines changed: 89 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
11
use crate::queryplanner::rolling::RollingWindowAggregate;
2-
use datafusion::arrow::array::{Array, AsArray};
3-
use datafusion::arrow::compute::{date_part, DatePart};
4-
use datafusion::common::tree_node::{
5-
Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
6-
};
2+
use datafusion::arrow::array::Array;
3+
use datafusion::arrow::datatypes::DataType;
4+
use datafusion::common::tree_node::Transformed;
75
use datafusion::common::{Column, DataFusionError, JoinType, ScalarValue, TableReference};
86
use datafusion::functions::datetime::date_part::DatePartFunc;
97
use datafusion::functions::datetime::date_trunc::DateTruncFunc;
108
use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias, ScalarFunction};
119
use datafusion::logical_expr::{
12-
Aggregate, BinaryExpr, Cast, ColumnarValue, Expr, Extension, Join, LogicalPlan, Operator,
13-
Projection, ScalarUDFImpl, SubqueryAlias, Union, Unnest,
10+
Aggregate, BinaryExpr, Cast, ColumnarValue, Expr, Extension, Join, LogicalPlan, Operator, Projection, ScalarFunctionArgs, ScalarUDFImpl, SubqueryAlias, Union, Unnest
1411
};
1512
use datafusion::optimizer::optimizer::ApplyOrder;
1613
use datafusion::optimizer::{OptimizerConfig, OptimizerRule};
1714
use itertools::Itertools;
18-
use mockall::predicate::le;
19-
use std::collections::HashMap;
2015
use std::sync::Arc;
2116

2217
/// Rewrites following logical plan:
@@ -194,6 +189,7 @@ impl RollingOptimizerRule {
194189
_ => None,
195190
})
196191
.collect::<Option<Vec<_>>>()?;
192+
197193
let RollingWindowJoinExtractorResult {
198194
input,
199195
dimension,
@@ -261,6 +257,7 @@ impl RollingOptimizerRule {
261257
}) => {
262258
let left_series = Self::extract_series_projection(left)
263259
.or_else(|| Self::extract_series_union(left))?;
260+
264261
let RollingWindowBoundsExtractorResult {
265262
lower_bound,
266263
upper_bound,
@@ -596,10 +593,17 @@ impl RollingOptimizerRule {
596593
LogicalPlan::Unnest(Unnest {
597594
input,
598595
exec_columns,
596+
schema,
599597
..
600598
}) => {
601599
let series_column = exec_columns.iter().next().cloned()?;
602-
Self::extract_series_from_unnest(input, series_column)
600+
let series = Self::extract_series_from_unnest(input, series_column);
601+
let col = schema.field(0).name();
602+
series.map(|mut series| {
603+
series.from_col = Column::from_name(col);
604+
series.to_col = series.from_col.clone();
605+
series
606+
})
603607
}
604608
_ => None,
605609
}
@@ -633,15 +637,17 @@ impl RollingOptimizerRule {
633637
});
634638
}
635639
Expr::Literal(ScalarValue::List(list)) => {
640+
636641
// TODO why does the first element hold the array? Is it always the case?
637642
let array = list.iter().next().as_ref().cloned()??;
638643
let from = ScalarValue::try_from_array(&array, 0).ok()?;
639644
let to =
640645
ScalarValue::try_from_array(&array, array.len() - 1).ok()?;
641646

647+
let index_1 = ScalarValue::try_from_array(&array, 1).ok()?;
642648
let every = month_aware_sub(
643649
&from,
644-
&ScalarValue::try_from_array(&array, 1).ok()?,
650+
&index_1,
645651
)?;
646652

647653
return Some(RollingWindowSeriesExtractorResult {
@@ -700,58 +706,99 @@ pub fn month_aware_sub(from: &ScalarValue, to: &ScalarValue) -> Option<ScalarVal
700706
| ScalarValue::TimestampMicrosecond(_, None)
701707
| ScalarValue::TimestampNanosecond(_, None),
702708
) => {
709+
let from_type = from.data_type();
710+
let to_type = to.data_type();
703711
// TODO look up from registry?
704712
let date_trunc = DateTruncFunc::new();
705-
let date_part = DatePartFunc::new();
706713
let from_trunc = date_trunc
707-
.invoke(&[
708-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
709-
ColumnarValue::Scalar(from.clone()),
710-
])
714+
.invoke_with_args(
715+
ScalarFunctionArgs {
716+
args: vec![
717+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
718+
ColumnarValue::Scalar(from.clone()),
719+
],
720+
number_rows: 1,
721+
return_type: &from_type,
722+
},
723+
)
711724
.ok()?;
712725
let to_trunc = date_trunc
713-
.invoke(&[
714-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
715-
ColumnarValue::Scalar(to.clone()),
716-
])
726+
.invoke_with_args(
727+
ScalarFunctionArgs {
728+
args: vec![
729+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
730+
ColumnarValue::Scalar(to.clone()),
731+
],
732+
number_rows: 1,
733+
return_type: &to_type,
734+
},
735+
)
717736
.ok()?;
718737
match (from_trunc, to_trunc) {
719738
(ColumnarValue::Scalar(from_trunc), ColumnarValue::Scalar(to_trunc)) => {
739+
// TODO as with date_trunc above, look up from registry?
740+
let date_part = DatePartFunc::new();
741+
720742
if from.sub(from_trunc.clone()).ok() == to.sub(to_trunc.clone()).ok() {
721743
let from_month = date_part
722-
.invoke(&[
723-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
724-
ColumnarValue::Scalar(from_trunc.clone()),
725-
])
744+
.invoke_with_args(
745+
ScalarFunctionArgs {
746+
args: vec![
747+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
748+
ColumnarValue::Scalar(from_trunc.clone()),
749+
],
750+
number_rows: 1,
751+
return_type: &DataType::Int32,
752+
},
753+
)
726754
.ok()?;
727755
let from_year = date_part
728-
.invoke(&[
729-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("year".to_string()))),
730-
ColumnarValue::Scalar(from_trunc.clone()),
731-
])
756+
.invoke_with_args(
757+
ScalarFunctionArgs {
758+
args: vec![
759+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("year".to_string()))),
760+
ColumnarValue::Scalar(from_trunc.clone()),
761+
],
762+
number_rows: 1,
763+
return_type: &DataType::Int32,
764+
},
765+
)
732766
.ok()?;
733767
let to_month = date_part
734-
.invoke(&[
735-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
736-
ColumnarValue::Scalar(to_trunc.clone()),
737-
])
768+
.invoke_with_args(
769+
ScalarFunctionArgs {
770+
args: vec![
771+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("month".to_string()))),
772+
ColumnarValue::Scalar(to_trunc.clone()),
773+
],
774+
number_rows: 1,
775+
return_type: &DataType::Int32,
776+
},
777+
)
738778
.ok()?;
739779
let to_year = date_part
740-
.invoke(&[
741-
ColumnarValue::Scalar(ScalarValue::Utf8(Some("year".to_string()))),
742-
ColumnarValue::Scalar(to_trunc.clone()),
743-
])
780+
.invoke_with_args(
781+
ScalarFunctionArgs {
782+
args: vec![
783+
ColumnarValue::Scalar(ScalarValue::Utf8(Some("year".to_string()))),
784+
ColumnarValue::Scalar(to_trunc.clone()),
785+
],
786+
number_rows: 1,
787+
return_type: &DataType::Int32,
788+
},
789+
)
744790
.ok()?;
791+
745792
match (from_month, from_year, to_month, to_year) {
746793
(
747-
ColumnarValue::Scalar(ScalarValue::Float64(Some(from_month))),
748-
ColumnarValue::Scalar(ScalarValue::Float64(Some(from_year))),
749-
ColumnarValue::Scalar(ScalarValue::Float64(Some(to_month))),
750-
ColumnarValue::Scalar(ScalarValue::Float64(Some(to_year))),
794+
ColumnarValue::Scalar(ScalarValue::Int32(Some(from_month))),
795+
ColumnarValue::Scalar(ScalarValue::Int32(Some(from_year))),
796+
ColumnarValue::Scalar(ScalarValue::Int32(Some(to_month))),
797+
ColumnarValue::Scalar(ScalarValue::Int32(Some(to_year))),
751798
) => {
752799
return Some(ScalarValue::IntervalYearMonth(Some(
753-
(to_year - from_year) as i32 * 12
754-
+ (to_month - from_month) as i32,
800+
(to_year - from_year) * 12
801+
+ (to_month - from_month),
755802
)))
756803
}
757804
_ => {}

0 commit comments

Comments
 (0)