Skip to content

Commit 58dae75

Browse files
paveltiunovsrh
authored andcommitted
chore(cubestore): Upgrade DF: backport rolling window implementation and allow multiple ClusterSend nodes within plan to support multi-stage aggregations
1 parent 0a77217 commit 58dae75

File tree

13 files changed

+3895
-460
lines changed

13 files changed

+3895
-460
lines changed

packages/cubejs-backend-shared/src/env.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1848,6 +1848,9 @@ const variables: Record<string, (...args: any) => any> = {
18481848
cubeStoreNoHeartBeatTimeout: () => get('CUBEJS_CUBESTORE_NO_HEART_BEAT_TIMEOUT')
18491849
.default('30')
18501850
.asInt(),
1851+
cubeStoreRollingWindowJoin: () => get('CUBEJS_CUBESTORE_ROLLING_WINDOW_JOIN')
1852+
.default('false')
1853+
.asBoolStrict(),
18511854

18521855
allowUngroupedWithoutPrimaryKey: () => get('CUBEJS_ALLOW_UNGROUPED_WITHOUT_PRIMARY_KEY')
18531856
.default(get('CUBESQL_SQL_PUSH_DOWN').default('true').asString())

packages/cubejs-schema-compiler/src/adapter/CubeStoreQuery.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import moment from 'moment-timezone';
2-
import { parseSqlInterval } from '@cubejs-backend/shared';
2+
import { parseSqlInterval, getEnv } from '@cubejs-backend/shared';
33
import { BaseQuery } from './BaseQuery';
44
import { BaseFilter } from './BaseFilter';
55
import { BaseMeasure } from './BaseMeasure';
@@ -30,6 +30,13 @@ type RollingWindow = {
3030
};
3131

3232
export class CubeStoreQuery extends BaseQuery {
33+
private readonly cubeStoreRollingWindowJoin: boolean;
34+
35+
public constructor(compilers, options) {
36+
super(compilers, options);
37+
this.cubeStoreRollingWindowJoin = getEnv('cubeStoreRollingWindowJoin');
38+
}
39+
3340
public newFilter(filter) {
3441
return new CubeStoreFilter(this, filter);
3542
}
@@ -55,10 +62,16 @@ export class CubeStoreQuery extends BaseQuery {
5562
}
5663

5764
public subtractInterval(date: string, interval: string) {
65+
if (this.cubeStoreRollingWindowJoin) {
66+
return super.subtractInterval(date, interval);
67+
}
5868
return `DATE_SUB(${date}, INTERVAL ${this.formatInterval(interval)})`;
5969
}
6070

6171
public addInterval(date: string, interval: string) {
72+
if (this.cubeStoreRollingWindowJoin) {
73+
return super.addInterval(date, interval);
74+
}
6275
return `DATE_ADD(${date}, INTERVAL ${this.formatInterval(interval)})`;
6376
}
6477

@@ -179,7 +192,7 @@ export class CubeStoreQuery extends BaseQuery {
179192
cumulativeMeasures: Array<[boolean, BaseMeasure]>,
180193
preAggregationForQuery: any
181194
) {
182-
if (!cumulativeMeasures.length) {
195+
if (this.cubeStoreRollingWindowJoin || !cumulativeMeasures.length) {
183196
return super.regularAndTimeSeriesRollupQuery(regularMeasures, multipliedMeasures, cumulativeMeasures, preAggregationForQuery);
184197
}
185198
const cumulativeMeasuresWithoutMultiplied = cumulativeMeasures.map(([_, measure]) => measure);

rust/cubestore/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)