diff --git a/packages/cubejs-backend-native/js/index.ts b/packages/cubejs-backend-native/js/index.ts index 077c3de15986f..8bd390d3801a4 100644 --- a/packages/cubejs-backend-native/js/index.ts +++ b/packages/cubejs-backend-native/js/index.ts @@ -122,12 +122,14 @@ export type SQLInterfaceOptions = { export interface TransformConfig { fileName: string; + fileContent: string; transpilers: string[]; compilerId: string; metaData?: { cubeNames: string[]; cubeSymbols: Record>; contextSymbols: Record; + stage: 0 | 1 | 2 | 3; } } @@ -495,11 +497,11 @@ export const getFinalQueryResultMulti = (transformDataArr: Object[], rows: any[] return native.getFinalQueryResultMulti(transformDataArr, rows, responseData); }; -export const transpileJs = async (content: String, metadata: TransformConfig): Promise => { +export const transpileJs = async (transpileRequests: TransformConfig[]): Promise => { const native = loadNative(); if (native.transpileJs) { - return native.transpileJs(content, metadata); + return native.transpileJs(transpileRequests); } throw new Error('TranspileJs native implementation not found!'); diff --git a/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js b/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js index 98e0af7821257..eb0fee5e84523 100644 --- a/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js +++ b/packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js @@ -1,6 +1,7 @@ import crypto from 'crypto'; import vm from 'vm'; import fs from 'fs'; +import os from 'os'; import path from 'path'; import syntaxCheck from 'syntax-error'; import { parse } from '@babel/parser'; @@ -21,6 +22,21 @@ const moduleFileCache = {}; const JINJA_SYNTAX = /{%|%}|{{|}}/ig; +const getThreadsCount = () => { + const envThreads = getEnv('transpilationWorkerThreadsCount'); + if (envThreads > 0) { + return envThreads; + } + + const cpuCount = os.cpus()?.length; + if (cpuCount) { + // there's no practical boost above 5 threads even if you have more cores. + return Math.min(Math.max(1, cpuCount - 1), 5); + } + + return 3; // Default (like the workerpool do) +}; + export class DataSchemaCompiler { constructor(repository, options = {}) { this.repository = repository; @@ -100,6 +116,7 @@ export class DataSchemaCompiler { const transpilationWorkerThreads = getEnv('transpilationWorkerThreads'); const transpilationNative = getEnv('transpilationNative'); + const transpilationNativeThreadsCount = getThreadsCount(); const { compilerId } = this; if (!transpilationNative && transpilationWorkerThreads) { @@ -110,7 +127,11 @@ export class DataSchemaCompiler { ); } - const transpile = async () => { + /** + * @param stage Number + * @returns {Promise<*>} + */ + const transpile = async (stage) => { let cubeNames; let cubeSymbols; let transpilerNames; @@ -145,9 +166,28 @@ export class DataSchemaCompiler { content: ';', }; - await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId }); + await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId, stage }); + + const nonJsFilesTasks = toCompile.filter(file => !file.fileName.endsWith('.js')) + .map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId })); + + const jsFiles = toCompile.filter(file => file.fileName.endsWith('.js')); + let jsChunks; + if (jsFiles.length < transpilationNativeThreadsCount * transpilationNativeThreadsCount) { + jsChunks = [jsFiles]; + } else { + const baseSize = Math.floor(jsFiles.length / transpilationNativeThreadsCount); + jsChunks = []; + for (let i = 0; i < transpilationNativeThreadsCount; i++) { + // For the last part, we take the remaining files so we don't lose the extra ones. + const start = i * baseSize; + const end = (i === transpilationNativeThreadsCount - 1) ? jsFiles.length : start + baseSize; + jsChunks.push(jsFiles.slice(start, end)); + } + } + const JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { transpilerNames, compilerId })); - results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId }))); + results = (await Promise.all([...nonJsFilesTasks, ...JsFilesTasks])).flat(); } else if (transpilationWorkerThreads) { results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbols, transpilerNames }))); } else { @@ -157,17 +197,17 @@ export class DataSchemaCompiler { return results.filter(f => !!f); }; - const compilePhase = async (compilers) => this.compileCubeFiles(compilers, await transpile(), errorsReport); + const compilePhase = async (compilers, stage) => this.compileCubeFiles(compilers, await transpile(stage), errorsReport); - return compilePhase({ cubeCompilers: this.cubeNameCompilers }) - .then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) })) + return compilePhase({ cubeCompilers: this.cubeNameCompilers }, 0) + .then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) }, 1)) .then(() => (this.viewCompilationGate.shouldCompileViews() ? - compilePhase({ cubeCompilers: this.viewCompilers }) + compilePhase({ cubeCompilers: this.viewCompilers }, 2) : Promise.resolve())) .then(() => compilePhase({ cubeCompilers: this.cubeCompilers, contextCompilers: this.contextCompilers, - })) + }, 3)) .then(() => { if (transpilationNative) { // Clean up cache @@ -179,7 +219,7 @@ export class DataSchemaCompiler { return this.transpileJsFile( dummyFile, errorsReport, - { cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId } + { cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId, stage: 0 } ); } else if (transpilationWorkerThreads && this.workerPool) { this.workerPool.terminate(); @@ -227,11 +267,50 @@ export class DataSchemaCompiler { } } - async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId }) { + /** + * Right now it is used only for transpilation in native, + * so no checks for transpilation type inside this method + */ + async transpileJsFilesBulk(files, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) { + // for bulk processing this data may be optimized even more by passing transpilerNames, compilerId only once for a bulk + // but this requires more complex logic to be implemented in the native side. + // And comparing to the file content sizes, a few bytes of JSON data is not a big deal here + const reqDataArr = files.map(file => ({ + fileName: file.fileName, + fileContent: file.content, + transpilers: transpilerNames, + compilerId, + ...(cubeNames && { + metaData: { + cubeNames, + cubeSymbols, + contextSymbols, + stage + }, + }), + })); + const res = await transpileJs(reqDataArr); + + return files.map((file, index) => { + errorsReport.inFile(file); + if (!res[index]) { // This should not happen in theory but just to be safe + errorsReport.error(`No transpilation result received for the file ${file.fileName}.`); + return undefined; + } + errorsReport.addErrors(res[index].errors); + errorsReport.addWarnings(res[index].warnings); + errorsReport.exitFile(); + + return { ...file, content: res[index].code }; + }); + } + + async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) { try { if (getEnv('transpilationNative')) { const reqData = { fileName: file.fileName, + fileContent: file.content, transpilers: transpilerNames, compilerId, ...(cubeNames && { @@ -239,17 +318,18 @@ export class DataSchemaCompiler { cubeNames, cubeSymbols, contextSymbols, + stage }, }), }; errorsReport.inFile(file); - const res = await transpileJs(file.content, reqData); - errorsReport.addErrors(res.errors); - errorsReport.addWarnings(res.warnings); + const res = await transpileJs([reqData]); + errorsReport.addErrors(res[0].errors); + errorsReport.addWarnings(res[0].warnings); errorsReport.exitFile(); - return { ...file, content: res.code }; + return { ...file, content: res[0].code }; } else if (getEnv('transpilationWorkerThreads')) { const data = { fileName: file.fileName,