Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1983,7 +1983,9 @@ const variables: Record<string, (...args: any) => any> = {
cubeStoreNoHeartBeatTimeout: () => get('CUBEJS_CUBESTORE_NO_HEART_BEAT_TIMEOUT')
.default('30')
.asInt(),

cubeStoreRollingWindowJoin: () => get('CUBEJS_CUBESTORE_ROLLING_WINDOW_JOIN')
.default('false')
.asBoolStrict(),
allowUngroupedWithoutPrimaryKey: () => get('CUBEJS_ALLOW_UNGROUPED_WITHOUT_PRIMARY_KEY')
.default(get('CUBESQL_SQL_PUSH_DOWN').default('true').asString())
.asBoolStrict(),
Expand Down
6 changes: 4 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ export class BaseQuery {
if (this.useNativeSqlPlanner && !this.neverUseSqlPlannerPreaggregation()) {
const fullAggregateMeasures = this.fullKeyQueryAggregateMeasures({ hasMultipliedForPreAggregation: true });

this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0 || fullAggregateMeasures.cumulativeMeasures.length > 0;
this.canUseNativeSqlPlannerPreAggregation = fullAggregateMeasures.multiStageMembers.length > 0;
}
this.queryLevelJoinHints = this.options.joinHints ?? [];
this.prebuildJoin();
Expand Down Expand Up @@ -880,6 +880,7 @@ export class BaseQuery {
preAggregationQuery: this.options.preAggregationQuery,
totalQuery: this.options.totalQuery,
joinHints: this.options.joinHints,
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
};

const buildResult = nativeBuildSqlAndParams(queryParams);
Expand Down Expand Up @@ -929,7 +930,8 @@ export class BaseQuery {
baseTools: this,
ungrouped: this.options.ungrouped,
exportAnnotatedSql: false,
preAggregationQuery: this.options.preAggregationQuery
preAggregationQuery: this.options.preAggregationQuery,
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin')
};

const buildResult = nativeBuildSqlAndParams(queryParams);
Expand Down
17 changes: 15 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import moment from 'moment-timezone';
import { parseSqlInterval } from '@cubejs-backend/shared';
import { parseSqlInterval, getEnv } from '@cubejs-backend/shared';
import { BaseQuery } from './BaseQuery';
import { BaseFilter } from './BaseFilter';
import { BaseMeasure } from './BaseMeasure';
Expand Down Expand Up @@ -32,6 +32,13 @@ type RollingWindow = {
};

export class CubeStoreQuery extends BaseQuery {
private readonly cubeStoreRollingWindowJoin: boolean;

public constructor(compilers, options) {
super(compilers, options);
this.cubeStoreRollingWindowJoin = getEnv('cubeStoreRollingWindowJoin');
}

public newFilter(filter) {
return new CubeStoreFilter(this, filter);
}
Expand All @@ -57,10 +64,16 @@ export class CubeStoreQuery extends BaseQuery {
}

public subtractInterval(date: string, interval: string) {
if (this.cubeStoreRollingWindowJoin) {
return super.subtractInterval(date, interval);
}
return `DATE_SUB(${date}, INTERVAL ${this.formatInterval(interval)})`;
}

public addInterval(date: string, interval: string) {
if (this.cubeStoreRollingWindowJoin) {
return super.addInterval(date, interval);
}
return `DATE_ADD(${date}, INTERVAL ${this.formatInterval(interval)})`;
}

Expand Down Expand Up @@ -185,7 +198,7 @@ export class CubeStoreQuery extends BaseQuery {
cumulativeMeasures: Array<[boolean, BaseMeasure]>,
preAggregationForQuery: any
) {
if (!cumulativeMeasures.length) {
if (this.cubeStoreRollingWindowJoin || !cumulativeMeasures.length) {
return super.regularAndTimeSeriesRollupQuery(regularMeasures, multipliedMeasures, cumulativeMeasures, preAggregationForQuery);
}
const cumulativeMeasuresWithoutMultiplied = cumulativeMeasures.map(([_, measure]) => measure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe('PreAggregationsMultiStage', () => {

testMeas: {
type: 'countDistinct',
sql: \`\${createdAtDay}\`
sql: \`\${createdAtDay}\`
}
},

Expand Down Expand Up @@ -188,7 +188,8 @@ describe('PreAggregationsMultiStage', () => {
order: [{
id: 'visitors.createdAt'
}],
preAggregationsSchema: ''
preAggregationsSchema: '',
cubestoreSupportMultistage: true
});

const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
Expand Down Expand Up @@ -231,7 +232,8 @@ describe('PreAggregationsMultiStage', () => {
order: [{
id: 'visitors.source'
}],
preAggregationsSchema: ''
preAggregationsSchema: '',
cubestoreSupportMultistage: true
});

const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
Expand Down Expand Up @@ -262,7 +264,8 @@ describe('PreAggregationsMultiStage', () => {
order: [{
id: 'visitors.source'
}],
preAggregationsSchema: ''
preAggregationsSchema: '',
cubestoreSupportMultistage: true
});

const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1922,6 +1922,7 @@ describe('PreAggregations', () => {
}, {
id: 'visitors.source'
}],
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
});

const queryAndParams = query.buildSqlAndParams();
Expand Down Expand Up @@ -1999,6 +2000,7 @@ describe('PreAggregations', () => {
}, {
id: 'visitors.source'
}],
cubestoreSupportMultistage: getEnv("nativeSqlPlanner")
});

const queryAndParams = query.buildSqlAndParams();
Expand Down
4 changes: 0 additions & 4 deletions packages/cubejs-testing-drivers/fixtures/athena.json
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,6 @@
"querying custom granularities ECommerce: count by three_months_by_march + dimension",
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + no dimension",
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + dimension",
"pre-aggregations Customers: running total without time dimension",
"querying BigECommerce: totalProfitYearAgo",
"SQL API: post-aggregate percentage of total",
"SQL API: Simple Rollup",
"SQL API: Complex Rollup",
"SQL API: Nested Rollup",
Expand All @@ -201,7 +198,6 @@
"SQL API: Nested Rollup with aliases",
"SQL API: Nested Rollup over asterisk",
"SQL API: Extended nested Rollup over asterisk",
"SQL API: Timeshift measure from cube",
"SQL API: SQL push down push to cube quoted alias",
"querying BigECommerce: rolling window YTD (month + week)",
"querying BigECommerce: rolling window YTD (month + week + no gran)",
Expand Down
5 changes: 1 addition & 4 deletions packages/cubejs-testing-drivers/fixtures/postgres.json
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@
"querying ECommerce: total quantity, avg discount, total sales, total profit by product + order + total -- noisy test",
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + no dimension",
"querying custom granularities (with preaggregation) ECommerce: totalQuantity by half_year + dimension",
"pre-aggregations Customers: running total without time dimension",
"querying BigECommerce: totalProfitYearAgo",
"SQL API: post-aggregate percentage of total",

"SQL API: Simple Rollup",
"SQL API: Complex Rollup",
"SQL API: Rollup with aliases",
Expand All @@ -186,7 +184,6 @@
"SQL API: Nested Rollup with aliases",
"SQL API: Nested Rollup over asterisk",
"SQL API: Extended nested Rollup over asterisk",
"SQL API: Timeshift measure from cube",
"SQL API: SQL push down push to cube quoted alias",

"---- Different results comparing to baseQuery version. Need to investigate ----",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub struct BaseQueryOptionsStatic {
pub pre_aggregation_query: Option<bool>,
#[serde(rename = "totalQuery")]
pub total_query: Option<bool>,
#[serde(rename = "cubestoreSupportMultistage")]
pub cubestore_support_multistage: Option<bool>,
}

#[nativebridge::native_bridge(BaseQueryOptionsStatic)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ impl MatchState {

pub struct PreAggregationOptimizer {
query_tools: Rc<QueryTools>,
allow_multi_stage: bool,
used_pre_aggregations: HashMap<(String, String), Rc<PreAggregation>>,
}

impl PreAggregationOptimizer {
pub fn new(query_tools: Rc<QueryTools>) -> Self {
pub fn new(query_tools: Rc<QueryTools>, allow_multi_stage: bool) -> Self {
Self {
query_tools,
allow_multi_stage,
used_pre_aggregations: HashMap::new(),
}
}
Expand Down Expand Up @@ -96,7 +98,10 @@ impl PreAggregationOptimizer {
query: &FullKeyAggregateQuery,
pre_aggregation: &Rc<CompiledPreAggregation>,
) -> Result<Option<Rc<Query>>, CubeError> {
if !query.multistage_members.is_empty() {
if !self.allow_multi_stage && !query.multistage_members.is_empty() {
return Ok(None);
}
if self.allow_multi_stage && !query.multistage_members.is_empty() {
return self
.try_rewrite_full_key_aggregate_query_with_multi_stages(query, pre_aggregation);
}
Expand Down
12 changes: 10 additions & 2 deletions rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@ pub struct BaseQuery<IT: InnerTypes> {
context: NativeContextHolder<IT>,
query_tools: Rc<QueryTools>,
request: Rc<QueryProperties>,
cubestore_support_multistage: bool,
}

impl<IT: InnerTypes> BaseQuery<IT> {
pub fn try_new(
context: NativeContextHolder<IT>,
options: Rc<dyn BaseQueryOptions>,
) -> Result<Self, CubeError> {
let cubestore_support_multistage = options
.static_data()
.cubestore_support_multistage
.unwrap_or(false);
let query_tools = QueryTools::try_new(
options.cube_evaluator()?,
options.base_tools()?,
Expand All @@ -41,6 +46,7 @@ impl<IT: InnerTypes> BaseQuery<IT> {
context,
query_tools,
request,
cubestore_support_multistage,
})
}

Expand Down Expand Up @@ -142,8 +148,10 @@ impl<IT: InnerTypes> BaseQuery<IT> {
plan: Rc<Query>,
) -> Result<(Rc<Query>, Vec<Rc<PreAggregation>>), CubeError> {
let result = if !self.request.is_pre_aggregation_query() {
let mut pre_aggregation_optimizer =
PreAggregationOptimizer::new(self.query_tools.clone());
let mut pre_aggregation_optimizer = PreAggregationOptimizer::new(
self.query_tools.clone(),
self.cubestore_support_multistage,
);
if let Some(result) = pre_aggregation_optimizer.try_optimize(plan.clone())? {
if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 {
(
Expand Down
Loading