Skip to content

Commit f565371

Browse files
committed
fix: Limit pre-aggregations API memory usage by introducing partition loading concurrency
1 parent 9ce883d commit f565371

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

packages/cubejs-server-core/src/core/RefreshScheduler.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import R from 'ramda';
2+
import pLimit from 'p-limit';
23
import { v4 as uuidv4 } from 'uuid';
34
import crypto from 'crypto';
45
import { Required } from '@cubejs-backend/shared';
@@ -35,7 +36,8 @@ type PreAggregationsQueryingOptions = {
3536
partitions?: string[]
3637
}[],
3738
forceBuildPreAggregations?: boolean,
38-
throwErrors?: boolean
39+
throwErrors?: boolean,
40+
preAggregationLoadConcurrency?: number,
3941
};
4042

4143
type RefreshQueries = {
@@ -382,7 +384,9 @@ export class RefreshScheduler {
382384
preAggregationIds: Object.keys(preAggregationsQueryingOptions)
383385
});
384386

385-
return Promise.all(preAggregations.map(async preAggregation => {
387+
const loadConcurrency = pLimit(queryingOptions.preAggregationLoadConcurrency || 1);
388+
389+
return Promise.all(preAggregations.map(preAggregation => async () => {
386390
const { timezones } = queryingOptions;
387391
const { partitions: partitionsFilter, cacheOnly } = preAggregationsQueryingOptions[preAggregation.id] || {};
388392

@@ -459,7 +463,7 @@ export class RefreshScheduler {
459463
errors,
460464
partitionsWithDependencies
461465
};
462-
}));
466+
}).map(loadConcurrency));
463467
}
464468

465469
protected async roundRobinRefreshPreAggregationsQueryIterator(context, compilerApi: CompilerApi, queryingOptions, queriesCache: { [key: string]: Promise<PreAggregationDescription[][]> }) {

0 commit comments

Comments
 (0)