Skip to content

feat(tesseract): Full key aggregate and logical plan refactoring #9807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -4108,7 +4108,7 @@ export class BaseQuery {
VAR_SAMP: 'VAR_SAMP({{ args_concat }})',
COVAR_POP: 'COVAR_POP({{ args_concat }})',
COVAR_SAMP: 'COVAR_SAMP({{ args_concat }})',

GROUP_ANY: 'max({{ expr }})',
COALESCE: 'COALESCE({{ args_concat }})',
CONCAT: 'CONCAT({{ args_concat }})',
FLOOR: 'FLOOR({{ args_concat }})',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ describe('PreAggregations', () => {
});
`);

it('simple pre-aggregation', async () => {
it('simple pre-aggregation 1', async () => {
await compiler.compile();

const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
Expand Down Expand Up @@ -2194,7 +2194,7 @@ describe('PreAggregations', () => {
});
});

if (getEnv('nativeSqlPlanner') && getEnv('nativeSqlPlannerPreAggregations')) {
if (true) { // getEnv('nativeSqlPlanner') && getEnv('nativeSqlPlannerPreAggregations')) {
it('rollup lambda', async () => {
await compiler.compile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ SELECT 1 AS revenue, cast('2024-01-01' AS timestamp) as time UNION ALL
})
`);

it('simple join', async () => {
it('simple join 1', async () => {
await compiler.compile();

console.log(joinGraph.buildJoin(['visitor_checkins', 'visitors']));
Expand Down Expand Up @@ -2539,7 +2539,11 @@ SELECT 1 AS revenue, cast('2024-01-01' AS timestamp) as time UNION ALL
filters: [],
order: [{
id: 'visitor_checkins.id'
}],
},
{
id: 'visitor_checkins.created_at'
}
],
ungrouped: true
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ pub struct CubeDefinitionStatic {
pub is_calendar: Option<bool>,
}

impl CubeDefinitionStatic {
    /// Returns the name to use when aliasing this cube: `sql_alias` when it
    /// is set, otherwise the cube's own `name`.
    pub fn resolved_alias(&self) -> &String {
        self.sql_alias.as_ref().unwrap_or(&self.name)
    }
}

#[nativebridge::native_bridge(CubeDefinitionStatic)]
pub trait CubeDefinition {
#[nbridge(field, optional)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,120 @@
use super::pretty_print::*;
use super::*;
use cubenativeutils::CubeError;
use std::rc::Rc;

pub enum AggregateMultipliedSubquerySouce {
Cube,
Cube(Rc<Cube>),
MeasureSubquery(Rc<MeasureSubquery>),
}

impl AggregateMultipliedSubquerySouce {
    /// Returns the wrapped node as a `PlanNode`, delegating to whichever
    /// variant currently holds it.
    fn as_plan_node(&self) -> PlanNode {
        match self {
            Self::Cube(cube) => cube.as_plan_node(),
            Self::MeasureSubquery(subquery) => subquery.as_plan_node(),
        }
    }

    /// Builds a new source of the same variant as `self`, with its payload
    /// obtained by converting `plan_node`.
    ///
    /// # Errors
    /// Propagates the error returned by `into_logical_node` when the
    /// conversion fails.
    fn with_plan_node(&self, plan_node: PlanNode) -> Result<Self, CubeError> {
        let rebuilt = match self {
            Self::Cube(_) => Self::Cube(plan_node.into_logical_node()?),
            Self::MeasureSubquery(_) => Self::MeasureSubquery(plan_node.into_logical_node()?),
        };
        Ok(rebuilt)
    }
}

pub struct AggregateMultipliedSubquery {
pub schema: Rc<LogicalSchema>,
pub dimension_subqueries: Vec<Rc<DimensionSubQuery>>,
pub keys_subquery: Rc<KeysSubQuery>,
pub pk_cube: Rc<Cube>, //FIXME may be duplication with information in keys_subquery
pub source: Rc<AggregateMultipliedSubquerySouce>,
pub source: AggregateMultipliedSubquerySouce,
pub dimension_subqueries: Vec<Rc<DimensionSubQuery>>,
}

impl LogicalNode for AggregateMultipliedSubquery {
fn as_plan_node(self: &Rc<Self>) -> PlanNode {
PlanNode::AggregateMultipliedSubquery(self.clone())
}

fn inputs(&self) -> Vec<PlanNode> {
AggregateMultipliedSubqueryInputPacker::pack(self)
}

fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
let AggregateMultipliedSubqueryInputUnPacker {
keys_subquery,
source,
dimension_subqueries,
} = AggregateMultipliedSubqueryInputUnPacker::new(&self, &inputs)?;

let result = Self {
schema: self.schema.clone(),
keys_subquery: keys_subquery.clone().into_logical_node()?,
source: self.source.with_plan_node(source.clone())?,
dimension_subqueries: dimension_subqueries
.iter()
.map(|itm| itm.clone().into_logical_node())
.collect::<Result<Vec<_>, _>>()?,
};

Ok(Rc::new(result))
}

fn node_name(&self) -> &'static str {
"AggregateMultipliedSubquery"
}
fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
if let PlanNode::AggregateMultipliedSubquery(item) = plan_node {
Ok(item)
} else {
Err(cast_error(&plan_node, "AggregateMultipliedSubquery"))
}
}
}

/// Flattens the inputs of an `AggregateMultipliedSubquery` into a single
/// list of plan nodes.
pub struct AggregateMultipliedSubqueryInputPacker;

impl AggregateMultipliedSubqueryInputPacker {
    /// Packs the inputs of `aggregate` in a fixed order: the keys subquery,
    /// then the source, then every dimension subquery in declaration order.
    pub fn pack(aggregate: &AggregateMultipliedSubquery) -> Vec<PlanNode> {
        let mut packed = vec![
            aggregate.keys_subquery.as_plan_node(),
            aggregate.source.as_plan_node(),
        ];
        let dimension_nodes = aggregate
            .dimension_subqueries
            .iter()
            .map(|subquery| subquery.as_plan_node());
        packed.extend(dimension_nodes);
        packed
    }
}

/// Borrowed view over a packed input list of an
/// `AggregateMultipliedSubquery`, split back into its named parts.
pub struct AggregateMultipliedSubqueryInputUnPacker<'a> {
    keys_subquery: &'a PlanNode,
    source: &'a PlanNode,
    dimension_subqueries: &'a [PlanNode],
}

impl<'a> AggregateMultipliedSubqueryInputUnPacker<'a> {
    /// Splits `inputs` into the keys subquery (index 0), the source
    /// (index 1) and the dimension subqueries (everything after).
    ///
    /// # Errors
    /// Returns the error from `check_inputs_len` when `inputs` does not have
    /// the length expected for `aggregate`.
    pub fn new(
        aggregate: &AggregateMultipliedSubquery,
        inputs: &'a Vec<PlanNode>,
    ) -> Result<Self, CubeError> {
        let expected_len = Self::inputs_len(aggregate);
        check_inputs_len(inputs, expected_len, aggregate.node_name())?;

        Ok(Self {
            keys_subquery: &inputs[0],
            source: &inputs[1],
            dimension_subqueries: &inputs[2..],
        })
    }

    /// Number of inputs expected for `aggregate`: two fixed slots plus one
    /// per dimension subquery.
    fn inputs_len(aggregate: &AggregateMultipliedSubquery) -> usize {
        aggregate.dimension_subqueries.len() + 2
    }
}

impl PrettyPrint for AggregateMultipliedSubquery {
Expand All @@ -22,28 +124,25 @@ impl PrettyPrint for AggregateMultipliedSubquery {
let details_state = state.new_level();
result.println("schema:", &state);
self.schema.pretty_print(result, &details_state);
if !self.dimension_subqueries.is_empty() {
result.println("dimension_subqueries:", &state);
for subquery in self.dimension_subqueries.iter() {
subquery.pretty_print(result, &details_state);
}
}
result.println("keys_subquery:", &state);
self.keys_subquery.pretty_print(result, &details_state);
result.println("source:", &state);
match self.source.as_ref() {
AggregateMultipliedSubquerySouce::Cube => {
match &self.source {
AggregateMultipliedSubquerySouce::Cube(cube) => {
result.println("Cube:", &details_state);
self.pk_cube
.pretty_print(result, &details_state.new_level());
cube.pretty_print(result, &details_state.new_level());
}
AggregateMultipliedSubquerySouce::MeasureSubquery(measure_subquery) => {
result.println(
&format!("MeasureSubquery: {}", measure_subquery.measures.len()),
&details_state,
);
result.println(&format!("MeasureSubquery: "), &details_state);
measure_subquery.pretty_print(result, &details_state);
}
}
if !self.dimension_subqueries.is_empty() {
result.println("dimension_subqueries:", &state);
let details_state = state.new_level();
for subquery in self.dimension_subqueries.iter() {
subquery.pretty_print(result, &details_state);
}
}
}
}
27 changes: 27 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;
use crate::planner::BaseCube;
use cubenativeutils::CubeError;
use std::rc::Rc;

#[derive(Clone)]
Expand Down Expand Up @@ -49,3 +50,29 @@ impl Cube {
})
}
}

impl LogicalNode for Cube {
    /// Wraps a shared handle to this cube in `PlanNode::Cube`.
    fn as_plan_node(self: &Rc<Self>) -> PlanNode {
        PlanNode::Cube(Rc::clone(self))
    }

    /// A cube has no inputs, so this is always empty.
    fn inputs(&self) -> Vec<PlanNode> {
        Vec::new()
    }

    /// Returns `self` unchanged once `inputs` has passed the length check
    /// for zero inputs.
    ///
    /// # Errors
    /// Returns the error from `check_inputs_len` when that check fails.
    fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
        let node_name = self.node_name();
        check_inputs_len(&inputs, 0, node_name)?;
        Ok(self)
    }

    /// Static name of this node kind.
    fn node_name(&self) -> &'static str {
        "Cube"
    }

    /// Extracts the cube from `plan_node` when it holds the `Cube` variant;
    /// otherwise returns a cast error.
    fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
        match plan_node {
            PlanNode::Cube(item) => Ok(item),
            other => Err(cast_error(&other, "Cube")),
        }
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::pretty_print::*;
use super::*;
use crate::planner::sql_evaluator::MemberSymbol;
use cubenativeutils::CubeError;
use std::rc::Rc;

pub struct DimensionSubQuery {
Expand All @@ -10,6 +11,38 @@ pub struct DimensionSubQuery {
pub measure_for_subquery_dimension: Rc<MemberSymbol>,
}

impl LogicalNode for DimensionSubQuery {
    /// Wraps a shared handle to this node in `PlanNode::DimensionSubQuery`.
    fn as_plan_node(self: &Rc<Self>) -> PlanNode {
        PlanNode::DimensionSubQuery(Rc::clone(self))
    }

    /// The only input is the inner `query`.
    fn inputs(&self) -> Vec<PlanNode> {
        let query_node = self.query.as_plan_node();
        vec![query_node]
    }

    /// Creates a copy of this node whose `query` is replaced by the single
    /// entry of `inputs`; all other fields are cloned from `self`.
    ///
    /// # Errors
    /// Fails when `inputs` does not pass the length check for one input, or
    /// when the input cannot be converted to the type of `query`.
    fn with_inputs(self: Rc<Self>, inputs: Vec<PlanNode>) -> Result<Rc<Self>, CubeError> {
        check_inputs_len(&inputs, 1, self.node_name())?;
        let rebuilt = Self {
            query: inputs[0].clone().into_logical_node()?,
            primary_keys_dimensions: self.primary_keys_dimensions.clone(),
            subquery_dimension: self.subquery_dimension.clone(),
            measure_for_subquery_dimension: self.measure_for_subquery_dimension.clone(),
        };
        Ok(Rc::new(rebuilt))
    }

    /// Static name of this node kind.
    fn node_name(&self) -> &'static str {
        "DimensionSubQuery"
    }

    /// Extracts the node from `plan_node` when it holds the
    /// `DimensionSubQuery` variant; otherwise returns a cast error.
    fn try_from_plan_node(plan_node: PlanNode) -> Result<Rc<Self>, CubeError> {
        match plan_node {
            PlanNode::DimensionSubQuery(query) => Ok(query),
            other => Err(cast_error(&other, "DimensionSubQuery")),
        }
    }
}

impl PrettyPrint for DimensionSubQuery {
fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) {
result.println("DimensionSubQuery: ", state);
Expand Down
9 changes: 9 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/logical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ impl LogicalFilter {
Some(Filter { items })
}
}
/// Returns a `Filter` holding a clone of the measure filter items, or
/// `None` when there are no measure filter items.
pub fn measures_filter(&self) -> Option<Filter> {
    if self.measures_filter.is_empty() {
        return None;
    }
    let items = self.measures_filter.clone();
    Some(Filter { items })
}
}

impl PrettyPrint for LogicalFilter {
Expand Down
Loading
Loading