Skip to content

Commit e04afad

Browse files
committed
correct queueOptionsWrapper flow
1 parent 6740e00 commit e04afad

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

packages/cubejs-server-core/src/core/OptsHandler.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ export class OptsHandler {
287287
private queueOptionsWrapper(
288288
context: RequestContext,
289289
queueOptions: unknown | ((dataSource?: string) => QueueOptions),
290+
queueType: 'query' | 'pre-aggs',
290291
): (dataSource?: string) => Promise<QueueOptions> {
291292
return async (dataSource = 'default') => {
292293
const options = (
@@ -298,6 +299,14 @@ export class OptsHandler {
298299
// concurrency specified in cube.js
299300
return options;
300301
} else {
302+
const workerConcurrency = getEnv('refreshWorkerConcurrency');
303+
if (queueType === 'pre-aggs' && workerConcurrency) {
304+
return {
305+
...options,
306+
concurrency: workerConcurrency,
307+
};
308+
}
309+
301310
const envConcurrency: number = getEnv('concurrency', { dataSource });
302311
if (envConcurrency) {
303312
// concurrency specified in CUBEJS_CONCURRENCY
@@ -320,7 +329,7 @@ export class OptsHandler {
320329
// no specified concurrency
321330
return {
322331
...options,
323-
concurrency: 2,
332+
concurrency: 5,
324333
};
325334
}
326335
}
@@ -458,9 +467,7 @@ export class OptsHandler {
458467
basePath: '/cubejs-api',
459468
dashboardAppPath: 'dashboard-app',
460469
dashboardAppPort: 3000,
461-
scheduledRefreshConcurrency: getEnv('refreshWorkerMode')
462-
? getEnv('refreshWorkerConcurrency')
463-
: getEnv('scheduledRefreshQueriesPerAppId'),
470+
scheduledRefreshConcurrency: getEnv('scheduledRefreshQueriesPerAppId'),
464471
scheduledRefreshBatchSize: getEnv('scheduledRefreshBatchSize'),
465472
preAggregationsSchema:
466473
getEnv('preAggregationsSchema') ||
@@ -661,13 +668,15 @@ export class OptsHandler {
661668
clone.queryCacheOptions.queueOptions = this.queueOptionsWrapper(
662669
context,
663670
clone.queryCacheOptions.queueOptions,
671+
'query'
664672
);
665673

666674
// pre-aggs queue options
667675
clone.preAggregationsOptions = clone.preAggregationsOptions || {};
668676
clone.preAggregationsOptions.queueOptions = this.queueOptionsWrapper(
669677
context,
670678
clone.preAggregationsOptions.queueOptions,
679+
'pre-aggs'
671680
);
672681

673682
// pre-aggs external refresh flag (force to run pre-aggs build flow first if

0 commit comments

Comments
 (0)