Skip to content

Commit 8e62f10

Browse files
committed
multi stage rolling windows
1 parent dca9d08 commit 8e62f10

File tree

3 files changed

+51
-29
lines changed

3 files changed

+51
-29
lines changed

packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ describe('SQL Generation', () => {
155155
revenue_qtd_proxy: {
156156
type: 'sum',
157157
sql: \`\${revenue}\`,
158+
multi_stage: true,
158159
rollingWindow: {
159160
type: 'to_date',
160161
granularity: 'quarter'
@@ -173,6 +174,7 @@ describe('SQL Generation', () => {
173174
revenueRollingDayAgo: {
174175
type: 'sum',
175176
sql: \`\${revenue_day_ago}\`,
177+
multi_stage: true,
176178
rollingWindow: {
177179
trailing: '2 day',
178180
offset: 'start'

rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,24 @@ impl MultiStageQueryPlanner {
281281
granularity: None,
282282
}
283283
};
284+
285+
if !measure.is_multi_stage() {
286+
let childs = member_childs(&member, true)?;
287+
let measures = childs
288+
.iter()
289+
.filter(|s| s.as_measure().is_ok())
290+
.collect_vec();
291+
if !measures.is_empty() {
292+
return Err(CubeError::user(
293+
format!("Measure {} and references another measures ({}). In this case, {} must have multi_stage: true defined",
294+
member.full_name(),
295+
measures.into_iter().map(|m| m.full_name()).join(", "),
296+
member.full_name(),
297+
),
298+
));
299+
}
300+
}
301+
284302
let ungrouped = measure.is_rolling_window() && !measure.is_addictive();
285303

286304
let mut time_dimensions = self.query_properties.time_dimensions().clone();
@@ -323,14 +341,12 @@ impl MultiStageQueryPlanner {
323341
&rolling_window,
324342
state.clone(),
325343
)?;
326-
let base_member = MemberSymbol::new_measure(
327-
measure.new_unrolling(member.get_dependencies().is_empty()),
328-
);
344+
let base_member = MemberSymbol::new_measure(measure.new_unrolling());
329345

330346
let time_series =
331347
self.add_time_series(time_dimension.clone(), state.clone(), descriptions)?;
332348

333-
let rolling_base = if base_member.get_dependencies().is_empty() {
349+
let rolling_base = if !measure.is_multi_stage() {
334350
self.add_rolling_window_base(
335351
base_member,
336352
base_rolling_state,

rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -121,32 +121,36 @@ impl MeasureSymbol {
121121
})
122122
}
123123

124-
pub fn new_unrolling(&self, keep_type: bool) -> Rc<Self> {
125-
let measure_type = if keep_type {
126-
self.measure_type.clone()
124+
pub fn new_unrolling(&self) -> Rc<Self> {
125+
if self.is_rolling_window() {
126+
let measure_type = if self.is_multi_stage {
127+
format!("number")
128+
} else {
129+
self.measure_type.clone()
130+
};
131+
Rc::new(Self {
132+
cube_name: self.cube_name.clone(),
133+
name: self.name.clone(),
134+
owned_by_cube: self.owned_by_cube,
135+
measure_type,
136+
rolling_window: None,
137+
is_multi_stage: false,
138+
is_reference: false,
139+
is_view: self.is_view,
140+
measure_filters: self.measure_filters.clone(),
141+
measure_drill_filters: self.measure_drill_filters.clone(),
142+
time_shift: self.time_shift.clone(),
143+
measure_order_by: self.measure_order_by.clone(),
144+
reduce_by: self.reduce_by.clone(),
145+
add_group_by: self.add_group_by.clone(),
146+
group_by: self.group_by.clone(),
147+
member_sql: self.member_sql.clone(),
148+
pk_sqls: self.pk_sqls.clone(),
149+
is_splitted_source: self.is_splitted_source,
150+
})
127151
} else {
128-
format!("number")
129-
};
130-
Rc::new(Self {
131-
cube_name: self.cube_name.clone(),
132-
name: self.name.clone(),
133-
owned_by_cube: self.owned_by_cube,
134-
measure_type,
135-
rolling_window: None,
136-
is_multi_stage: self.is_multi_stage,
137-
is_reference: self.is_reference,
138-
is_view: self.is_view,
139-
measure_filters: self.measure_filters.clone(),
140-
measure_drill_filters: self.measure_drill_filters.clone(),
141-
time_shift: self.time_shift.clone(),
142-
measure_order_by: self.measure_order_by.clone(),
143-
reduce_by: self.reduce_by.clone(),
144-
add_group_by: self.add_group_by.clone(),
145-
group_by: self.group_by.clone(),
146-
member_sql: self.member_sql.clone(),
147-
pk_sqls: self.pk_sqls.clone(),
148-
is_splitted_source: self.is_splitted_source,
149-
})
152+
Rc::new(self.clone())
153+
}
150154
}
151155

152156
pub fn new_patched(

0 commit comments

Comments
 (0)