Skip to content

Commit 13be19e

Browse files
committed
feat(tesseract): Support of rolling window (including to_date) pre-aggregations
1 parent 174e6c2 commit 13be19e

File tree

24 files changed

+267
-284
lines changed

24 files changed

+267
-284
lines changed

packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,11 @@ export class BaseQuery {
331331
this.customSubQueryJoins = this.options.subqueryJoins ?? [];
332332
this.useNativeSqlPlanner = this.options.useNativeSqlPlanner ?? getEnv('nativeSqlPlanner');
333333
this.canUseNativeSqlPlannerPreAggregation = true;
334-
/* if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
335-
const hasMultiStageMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true }).multiStageMembers.length > 0;
336-
this.canUseNativeSqlPlannerPreAggregation = hasMultiStageMeasures;
337-
} */
334+
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
335+
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });
336+
337+
this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0 || fullAggregateMeasures.cumulativeMeasures.length > 0;
338+
}
338339
this.queryLevelJoinHints = this.options.joinHints ?? [];
339340
this.prebuildJoin();
340341

@@ -776,7 +777,7 @@ export class BaseQuery {
776777
}
777778

778779
driverTools(external) {
779-
if (external && this.options.disableExternalPreAggregations && this.externalQueryClass) {
780+
if (external && !this.options.disableExternalPreAggregations && this.externalQueryClass) {
780781
return this.externalQuery();
781782
}
782783
return this;

packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,4 +334,13 @@ export class CubeStoreQuery extends BaseQuery {
334334
}
335335
);
336336
}
337+
338+
public sqlTemplates() {
339+
const templates = super.sqlTemplates();
340+
templates.statements.time_series_select = '{% for time_item in seria %}' +
341+
'select to_timestamp(\'{{ time_item[0] }}\') date_from, to_timestamp(\'{{ time_item[1] }}\') date_to \n' +
342+
'{% if not loop.last %} UNION ALL\n{% endif %}' +
343+
'{% endfor %}';
344+
return templates;
345+
}
337346
}

packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1353,7 +1353,7 @@ SELECT 1 AS revenue, cast('2024-01-01' AS timestamp) as time UNION ALL
13531353
{ visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling_three_day: '900' }
13541354
]));
13551355

1356-
it('rolling count', async () => runQueryTest({
1356+
it('rolling count 11', async () => runQueryTest({
13571357
measures: [
13581358
'visitors.countRolling'
13591359
],

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use super::base_query_options::FilterItem;
2+
use super::driver_tools::{DriverTools, NativeDriverTools};
23
use super::filter_group::{FilterGroup, NativeFilterGroup};
34
use super::filter_params::{FilterParams, NativeFilterParams};
45
use super::pre_aggregation_obj::{NativePreAggregationObj, PreAggregationObj};
56
use super::security_context::{NativeSecurityContext, SecurityContext};
67
use super::sql_templates_render::{NativeSqlTemplatesRender, SqlTemplatesRender};
7-
use super::driver_tools::{NativeDriverTools, DriverTools};
88
use super::sql_utils::{NativeSqlUtils, SqlUtils};
99
use cubenativeutils::wrappers::serializer::{
1010
NativeDeserialize, NativeDeserializer, NativeSerialize,
@@ -18,12 +18,6 @@ use std::rc::Rc;
1818
#[nativebridge::native_bridge]
1919
pub trait BaseTools {
2020
fn driver_tools(&self, external: bool) -> Result<Rc<dyn DriverTools>, CubeError>;
21-
fn convert_tz(&self, field: String) -> Result<String, CubeError>;
22-
fn time_grouped_column(
23-
&self,
24-
granularity: String,
25-
dimension: String,
26-
) -> Result<String, CubeError>;
2721
fn sql_templates(&self) -> Result<Rc<dyn SqlTemplatesRender>, CubeError>;
2822
fn security_context_for_rust(&self) -> Result<Rc<dyn SecurityContext>, CubeError>;
2923
fn sql_utils_for_rust(&self) -> Result<Rc<dyn SqlUtils>, CubeError>;
@@ -35,10 +29,6 @@ pub trait BaseTools {
3529
&self,
3630
used_filters: Option<Vec<FilterItem>>,
3731
) -> Result<Rc<dyn FilterGroup>, CubeError>;
38-
fn timestamp_precision(&self) -> Result<u32, CubeError>;
39-
fn time_stamp_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
40-
fn date_time_cast(&self, field: String) -> Result<String, CubeError>; //TODO move to templates
41-
fn in_db_time_zone(&self, date: String) -> Result<String, CubeError>;
4232
fn generate_time_series(
4333
&self,
4434
granularity: String,
@@ -51,23 +41,8 @@ pub trait BaseTools {
5141
origin: String,
5242
) -> Result<Vec<Vec<String>>, CubeError>;
5343
fn get_allocated_params(&self) -> Result<Vec<String>, CubeError>;
54-
fn subtract_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
55-
fn add_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
56-
fn add_timestamp_interval(&self, date: String, interval: String) -> Result<String, CubeError>;
5744
fn all_cube_members(&self, path: String) -> Result<Vec<String>, CubeError>;
5845
fn interval_and_minimal_time_unit(&self, interval: String) -> Result<Vec<String>, CubeError>;
59-
//===== TODO Move to templates
60-
fn hll_init(&self, sql: String) -> Result<String, CubeError>;
61-
fn hll_merge(&self, sql: String) -> Result<String, CubeError>;
62-
fn hll_cardinality_merge(&self, sql: String) -> Result<String, CubeError>;
63-
fn count_distinct_approx(&self, sql: String) -> Result<String, CubeError>;
64-
fn date_bin(
65-
&self,
66-
interval: String,
67-
source: String,
68-
origin: String,
69-
) -> Result<String, CubeError>;
70-
7146
fn get_pre_aggregation_by_name(
7247
&self,
7348
cube_name: String,

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/driver_tools.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
use super::base_query_options::FilterItem;
2-
use super::filter_group::{FilterGroup, NativeFilterGroup};
3-
use super::filter_params::{FilterParams, NativeFilterParams};
4-
use super::pre_aggregation_obj::{NativePreAggregationObj, PreAggregationObj};
5-
use super::security_context::{NativeSecurityContext, SecurityContext};
61
use super::sql_templates_render::{NativeSqlTemplatesRender, SqlTemplatesRender};
7-
use super::sql_utils::{NativeSqlUtils, SqlUtils};
82
use cubenativeutils::wrappers::serializer::{
93
NativeDeserialize, NativeDeserializer, NativeSerialize,
104
};

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod case_item;
66
pub mod case_label;
77
pub mod cube_definition;
88
pub mod dimension_definition;
9+
pub mod driver_tools;
910
pub mod evaluator;
1011
pub mod filter_group;
1112
pub mod filter_params;
@@ -28,4 +29,3 @@ pub mod segment_definition;
2829
pub mod sql_templates_render;
2930
pub mod sql_utils;
3031
pub mod struct_with_sql_member;
31-
pub mod driver_tools;

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use super::*;
2-
use crate::cube_bridge::pre_aggregation_obj::PreAggregationObj;
32
use crate::logical_plan::*;
43
use crate::plan::FilterItem;
54
use crate::planner::query_tools::QueryTools;
@@ -29,7 +28,7 @@ impl MatchState {
2928

3029
pub struct PreAggregationOptimizer {
3130
query_tools: Rc<QueryTools>,
32-
used_pre_aggregations: HashMap<(String, String), Rc<dyn PreAggregationObj>>,
31+
used_pre_aggregations: HashMap<(String, String), Rc<PreAggregation>>,
3332
}
3433

3534
impl PreAggregationOptimizer {
@@ -71,7 +70,7 @@ impl PreAggregationOptimizer {
7170
Ok(None)
7271
}
7372

74-
pub fn get_used_pre_aggregations(&self) -> Vec<Rc<dyn PreAggregationObj>> {
73+
pub fn get_used_pre_aggregations(&self) -> Vec<Rc<PreAggregation>> {
7574
self.used_pre_aggregations.values().cloned().collect()
7675
}
7776

@@ -445,15 +444,14 @@ impl PreAggregationOptimizer {
445444
granularity: pre_aggregation.granularity.clone(),
446445
table_name: table_name.clone(),
447446
cube_name: pre_aggregation.cube_name.clone(),
447+
pre_aggregation_obj,
448448
};
449+
let result = Rc::new(pre_aggregation);
449450
self.used_pre_aggregations.insert(
450-
(
451-
pre_aggregation.cube_name.clone(),
452-
pre_aggregation.name.clone(),
453-
),
454-
pre_aggregation_obj.clone(),
451+
(result.cube_name.clone(), result.name.clone()),
452+
result.clone(),
455453
);
456-
Ok(Rc::new(pre_aggregation))
454+
Ok(result)
457455
} else {
458456
Err(CubeError::internal(format!(
459457
"Cannot find pre aggregation object for cube {} and name {}",

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::*;
2+
use crate::cube_bridge::pre_aggregation_obj::PreAggregationObj;
23
use crate::planner::sql_evaluator::MemberSymbol;
34
use itertools::Itertools;
45
use std::rc::Rc;
@@ -13,6 +14,7 @@ pub struct PreAggregation {
1314
pub granularity: Option<String>,
1415
pub table_name: String,
1516
pub cube_name: String,
17+
pub pre_aggregation_obj: Rc<dyn PreAggregationObj>,
1618
}
1719

1820
impl PrettyPrint for PreAggregation {

rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ impl PhysicalPlanBuilder {
101101
) -> Result<Rc<Select>, CubeError> {
102102
let mut render_references = HashMap::new();
103103
let mut measure_references = HashMap::new();
104+
let mut dimensions_references = HashMap::new();
104105
let mut context_factory = context.make_sql_nodes_factory();
105106
let from = match &logical_plan.source {
106107
SimpleQuerySource::LogicalJoin(join) => self.process_logical_join(
@@ -113,8 +114,8 @@ impl PhysicalPlanBuilder {
113114
let res = self.process_pre_aggregation(
114115
pre_aggregation,
115116
context,
116-
&mut render_references,
117117
&mut measure_references,
118+
&mut dimensions_references,
118119
)?;
119120
for member in logical_plan.schema.time_dimensions.iter() {
120121
context_factory.add_dimensions_with_ignored_timezone(member.full_name());
@@ -127,6 +128,7 @@ impl PhysicalPlanBuilder {
127128
let mut select_builder = SelectBuilder::new(from);
128129
context_factory.set_ungrouped(logical_plan.ungrouped);
129130
context_factory.set_pre_aggregation_measures_references(measure_references);
131+
context_factory.set_pre_aggregation_dimensions_references(dimensions_references);
130132

131133
let mut group_by = Vec::new();
132134
for member in logical_plan.schema.dimensions.iter() {
@@ -184,8 +186,8 @@ impl PhysicalPlanBuilder {
184186
&self,
185187
pre_aggregation: &Rc<PreAggregation>,
186188
_context: &PhysicalPlanBuilderContext,
187-
render_references: &mut HashMap<String, QualifiedColumnName>,
188189
measure_references: &mut HashMap<String, QualifiedColumnName>,
190+
dimensions_references: &mut HashMap<String, QualifiedColumnName>,
189191
) -> Result<Rc<From>, CubeError> {
190192
let mut pre_aggregation_schema = Schema::empty();
191193
let pre_aggregation_alias = PlanSqlTemplates::memeber_alias_name(
@@ -200,7 +202,7 @@ impl PhysicalPlanBuilder {
200202
&dim.alias_suffix(),
201203
self.query_tools.clone(),
202204
)?;
203-
render_references.insert(
205+
dimensions_references.insert(
204206
dim.full_name(),
205207
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
206208
);
@@ -213,16 +215,10 @@ impl PhysicalPlanBuilder {
213215
granularity,
214216
self.query_tools.clone(),
215217
)?;
216-
render_references.insert(
218+
dimensions_references.insert(
217219
dim.full_name(),
218220
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
219221
);
220-
if let Some(granularity) = &granularity {
221-
render_references.insert(
222-
format!("{}_{}", dim.full_name(), granularity),
223-
QualifiedColumnName::new(Some(pre_aggregation_alias.clone()), alias.clone()),
224-
);
225-
}
226222
pre_aggregation_schema.add_column(SchemaColumn::new(alias, Some(dim.full_name())));
227223
}
228224
for meas in pre_aggregation.measures.iter() {
@@ -969,9 +965,7 @@ impl PhysicalPlanBuilder {
969965
));
970966
};
971967

972-
let templates = self.query_tools.plan_sql_templates(false)?;
973-
974-
let ts_date_range = if templates.supports_generated_time_series()
968+
let ts_date_range = if self.plan_sql_templates.supports_generated_time_series()
975969
&& granularity_obj.is_predefined_granularity()
976970
{
977971
if let Some(date_range) = time_dimension_symbol

rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub struct ToDateRollingWindowJoinCondition {
117117
time_series_source: String,
118118
granularity: String,
119119
time_dimension: Expr,
120-
query_tools: Rc<QueryTools>,
120+
_query_tools: Rc<QueryTools>,
121121
}
122122

123123
impl ToDateRollingWindowJoinCondition {
@@ -131,7 +131,7 @@ impl ToDateRollingWindowJoinCondition {
131131
time_series_source,
132132
granularity,
133133
time_dimension,
134-
query_tools,
134+
_query_tools: query_tools,
135135
}
136136
}
137137

@@ -146,10 +146,7 @@ impl ToDateRollingWindowJoinCondition {
146146
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?;
147147
let date_to =
148148
templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?;
149-
let grouped_from = self
150-
.query_tools
151-
.base_tools()
152-
.time_grouped_column(self.granularity.clone(), date_from)?;
149+
let grouped_from = templates.time_grouped_column(self.granularity.clone(), date_from)?;
153150
let result = format!("{date_column} >= {grouped_from} and {date_column} <= {date_to}");
154151
Ok(result)
155152
}

0 commit comments

Comments
 (0)