Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ cubes:
type: string
add_group_by: [orders.customerId]

- name: changeTypeComplexWithJoin
sql: >
CASE
WHEN {revenueYearAgo} IS NULL THEN 'New'
WHEN {revenue} > {revenueYearAgo} THEN 'Grow'
ELSE 'Down'
END
multi_stage: true
type: string
add_group_by: [first_date.customerId]

- name: changeTypeConcat
sql: "CONCAT({changeTypeComplex}, '-test')"
type: string


measures:
- name: count
Expand Down Expand Up @@ -111,6 +126,45 @@ cubes:
END
type: string

- name: first_date
sql: >
SELECT 1 AS id, '2023-03-01T00:00:00Z'::timestamptz AS createdAt, 1 AS customerId UNION ALL
SELECT 8 AS id, '2023-09-01T00:00:00Z'::timestamptz AS createdAt, 2 AS customerId UNION ALL
SELECT 16 AS id, '2024-09-01T00:00:00Z'::timestamptz AS createdAt, 3 AS customerId UNION ALL
SELECT 23 AS id, '2025-03-01T00:00:00Z'::timestamptz AS createdAt, 4 AS customerId UNION ALL
SELECT 29 AS id, '2025-03-01T00:00:00Z'::timestamptz AS createdAt, 5 AS customerId UNION ALL
SELECT 36 AS id, '2025-09-01T00:00:00Z'::timestamptz AS createdAt, 6 AS customerId

joins:
- name: orders
sql: "{first_date.customerId} = {orders.customerId}"
relationship: one_to_many

dimensions:
- name: customerId
sql: customerId
type: number

- name: createdAt
sql: createdAt
type: time

- name: customerType
sql: >
CASE
WHEN {orders.revenue} < 10000 THEN 'Low'
WHEN {orders.revenue} < 20000 THEN 'Medium'
ELSE 'Top'
END
multi_stage: true
type: string
add_group_by: [first_date.customerId]

- name: customerTypeConcat
sql: "CONCAT('Customer type: ', {customerType})"
multi_stage: true
type: string
add_group_by: [first_date.customerId]


`);
Expand Down Expand Up @@ -242,6 +296,104 @@ cubes:
},
],
{ joinGraph, cubeEvaluator, compiler }));
// Queries `orders.changeTypeConcat`, which the schema defines as a CONCAT over
// `{changeTypeComplex}` with a '-test' suffix. The expected rows are the 'Down-test' and
// 'Grow-test' buckets, one row per bucket per year (2024 and 2025), ordered by bucket
// and then by year.
it('bucketing with dimension over complex dimension', async () => dbRunner.runQueryTest({
dimensions: ['orders.changeTypeConcat'],
measures: ['orders.revenue', 'orders.revenueYearAgo'],
timeDimensions: [
{
dimension: 'orders.createdAt',
granularity: 'year',
dateRange: ['2024-01-02T00:00:00', '2026-01-01T00:00:00']
}
],
timezone: 'UTC',
order: [{
id: 'orders.changeTypeConcat'
}, { id: 'orders.createdAt' }],
},
[
{
orders__change_type_concat: 'Down-test',
orders__created_at_year: '2024-01-01T00:00:00.000Z',
orders__revenue: '20400',
orders__revenue_year_ago: '22800'
},
{
orders__change_type_concat: 'Down-test',
orders__created_at_year: '2025-01-01T00:00:00.000Z',
orders__revenue: '17800',
orders__revenue_year_ago: '20400'
},
{
orders__change_type_concat: 'Grow-test',
orders__created_at_year: '2024-01-01T00:00:00.000Z',
orders__revenue: '11700',
orders__revenue_year_ago: '9400'
},
{
orders__change_type_concat: 'Grow-test',
orders__created_at_year: '2025-01-01T00:00:00.000Z',
orders__revenue: '14100',
orders__revenue_year_ago: '11700'
},
],
{ joinGraph, cubeEvaluator, compiler }));
// Queries `orders.changeTypeComplexWithJoin`, a multi-stage dimension that compares
// `{revenue}` with `{revenueYearAgo}` and whose `add_group_by` is `first_date.customerId`,
// i.e. a dimension that belongs to the `first_date` cube rather than to `orders`.
// The expected rows are the 'Down' and 'Grow' buckets, one row per bucket per year.
it('bucketing with join and bucket dimension', async () => dbRunner.runQueryTest({
dimensions: ['orders.changeTypeComplexWithJoin'],
measures: ['orders.revenue', 'orders.revenueYearAgo'],
timeDimensions: [
{
dimension: 'orders.createdAt',
granularity: 'year',
dateRange: ['2024-01-02T00:00:00', '2026-01-01T00:00:00']
}
],
timezone: 'UTC',
order: [{
id: 'orders.changeTypeComplexWithJoin'
}, { id: 'orders.createdAt' }],
},
[
{
orders__change_type_complex_with_join: 'Down',
orders__created_at_year: '2024-01-01T00:00:00.000Z',
orders__revenue: '20400',
orders__revenue_year_ago: '22800'
},
{
orders__change_type_complex_with_join: 'Down',
orders__created_at_year: '2025-01-01T00:00:00.000Z',
orders__revenue: '17800',
orders__revenue_year_ago: '20400'
},
{
orders__change_type_complex_with_join: 'Grow',
orders__created_at_year: '2024-01-01T00:00:00.000Z',
orders__revenue: '11700',
orders__revenue_year_ago: '9400'
},
{
orders__change_type_complex_with_join: 'Grow',
orders__created_at_year: '2025-01-01T00:00:00.000Z',
orders__revenue: '14100',
orders__revenue_year_ago: '11700'
},
],
{ joinGraph, cubeEvaluator, compiler }));
// Queries `first_date.customerType`, a multi-stage dimension on the `first_date` cube whose
// CASE expression reads `{orders.revenue}`, a measure of the `orders` cube. The query has no
// time dimension; the expected result is one row per bucket ('Low', 'Medium', 'Top').
it('bucketing dim reference other cube measure', async () => dbRunner.runQueryTest({
dimensions: ['first_date.customerType'],
measures: ['orders.revenue'],
timezone: 'UTC',
order: [{
id: 'first_date.customerType'
}],
},
[
{ first_date__customer_type: 'Low', orders__revenue: '8100' },
{ first_date__customer_type: 'Medium', orders__revenue: '41700' },
{ first_date__customer_type: 'Top', orders__revenue: '46400' }
],
{ joinGraph, cubeEvaluator, compiler }));
} else {
// This test works only in tesseract
test.skip('multi stage over sub query', () => { expect(1).toBe(1); });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ impl FullKeyAggregate {
/// Returns a shared reference to the `multi_stage_subquery_refs` field.
pub fn multi_stage_subquery_refs(&self) -> &Vec<Rc<MultiStageSubqueryRef>> {
&self.multi_stage_subquery_refs
}

/// Returns `true` when there are no multi-stage subquery refs and
/// `multiplied_measures_resolver` is `None`; `false` otherwise.
pub fn is_empty(&self) -> bool {
    let no_subquery_refs = self.multi_stage_subquery_refs.is_empty();
    let no_multiplied_resolver = self.multiplied_measures_resolver.is_none();
    no_subquery_refs && no_multiplied_resolver
}
}

impl LogicalNode for FullKeyAggregate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct QueryProcessor<'a> {
impl QueryProcessor<'_> {
fn is_over_full_aggregated_source(&self, logical_plan: &Query) -> bool {
match logical_plan.source() {
QuerySource::FullKeyAggregate(_) => true,
QuerySource::FullKeyAggregate(fk) => !fk.is_empty(),
QuerySource::PreAggregation(_) => false,
QuerySource::LogicalJoin(_) => false,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,20 @@ impl MultiStageMemberQueryPlanner {
MemberSymbol::Measure(_) => measures.push(cte_member.clone()),
_ => {}
}
// We add all non–multi-stage dimensions from the underlying states because
// they’re needed to join a multi-stage dimension into the measure query
let (all_dependend_dimensions, all_dependend_time_dimensions) =
self.description.collect_all_non_multi_stage_dimension()?;
dimensions.extend(all_dependend_dimensions.iter().cloned());
time_dimensions.extend(all_dependend_time_dimensions.iter().cloned());
dimensions = dimensions
.into_iter()
.unique_by(|d| d.full_name())
.collect_vec();
time_dimensions = time_dimensions
.into_iter()
.unique_by(|d| d.full_name())
.collect_vec();

let schema = LogicalSchema::default()
.set_dimensions(dimensions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl MultiStageQueryPlanner {
descriptions,
resolved_multi_stage_dimensions,
)?;
if !description.is_multi_stage_dimension() {
if !description.is_multi_stage_dimension() || member.as_dimension().is_ok() {
result.push(description);
}
}
Expand Down Expand Up @@ -403,17 +403,6 @@ impl MultiStageQueryPlanner {
resolved_multi_stage_dimensions,
)?;

// Add GROUP BY to the dimension subquery itself
// if a multi-stage dimension has the `add_group_by` field.
let self_state =
if !multi_stage_member.add_group_by_symbols().is_empty() && member.is_dimension() {
let mut self_state = state.clone_state();
self_state.add_dimensions(multi_stage_member.add_group_by_symbols().clone());
Rc::new(self_state)
} else {
state.clone()
};

let alias = format!("cte_{}", descriptions.len());
MultiStageQueryDescription::new(
MultiStageMember::new(
Expand All @@ -422,7 +411,7 @@ impl MultiStageQueryPlanner {
is_ungrupped,
false,
),
self_state,
state.clone(),
input,
alias.clone(),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use super::{MultiStageAppliedState, MultiStageMember};
use crate::logical_plan::LogicalSchema;
use crate::planner::sql_evaluator::collectors::has_multi_stage_members;

Check warning on line 3 in rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs

View workflow job for this annotation

GitHub Actions / Check fmt/clippy

unused import: `crate::planner::sql_evaluator::collectors::has_multi_stage_members`

Check warning on line 3 in rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs

View workflow job for this annotation

GitHub Actions / Check fmt/clippy

unused import: `crate::planner::sql_evaluator::collectors::has_multi_stage_members`

Check warning on line 3 in rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs

View workflow job for this annotation

GitHub Actions / Build native Linux 22 x86_64-unknown-linux-gnu Python fallback

unused import: `crate::planner::sql_evaluator::collectors::has_multi_stage_members`

Check warning on line 3 in rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/query_description.rs

View workflow job for this annotation

GitHub Actions / Build native linux (Python: 3.11)

unused import: `crate::planner::sql_evaluator::collectors::has_multi_stage_members`
use crate::planner::sql_evaluator::MemberSymbol;
use cubenativeutils::CubeError;
use itertools::Itertools;
use std::fmt::Debug;
use std::rc::Rc;

Expand Down Expand Up @@ -79,6 +82,56 @@
self.input.is_empty()
}

pub fn collect_all_non_multi_stage_dimension(
&self,
) -> Result<(Vec<Rc<MemberSymbol>>, Vec<Rc<MemberSymbol>>), CubeError> {
let mut dimensions = vec![];
let mut time_dimensions = vec![];
self.collect_all_non_multi_stage_dimension_impl(&mut dimensions, &mut time_dimensions);
let dimensions = dimensions
.into_iter()
.unique_by(|d| d.full_name())
.filter_map(|d| match d.is_basic_dimension() {
Ok(res) => {
if res {
None
} else {
Some(Ok(d))
}
}
Err(e) => Some(Err(e)),
})
.collect::<Result<Vec<_>, _>>()?;

let time_dimensions = time_dimensions
.into_iter()
.unique_by(|d| d.full_name())
.filter_map(|d| match d.is_basic_dimension() {
Ok(res) => {
if res {
None
} else {
Some(Ok(d))
}
}
Err(e) => Some(Err(e)),
})
.collect::<Result<Vec<_>, _>>()?;
Ok((dimensions, time_dimensions))
}

/// Recursive worker behind `collect_all_non_multi_stage_dimension`.
///
/// Appends this description's own dimension and time-dimension symbols to the
/// two output vectors, then recurses into every entry of `input` in order
/// (parent first, then children). No de-duplication or filtering is done here.
fn collect_all_non_multi_stage_dimension_impl(
    &self,
    dimensions: &mut Vec<Rc<MemberSymbol>>,
    time_dimensions: &mut Vec<Rc<MemberSymbol>>,
) {
    let own_dimensions = self.state.dimensions_symbols();
    dimensions.extend(own_dimensions.iter().cloned());

    let own_time_dimensions = self.state.time_dimensions_symbols();
    time_dimensions.extend(own_time_dimensions.iter().cloned());

    self.input.iter().for_each(|child| {
        child.collect_all_non_multi_stage_dimension_impl(dimensions, time_dimensions)
    });
}

pub fn is_match_member_and_state(
&self,
member_node: &Rc<MemberSymbol>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@ impl TraversalVisitor for JoinHintsCollector {
_: &Self::State,
) -> Result<Option<Self::State>, CubeError> {
if node.is_multi_stage() {
//We don't add multi-stage members childs to join hints
if let Ok(dim) = node.as_dimension() {
if let Some(add_group_by) = dim.add_group_by() {
for item in add_group_by.iter() {
self.apply(item, &())?;
}
for (dep, path) in dim.get_dependencies_with_path().into_iter() {
if let Ok(dim) = dep.as_dimension() {
if dim.is_multi_stage() {
self.on_node_traverse(&dep, &path, &())?;
}
}
}
}
}
return Ok(None);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,6 @@ impl DimensionSymbol {
if let Some(member_sql) = &self.longitude {
member_sql.extract_symbol_deps(&mut deps);
}
if let Some(add_group_by) = &self.add_group_by {
for member_sql in add_group_by {
deps.extend(member_sql.get_dependencies().into_iter());
}
}
if let Some(case) = &self.case {
case.extract_symbol_deps(&mut deps);
}
Expand All @@ -263,11 +258,6 @@ impl DimensionSymbol {
if let Some(case) = &self.case {
case.extract_symbol_deps_with_path(&mut deps);
}
if let Some(add_group_by) = &self.add_group_by {
for member_sql in add_group_by {
deps.extend(member_sql.get_dependencies_with_path().into_iter());
}
}
deps
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use cubenativeutils::CubeError;

use crate::planner::sql_evaluator::collectors::has_multi_stage_members;
use crate::planner::sql_evaluator::Case;

use super::{
Expand Down Expand Up @@ -311,6 +312,14 @@ impl MemberSymbol {
}
}

/// Dimension-only wrapper around `has_multi_stage_members`.
///
/// Returns `Ok(false)` for any symbol that `as_dimension()` rejects; for a
/// dimension it returns whatever `has_multi_stage_members(self, true)` yields,
/// including its error.
///
/// NOTE(review): the meaning of the `true` flag and how the result relates to
/// the word "basic" cannot be seen from here — confirm against
/// `has_multi_stage_members`.
pub fn is_basic_dimension(self: &Rc<Self>) -> Result<bool, CubeError> {
    if self.as_dimension().is_err() {
        return Ok(false);
    }
    has_multi_stage_members(self, true)
}

/// Returns `true` when `get_dependencies()` yields an empty collection.
pub fn is_leaf(&self) -> bool {
self.get_dependencies().is_empty()
}
Expand Down
Loading