Skip to content

Commit 5f72d8f

Browse files
committed
fix: Reduce memory footprint for pre-aggregations with many partitions by caching partition SQL
1 parent 339de65 commit 5f72d8f

File tree

4 files changed

+22
-6
lines changed

4 files changed

+22
-6
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,7 +1398,7 @@ export class PreAggregationLoader {
13981398
}
13991399
}
14001400

1401-
interface PreAggsPartiotionRangeLoaderOpts {
1401+
interface PreAggsPartitionRangeLoaderOpts {
14021402
maxPartitions: number;
14031403
maxSourceRowLimit: number;
14041404
waitForRenew?: boolean;
@@ -1409,6 +1409,7 @@ interface PreAggsPartiotionRangeLoaderOpts {
14091409
orphanedTimeout?: number;
14101410
lambdaQuery?: LambdaQuery;
14111411
isJob?: boolean;
1412+
compilerCacheFn?: <T>(subKey: string[], cacheFn: () => T) => T;
14121413
}
14131414

14141415
export class PreAggregationPartitionRangeLoader {
@@ -1427,6 +1428,8 @@ export class PreAggregationPartitionRangeLoader {
14271428

14281429
protected dataSource: string;
14291430

1431+
protected compilerCacheFn: <T>(subKey: string[], cacheFn: () => T) => T;
1432+
14301433
public constructor(
14311434
private readonly redisPrefix: string,
14321435
private readonly driverFactory: DriverFactory,
@@ -1437,7 +1440,7 @@ export class PreAggregationPartitionRangeLoader {
14371440
private readonly preAggregation: PreAggregationDescription,
14381441
private readonly preAggregationsTablesToTempTables: any,
14391442
private readonly loadCache: any,
1440-
private readonly options: PreAggsPartiotionRangeLoaderOpts = {
1443+
private readonly options: PreAggsPartitionRangeLoaderOpts = {
14411444
maxPartitions: 10000,
14421445
maxSourceRowLimit: 10000,
14431446
},
@@ -1447,6 +1450,7 @@ export class PreAggregationPartitionRangeLoader {
14471450
this.requestId = options.requestId;
14481451
this.lambdaQuery = options.lambdaQuery;
14491452
this.dataSource = preAggregation.dataSource;
1453+
this.compilerCacheFn = options.compilerCacheFn || ((subKey, cacheFn) => cacheFn());
14501454
}
14511455

14521456
private async loadRangeQuery(rangeQuery: QueryTuple, partitionRange?: QueryDateRange) {
@@ -1681,7 +1685,7 @@ export class PreAggregationPartitionRangeLoader {
16811685
public async partitionPreAggregations(): Promise<PreAggregationDescription[]> {
16821686
if (this.preAggregation.partitionGranularity && !this.preAggregation.expandedPartition) {
16831687
const { buildRange, partitionRanges } = await this.partitionRanges();
1684-
return partitionRanges.map(range => this.partitionPreAggregationDescription(range, buildRange));
1688+
return this.compilerCacheFn(['partitions', JSON.stringify(buildRange)], () => partitionRanges.map(range => this.partitionPreAggregationDescription(range, buildRange)));
16851689
} else {
16861690
return [this.preAggregation];
16871691
}
@@ -1701,10 +1705,10 @@ export class PreAggregationPartitionRangeLoader {
17011705
// use last partition so outer query can receive expected table structure.
17021706
dateRange = [buildRange[1], buildRange[1]];
17031707
}
1704-
const partitionRanges = PreAggregationPartitionRangeLoader.timeSeries(
1708+
const partitionRanges = this.compilerCacheFn(['timeSeries', this.preAggregation.partitionGranularity, JSON.stringify(dateRange)], () => PreAggregationPartitionRangeLoader.timeSeries(
17051709
this.preAggregation.partitionGranularity,
17061710
dateRange,
1707-
);
1711+
));
17081712
if (partitionRanges.length > this.options.maxPartitions) {
17091713
throw new Error(
17101714
`The maximum number of partitions (${
@@ -2145,6 +2149,7 @@ export class PreAggregations {
21452149
waitForRenew: queryBody.renewQuery,
21462150
requestId: queryBody.requestId,
21472151
externalRefresh: this.externalRefresh,
2152+
compilerCacheFn: queryBody.compilerCacheFn,
21482153
},
21492154
);
21502155

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export type Query = {
3636
groupedPartitionPreAggregations?: PreAggregationDescription[][];
3737
preAggregationsLoadCacheByDataSource?: any;
3838
renewQuery?: boolean;
39+
compilerCacheFn?: <T>(subKey: string[], cacheFn: () => T) => T;
3940
};
4041

4142
export type QueryBody = {

packages/cubejs-server-core/src/core/CompilerApi.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ export class CompilerApi {
143143
}
144144
}
145145

146+
async compilerCacheFn(requestId, key, path) {
147+
const compilers = await this.getCompilers({ requestId });
148+
if (this.sqlCache) {
149+
return (subKey, cacheFn) => compilers.compilerCache.getQueryCache(key).cache(path.concat(subKey), cacheFn);
150+
} else {
151+
return (cacheFn) => cacheFn();
152+
}
153+
}
154+
146155
async preAggregations(filter) {
147156
const { cubeEvaluator } = await this.getCompilers();
148157
return cubeEvaluator.preAggregations(filter);

packages/cubejs-server-core/src/core/RefreshScheduler.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ export class RefreshScheduler {
158158
const queryBody = {
159159
preAggregations: preAggregationDescriptionList,
160160
preAggregationsLoadCacheByDataSource,
161-
requestId: context.requestId
161+
requestId: context.requestId,
162+
compilerCacheFn: await compilerApi.compilerCacheFn(context.requestId, baseQuery, ['expandPartitions']),
162163
};
163164

164165
if (queryingOptions.cacheOnly) {

0 commit comments

Comments
 (0)