Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ cubes:

views:
- name: customers_view

cubes:
- join_path: customers
includes:
Expand Down Expand Up @@ -384,4 +383,26 @@ views:
// Skipping because it works only in Tesseract
});
}

if (getEnv('nativeSqlPlanner')) {
it('multi stage duplicated time shift over view and origin cube', async () => runQueryTest({
measures: [
'orders_view.cagr_1_y'
],
timeDimensions: [
{
dimension: 'orders_view.date',
dateRange: ['2022-01-01', '2022-10-31'],
},
],

timezone: 'America/Los_Angeles'
},

[{ orders_view__cagr_1_y: '-0.90000000000000000000' }]));
} else {
it.skip('member expression multi stage with time dimension segment', () => {
// Skipping because it works only in Tesseract
});
}
});
82 changes: 6 additions & 76 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/base_measure.rs
Original file line number Diff line number Diff line change
@@ -1,76 +1,21 @@
use super::query_tools::QueryTools;
use super::sql_evaluator::{MemberExpressionSymbol, MemberSymbol, SqlCall};
use super::sql_evaluator::{MeasureTimeShift, MemberExpressionSymbol, MemberSymbol, SqlCall};
use super::{evaluate_with_context, BaseMember, BaseMemberHelper, VisitorContext};
use crate::cube_bridge::measure_definition::{
MeasureDefinition, RollingWindow, TimeShiftReference,
};
use crate::planner::sql_templates::PlanSqlTemplates;
use cubenativeutils::CubeError;
use lazy_static::lazy_static;
use regex::Regex;
use std::fmt::{Debug, Formatter};
use std::rc::Rc;

#[derive(Clone, Debug)]
pub struct MeasureTimeShift {
pub interval: String,
pub time_dimension: String,
}

lazy_static! {
static ref INTERVAL_MATCH_RE: Regex =
Regex::new(r"^(-?\d+) (second|minute|hour|day|week|month|quarter|year)s?$").unwrap();
}
impl MeasureTimeShift {
pub fn try_from_reference(reference: &TimeShiftReference) -> Result<Self, CubeError> {
let parsed_interval =
if let Some(captures) = INTERVAL_MATCH_RE.captures(&reference.interval) {
let duration = if let Some(duration) = captures.get(1) {
duration.as_str().parse::<i64>().ok()
} else {
None
};
let granularity = if let Some(granularity) = captures.get(2) {
Some(granularity.as_str().to_owned())
} else {
None
};
if let Some((duration, granularity)) = duration.zip(granularity) {
Some((duration, granularity))
} else {
None
}
} else {
None
};
if let Some((duration, granularity)) = parsed_interval {
let duration = if reference.shift_type.as_ref().unwrap_or(&format!("prior")) == "next" {
duration * (-1)
} else {
duration
};

Ok(Self {
interval: format!("{duration} {granularity}"),
time_dimension: reference.time_dimension.clone(),
})
} else {
Err(CubeError::user(format!(
"Invalid interval: {}",
reference.interval
)))
}
}
}

pub struct BaseMeasure {
measure: String,
query_tools: Rc<QueryTools>,
member_evaluator: Rc<MemberSymbol>,
definition: Option<Rc<dyn MeasureDefinition>>,
#[allow(dead_code)]
member_expression_definition: Option<String>,
time_shifts: Vec<MeasureTimeShift>,
cube_name: String,
name: String,
default_alias: String,
Expand All @@ -80,7 +25,6 @@ impl Debug for BaseMeasure {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BaseMeasure")
.field("measure", &self.measure)
.field("time_shifts", &self.time_shifts)
.field("default_alias", &self.default_alias)
.finish()
}
Expand Down Expand Up @@ -132,7 +76,6 @@ impl BaseMeasure {
) -> Result<Option<Rc<Self>>, CubeError> {
let res = match evaluation_node.as_ref() {
MemberSymbol::Measure(s) => {
let time_shifts = Self::parse_time_shifts(&s.definition())?;
let default_alias = BaseMemberHelper::default_alias(
&s.cube_name(),
&s.name(),
Expand All @@ -147,7 +90,6 @@ impl BaseMeasure {
member_expression_definition: None,
cube_name: s.cube_name().clone(),
name: s.name().clone(),
time_shifts,
default_alias,
}))
}
Expand All @@ -166,7 +108,6 @@ impl BaseMeasure {
name,
member_expression_definition,
default_alias,
time_shifts: vec![],
}))
}
_ => None,
Expand Down Expand Up @@ -212,7 +153,6 @@ impl BaseMeasure {
name,
member_expression_definition,
default_alias,
time_shifts: vec![],
}))
}

Expand All @@ -232,19 +172,6 @@ impl BaseMeasure {
Ok(res)
}

fn parse_time_shifts(
definition: &Rc<dyn MeasureDefinition>,
) -> Result<Vec<MeasureTimeShift>, CubeError> {
if let Some(time_shifts) = &definition.static_data().time_shift_references {
time_shifts
.iter()
.map(|t| MeasureTimeShift::try_from_reference(t))
.collect::<Result<Vec<_>, _>>()
} else {
Ok(vec![])
}
}

pub fn member_evaluator(&self) -> &Rc<MemberSymbol> {
&self.member_evaluator
}
Expand Down Expand Up @@ -289,8 +216,11 @@ impl BaseMeasure {
.map_or(None, |d| d.static_data().time_shift_references.clone())
}

pub fn time_shifts(&self) -> &Vec<MeasureTimeShift> {
&self.time_shifts
pub fn time_shifts(&self) -> Vec<MeasureTimeShift> {
match self.member_evaluator.as_ref() {
MemberSymbol::Measure(measure_symbol) => measure_symbol.time_shifts().clone(),
_ => vec![],
}
}

pub fn is_multi_stage(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::plan::{FilterGroup, FilterItem};
use crate::planner::filter::FilterOperator;
use crate::planner::planners::multi_stage::MultiStageTimeShift;
use crate::planner::sql_evaluator::MeasureTimeShift;
use crate::planner::{BaseDimension, BaseMember, BaseTimeDimension};
use cubenativeutils::CubeError;
use itertools::Itertools;
Expand All @@ -17,7 +17,7 @@ pub struct MultiStageAppliedState {
dimensions_filters: Vec<FilterItem>,
measures_filters: Vec<FilterItem>,
segments: Vec<FilterItem>,
time_shifts: HashMap<String, String>,
time_shifts: HashMap<String, MeasureTimeShift>,
}

impl MultiStageAppliedState {
Expand Down Expand Up @@ -62,14 +62,17 @@ impl MultiStageAppliedState {
.collect_vec();
}

pub fn add_time_shifts(&mut self, time_shifts: Vec<MultiStageTimeShift>) {
pub fn add_time_shifts(&mut self, time_shifts: Vec<MeasureTimeShift>) {
for ts in time_shifts.into_iter() {
self.time_shifts
.insert(ts.time_dimension.clone(), ts.interval.clone());
if let Some(exists) = self.time_shifts.get_mut(&ts.dimension.full_name()) {
exists.interval += ts.interval;
} else {
self.time_shifts.insert(ts.dimension.full_name(), ts);
}
}
}

pub fn time_shifts(&self) -> &HashMap<String, String> {
pub fn time_shifts(&self) -> &HashMap<String, MeasureTimeShift> {
&self.time_shifts
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,63 +1,7 @@
use crate::cube_bridge::measure_definition::TimeShiftReference;
use crate::planner::sql_evaluator::MemberSymbol;
use crate::planner::sql_evaluator::{MeasureTimeShift, MemberSymbol};
use crate::planner::BaseTimeDimension;
use cubenativeutils::CubeError;
use lazy_static::lazy_static;
use regex::Regex;
use std::rc::Rc;

#[derive(Clone, Debug)]
pub struct MultiStageTimeShift {
pub interval: String,
pub time_dimension: String,
}

lazy_static! {
static ref INTERVAL_MATCH_RE: Regex =
Regex::new(r"^(-?\d+) (second|minute|hour|day|week|month|quarter|year)s?$").unwrap();
}
impl MultiStageTimeShift {
pub fn try_from_reference(reference: &TimeShiftReference) -> Result<Self, CubeError> {
let parsed_interval =
if let Some(captures) = INTERVAL_MATCH_RE.captures(&reference.interval) {
let duration = if let Some(duration) = captures.get(1) {
duration.as_str().parse::<i64>().ok()
} else {
None
};
let granularity = if let Some(granularity) = captures.get(2) {
Some(granularity.as_str().to_owned())
} else {
None
};
if let Some((duration, granularity)) = duration.zip(granularity) {
Some((duration, granularity))
} else {
None
}
} else {
None
};
if let Some((duration, granularity)) = parsed_interval {
let duration = if reference.shift_type.as_ref().unwrap_or(&format!("prior")) == "next" {
duration * (-1)
} else {
duration
};

Ok(Self {
interval: format!("{duration} {granularity}"),
time_dimension: reference.time_dimension.clone(),
})
} else {
Err(CubeError::user(format!(
"Invalid interval: {}",
reference.interval
)))
}
}
}

#[derive(Clone)]
pub struct TimeSeriesDescription {
pub time_dimension: Rc<BaseTimeDimension>,
Expand Down Expand Up @@ -143,7 +87,7 @@ pub struct MultiStageInodeMember {
reduce_by: Vec<String>,
add_group_by: Vec<String>,
group_by: Option<Vec<String>>,
time_shifts: Vec<MultiStageTimeShift>,
time_shifts: Vec<MeasureTimeShift>,
}

impl MultiStageInodeMember {
Expand All @@ -152,7 +96,7 @@ impl MultiStageInodeMember {
reduce_by: Vec<String>,
add_group_by: Vec<String>,
group_by: Option<Vec<String>>,
time_shifts: Vec<MultiStageTimeShift>,
time_shifts: Vec<MeasureTimeShift>,
) -> Self {
Self {
inode_type,
Expand All @@ -179,7 +123,7 @@ impl MultiStageInodeMember {
&self.group_by
}

pub fn time_shifts(&self) -> &Vec<MultiStageTimeShift> {
pub fn time_shifts(&self) -> &Vec<MeasureTimeShift> {
&self.time_shifts
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
MultiStageAppliedState, MultiStageInodeMember, MultiStageInodeMemberType,
MultiStageLeafMemberType, MultiStageMember, MultiStageMemberQueryPlanner, MultiStageMemberType,
MultiStageQueryDescription, MultiStageTimeShift, RollingWindowPlanner,
MultiStageQueryDescription, RollingWindowPlanner,
};
use crate::plan::{Cte, From, Schema, Select, SelectBuilder};
use crate::planner::query_tools::QueryTools;
Expand Down Expand Up @@ -124,15 +124,8 @@ impl MultiStageQueryPlanner {
MultiStageInodeMemberType::Calculate
};

let time_shifts = if let Some(refs) = measure.time_shift_references() {
let time_shifts = refs
.iter()
.map(|r| MultiStageTimeShift::try_from_reference(r))
.collect::<Result<Vec<_>, _>>()?;
time_shifts
} else {
vec![]
};
let time_shifts = measure.time_shifts();

let is_ungrupped = match &member_type {
MultiStageInodeMemberType::Rank | MultiStageInodeMemberType::Calculate => true,
_ => self.query_properties.ungrouped(),
Expand Down
Loading
Loading