Skip to content

Commit 20ed3e0

Browse files
authored
feat(sqlplanner): Base multi staging support (#8832)
1 parent 634521e commit 20ed3e0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+2235
-376
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,11 @@ export class BaseQuery {
614614
joinRoot: this.join.root,
615615
joinGraph: this.joinGraph,
616616
cubeEvaluator: this.cubeEvaluator,
617+
order: this.options.order,
617618
filters: this.options.filters,
619+
limit: this.options.limit ? this.options.limit.toString() : null,
620+
rowLimit: this.options.rowLimit ? this.options.rowLimit.toString() : null,
621+
offset: this.options.offset ? this.options.offset.toString() : null,
618622
baseTools: this,
619623

620624
};
@@ -1030,6 +1034,7 @@ export class BaseQuery {
10301034

10311035
const multipliedMeasures = measuresToRender(true, false)(measureToHierarchy);
10321036
const regularMeasures = measuresToRender(false, false)(measureToHierarchy);
1037+
10331038
const cumulativeMeasures =
10341039
R.pipe(
10351040
R.map(multiplied => R.xprod([multiplied], measuresToRender(multiplied, true)(measureToHierarchy))),
@@ -1518,7 +1523,7 @@ export class BaseQuery {
15181523
if (m.expressionName && !collectedMeasures.length && !m.isMemberExpression) {
15191524
throw new UserError(`Subquery dimension ${m.expressionName} should reference at least one measure`);
15201525
}
1521-
if (!collectedMeasures.length && m.isMemberExpression && m.query.allCubeNames.length > 1 && m.measureSql() === "COUNT(*)") {
1526+
if (!collectedMeasures.length && m.isMemberExpression && m.query.allCubeNames.length > 1 && m.measureSql() === 'COUNT(*)') {
15221527
const cubeName = m.expressionCubeName ? `\`${m.expressionCubeName}\` ` : '';
15231528
throw new UserError(`The query contains \`COUNT(*)\` expression but cube/view ${cubeName}is missing \`count\` measure`);
15241529
}
@@ -2438,7 +2443,6 @@ export class BaseQuery {
24382443
fn,
24392444
context
24402445
);
2441-
24422446
return context.joinHints;
24432447
}
24442448

packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ describe('SQL Generation', () => {
9797
type: 'prior',
9898
}]
9999
},
100-
cagr_1d: {
100+
cagr_day: {
101101
multi_stage: true,
102102
sql: \`ROUND(100 * \${revenue} / NULLIF(\${revenue_day_ago}, 0))\`,
103103
type: 'number',
@@ -601,7 +601,7 @@ describe('SQL Generation', () => {
601601
await compiler.compile();
602602
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, q);
603603

604-
console.log(query.buildSqlAndParams());
604+
// console.log(query.buildSqlAndParams());
605605

606606
const res = await dbRunner.testQuery(query.buildSqlAndParams());
607607
console.log(JSON.stringify(res));
@@ -836,7 +836,7 @@ describe('SQL Generation', () => {
836836
measures: [
837837
'visitors.revenue',
838838
'visitors.revenue_day_ago',
839-
'visitors.cagr_1d'
839+
'visitors.cagr_day'
840840
],
841841
timeDimensions: [{
842842
dimension: 'visitors.created_at',
@@ -848,8 +848,8 @@ describe('SQL Generation', () => {
848848
}],
849849
timezone: 'America/Los_Angeles'
850850
}, [
851-
{ visitors__created_at_day: '2017-01-05T00:00:00.000Z', visitors__cagr_1d: '150', visitors__revenue: '300', visitors__revenue_day_ago: '200' },
852-
{ visitors__created_at_day: '2017-01-06T00:00:00.000Z', visitors__cagr_1d: '300', visitors__revenue: '900', visitors__revenue_day_ago: '300' }
851+
{ visitors__created_at_day: '2017-01-05T00:00:00.000Z', visitors__cagr_day: '150', visitors__revenue: '300', visitors__revenue_day_ago: '200' },
852+
{ visitors__created_at_day: '2017-01-06T00:00:00.000Z', visitors__cagr_day: '300', visitors__revenue: '900', visitors__revenue_day_ago: '300' }
853853
]));
854854

855855
it('sql utils', async () => runQueryTest({

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@ pub struct FilterItem {
2929
pub values: Option<Vec<Option<String>>>,
3030
}
3131

32+
#[derive(Serialize, Deserialize, Debug)]
33+
pub struct OrderByItem {
34+
pub id: String,
35+
pub desc: Option<bool>,
36+
}
37+
38+
impl OrderByItem {
39+
pub fn is_desc(&self) -> bool {
40+
self.desc.unwrap_or(false)
41+
}
42+
}
43+
3244
impl FilterItem {
3345
pub fn member(&self) -> Option<&String> {
3446
self.member.as_ref().or(self.dimension.as_ref())
@@ -43,9 +55,11 @@ pub struct BaseQueryOptionsStatic {
4355
pub time_dimensions: Option<Vec<TimeDimension>>,
4456
pub timezone: Option<String>,
4557
pub filters: Option<Vec<FilterItem>>,
46-
#[serde(rename = "joinRoot")]
47-
pub join_root: Option<String>, //TODO temporaty. join graph should be rewrited in rust or taked
48-
//from Js CubeCompiller
58+
pub order: Option<Vec<OrderByItem>>,
59+
pub limit: Option<String>,
60+
#[serde(rename = "rowLimit")]
61+
pub row_limit: Option<String>,
62+
pub offset: Option<String>,
4963
}
5064

5165
#[nativebridge::native_bridge(BaseQueryOptionsStatic)]

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/dimension_definition.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use std::rc::Rc;
1313
pub struct DimenstionDefinitionStatic {
1414
#[serde(rename = "type")]
1515
pub dimension_type: String,
16+
#[serde(rename = "ownedByCube")]
1617
pub owned_by_cube: Option<bool>,
18+
#[serde(rename = "multiStage")]
19+
pub multi_stage: Option<bool>,
1720
}
1821

1922
#[nativebridge::native_bridge(DimenstionDefinitionStatic)]

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::cube_definition::{CubeDefinition, NativeCubeDefinition};
22
use super::measure_filter::{MeasureFiltersVec, NativeMeasureFiltersVec};
3+
use super::member_order_by::{MemberOrderByVec, NativeMemberOrderByVec};
34
use super::memeber_sql::{MemberSql, NativeMemberSql};
45
use cubenativeutils::wrappers::serializer::{
56
NativeDeserialize, NativeDeserializer, NativeSerialize,
@@ -11,11 +12,31 @@ use serde::{Deserialize, Serialize};
1112
use std::any::Any;
1213
use std::rc::Rc;
1314

15+
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
16+
pub struct TimeShiftReference {
17+
pub interval: String,
18+
#[serde(rename = "type")]
19+
pub shift_type: Option<String>,
20+
#[serde(rename = "timeDimension")]
21+
pub time_dimension: String,
22+
}
23+
1424
#[derive(Serialize, Deserialize, Debug)]
1525
pub struct MeasureDefinitionStatic {
1626
#[serde(rename = "type")]
1727
pub measure_type: String,
28+
#[serde(rename = "ownedByCube")]
1829
pub owned_by_cube: Option<bool>,
30+
#[serde(rename = "multiStage")]
31+
pub multi_stage: Option<bool>,
32+
#[serde(rename = "reduceByReferences")]
33+
pub reduce_by_references: Option<Vec<String>>,
34+
#[serde(rename = "addGroupByReferences")]
35+
pub add_group_by_references: Option<Vec<String>>,
36+
#[serde(rename = "groupByReferences")]
37+
pub group_by_references: Option<Vec<String>>,
38+
#[serde(rename = "timeShiftReferences")]
39+
pub time_shift_references: Option<Vec<TimeShiftReference>>,
1940
}
2041

2142
#[nativebridge::native_bridge(MeasureDefinitionStatic)]
@@ -29,4 +50,8 @@ pub trait MeasureDefinition {
2950
#[optional]
3051
#[field]
3152
fn filters(&self) -> Result<Option<Rc<dyn MeasureFiltersVec>>, CubeError>;
53+
54+
#[optional]
55+
#[field]
56+
fn order_by(&self) -> Result<Option<Rc<dyn MemberOrderByVec>>, CubeError>;
3257
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use super::memeber_sql::{MemberSql, NativeMemberSql};
2+
use cubenativeutils::wrappers::object::NativeArray;
3+
use cubenativeutils::wrappers::serializer::{
4+
NativeDeserialize, NativeDeserializer, NativeSerialize,
5+
};
6+
use cubenativeutils::wrappers::NativeContextHolder;
7+
use cubenativeutils::wrappers::NativeObjectHandle;
8+
use cubenativeutils::CubeError;
9+
use std::any::Any;
10+
use std::marker::PhantomData;
11+
use std::rc::Rc;
12+
13+
#[nativebridge::native_bridge]
14+
pub trait MemberOrderBy {
15+
#[field]
16+
fn sql(&self) -> Result<Rc<dyn MemberSql>, CubeError>;
17+
#[field]
18+
fn dir(&self) -> Result<String, CubeError>;
19+
}
20+
21+
pub trait MemberOrderByVec {
22+
fn items(&self) -> &Vec<Rc<dyn MemberOrderBy>>;
23+
}
24+
25+
pub struct NativeMemberOrderByVec<IT: InnerTypes> {
26+
items: Vec<Rc<dyn MemberOrderBy>>,
27+
phantom: PhantomData<IT>,
28+
}
29+
30+
impl<IT: InnerTypes> NativeMemberOrderByVec<IT> {
31+
pub fn try_new(native_items: NativeObjectHandle<IT>) -> Result<Self, CubeError> {
32+
let items = native_items
33+
.into_array()?
34+
.to_vec()?
35+
.into_iter()
36+
.map(|v| -> Result<Rc<dyn MemberOrderBy>, CubeError> {
37+
Ok(Rc::new(NativeMemberOrderBy::from_native(v)?))
38+
})
39+
.collect::<Result<Vec<_>, _>>()?;
40+
Ok(Self {
41+
items,
42+
phantom: PhantomData::default(),
43+
})
44+
}
45+
}
46+
47+
impl<IT: InnerTypes> MemberOrderByVec for NativeMemberOrderByVec<IT> {
48+
fn items(&self) -> &Vec<Rc<dyn MemberOrderBy>> {
49+
&self.items
50+
}
51+
}
52+
53+
impl<IT: InnerTypes> NativeDeserialize<IT> for NativeMemberOrderByVec<IT> {
54+
fn from_native(v: NativeObjectHandle<IT>) -> Result<Self, CubeError> {
55+
Self::try_new(v)
56+
}
57+
}

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod join_item_definition;
1212
pub mod measure_definition;
1313
pub mod measure_filter;
1414
pub mod member_definition;
15+
pub mod member_order_by;
1516
pub mod memeber_sql;
1617
pub mod security_context;
1718
pub mod sql_templates_render;

rust/cubesqlplanner/cubesqlplanner/src/plan/expression.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,25 @@ use crate::planner::{BaseMember, VisitorContext};
22
use cubenativeutils::CubeError;
33
use std::rc::Rc;
44

5+
#[derive(Clone)]
56
pub enum Expr {
67
Field(Rc<dyn BaseMember>),
7-
Reference(String, String),
8+
Reference(Option<String>, String),
9+
Asterix,
810
}
911

1012
impl Expr {
1113
pub fn to_sql(&self, context: Rc<VisitorContext>) -> Result<String, CubeError> {
1214
match self {
1315
Expr::Field(field) => field.to_sql(context),
1416
Expr::Reference(cube_alias, field_alias) => {
15-
Ok(format!("{}.{}", cube_alias, field_alias))
17+
if let Some(cube_alias) = cube_alias {
18+
Ok(format!("{}.{}", cube_alias, field_alias))
19+
} else {
20+
Ok(field_alias.clone())
21+
}
1622
}
23+
Expr::Asterix => Ok("*".to_string()),
1724
}
1825
}
1926
}

rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@ use cubenativeutils::CubeError;
44
use std::fmt;
55
use std::rc::Rc;
66

7+
#[derive(Clone)]
78
pub enum FilterGroupOperator {
89
Or,
910
And,
1011
}
1112

13+
#[derive(Clone)]
1214
pub struct FilterGroup {
13-
operator: FilterGroupOperator,
14-
items: Vec<FilterItem>,
15+
pub operator: FilterGroupOperator,
16+
pub items: Vec<FilterItem>,
1517
}
1618

1719
impl FilterGroup {
@@ -49,7 +51,11 @@ impl FilterItem {
4951
.iter()
5052
.map(|itm| itm.to_sql(context.clone()))
5153
.collect::<Result<Vec<_>, _>>()?;
52-
format!("({})", items_sql.join(&operator))
54+
if items_sql.is_empty() {
55+
format!("( 1 = 1 )")
56+
} else {
57+
format!("({})", items_sql.join(&operator))
58+
}
5359
}
5460
FilterItem::Item(item) => {
5561
let sql = item.to_sql(context.clone())?;

rust/cubesqlplanner/cubesqlplanner/src/plan/from.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use super::Join;
2-
use super::QueryPlan;
1+
use super::{Join, QueryPlan, Subquery};
32
use crate::planner::{BaseCube, VisitorContext};
43
use cubenativeutils::CubeError;
54
use std::rc::Rc;
@@ -9,7 +8,8 @@ pub enum FromSource {
98
Empty,
109
Cube(Rc<BaseCube>),
1110
Join(Rc<Join>),
12-
Subquery(Rc<QueryPlan>, String),
11+
Subquery(Subquery),
12+
TableReference(String, Option<String>),
1313
}
1414

1515
#[derive(Clone)]
@@ -26,12 +26,16 @@ impl From {
2626
Self::new(FromSource::Cube(cube))
2727
}
2828

29+
pub fn new_from_table_reference(reference: String, alias: Option<String>) -> Self {
30+
Self::new(FromSource::TableReference(reference, alias))
31+
}
32+
2933
pub fn new_from_join(join: Rc<Join>) -> Self {
3034
Self::new(FromSource::Join(join))
3135
}
3236

3337
pub fn new_from_subquery(plan: Rc<QueryPlan>, alias: String) -> Self {
34-
Self::new(FromSource::Subquery(plan, alias))
38+
Self::new(FromSource::Subquery(Subquery::new(plan, alias)))
3539
}
3640

3741
pub fn to_sql(&self, context: Rc<VisitorContext>) -> Result<String, CubeError> {
@@ -44,8 +48,13 @@ impl From {
4448
FromSource::Join(j) => {
4549
format!("{}", j.to_sql(context.clone())?)
4650
}
47-
FromSource::Subquery(s, alias) => {
48-
format!("({}) AS {}", s.to_sql()?, alias)
51+
FromSource::Subquery(s) => s.to_sql()?,
52+
FromSource::TableReference(r, alias) => {
53+
if let Some(alias) = alias {
54+
format!(" {} as {} ", r, alias)
55+
} else {
56+
format!(" {} ", r)
57+
}
4958
}
5059
};
5160
Ok(sql)

0 commit comments

Comments
 (0)