Commit c274b2a

feat(schema-compiler): Implement bulk processing for js transpilation in native (#9427)
* add stage for transpile native
* move file content into transform request struct
* change native transpile() interface to expect arrays
* implement bulk transpileJsFiles in native
* small update in getThreadsCount
* quickfix
1 parent 74ab946 commit c274b2a

2 files changed: +98, -16 lines
packages/cubejs-backend-native/js/index.ts

Lines changed: 4 additions & 2 deletions
@@ -122,12 +122,14 @@ export type SQLInterfaceOptions = {
 
 export interface TransformConfig {
   fileName: string;
+  fileContent: string;
   transpilers: string[];
   compilerId: string;
   metaData?: {
     cubeNames: string[];
     cubeSymbols: Record<string, Record<string, boolean>>;
     contextSymbols: Record<string, string>;
+    stage: 0 | 1 | 2 | 3;
   }
 }
 
@@ -495,11 +497,11 @@ export const getFinalQueryResultMulti = (transformDataArr: Object[], rows: any[]
   return native.getFinalQueryResultMulti(transformDataArr, rows, responseData);
 };
 
-export const transpileJs = async (content: String, metadata: TransformConfig): Promise<TransformResponse> => {
+export const transpileJs = async (transpileRequests: TransformConfig[]): Promise<TransformResponse[]> => {
   const native = loadNative();
 
   if (native.transpileJs) {
-    return native.transpileJs(content, metadata);
+    return native.transpileJs(transpileRequests);
   }
 
   throw new Error('TranspileJs native implementation not found!');
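
With this change, callers build one TransformConfig per file and pass them as a single array; responses come back as an array in the same order. A minimal sketch of the new call shape, assuming the package is consumed as @cubejs-backend/native (the file names, contents, and transpiler name below are made-up examples, not taken from this commit):

import { transpileJs, TransformConfig } from '@cubejs-backend/native';

async function transpileAll() {
  // Hypothetical inputs: two schema files sent to the native side in a single call.
  const requests: TransformConfig[] = [
    { fileName: 'orders.js', fileContent: 'cube(`orders`, {});', transpilers: ['CubePropContextTranspiler'], compilerId: 'compiler-1' },
    { fileName: 'users.js', fileContent: 'cube(`users`, {});', transpilers: ['CubePropContextTranspiler'], compilerId: 'compiler-1' },
  ];

  // One request array in, one response array out, in request order.
  const responses = await transpileJs(requests);
  responses.forEach((res, i) => console.log(requests[i].fileName, res.code));
}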

packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js

Lines changed: 94 additions & 14 deletions
@@ -1,6 +1,7 @@
 import crypto from 'crypto';
 import vm from 'vm';
 import fs from 'fs';
+import os from 'os';
 import path from 'path';
 import syntaxCheck from 'syntax-error';
 import { parse } from '@babel/parser';
@@ -21,6 +22,21 @@ const moduleFileCache = {};
 
 const JINJA_SYNTAX = /{%|%}|{{|}}/ig;
 
+const getThreadsCount = () => {
+  const envThreads = getEnv('transpilationWorkerThreadsCount');
+  if (envThreads > 0) {
+    return envThreads;
+  }
+
+  const cpuCount = os.cpus()?.length;
+  if (cpuCount) {
+    // there's no practical boost above 5 threads even if you have more cores.
+    return Math.min(Math.max(1, cpuCount - 1), 5);
+  }
+
+  return 3; // Default (like the workerpool does)
+};
+
 export class DataSchemaCompiler {
   constructor(repository, options = {}) {
     this.repository = repository;
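
For reference, this heuristic caps at 5 threads no matter how many cores are present, and an explicit env override always wins. A standalone restatement for illustration (pickThreads is a hypothetical name; cpuCount stands in for os.cpus().length and envThreads for the transpilationWorkerThreadsCount override):

// Hypothetical re-statement of getThreadsCount, for illustration only.
const pickThreads = (cpuCount: number, envThreads = 0): number =>
  (envThreads > 0 ? envThreads : (cpuCount ? Math.min(Math.max(1, cpuCount - 1), 5) : 3));

console.log(pickThreads(2));    // 1
console.log(pickThreads(4));    // 3
console.log(pickThreads(16));   // 5 (capped; no practical gain above 5 threads)
console.log(pickThreads(8, 2)); // 2 (explicit override wins)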
@@ -100,6 +116,7 @@ export class DataSchemaCompiler {
 
     const transpilationWorkerThreads = getEnv('transpilationWorkerThreads');
     const transpilationNative = getEnv('transpilationNative');
+    const transpilationNativeThreadsCount = getThreadsCount();
     const { compilerId } = this;
 
     if (!transpilationNative && transpilationWorkerThreads) {
@@ -110,7 +127,11 @@
       );
     }
 
-    const transpile = async () => {
+    /**
+     * @param stage Number
+     * @returns {Promise<*>}
+     */
+    const transpile = async (stage) => {
       let cubeNames;
       let cubeSymbols;
       let transpilerNames;
@@ -145,9 +166,28 @@
         content: ';',
       };
 
-      await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId });
+      await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId, stage });
+
+      const nonJsFilesTasks = toCompile.filter(file => !file.fileName.endsWith('.js'))
+        .map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId }));
+
+      const jsFiles = toCompile.filter(file => file.fileName.endsWith('.js'));
+      let jsChunks;
+      if (jsFiles.length < transpilationNativeThreadsCount * transpilationNativeThreadsCount) {
+        jsChunks = [jsFiles];
+      } else {
+        const baseSize = Math.floor(jsFiles.length / transpilationNativeThreadsCount);
+        jsChunks = [];
+        for (let i = 0; i < transpilationNativeThreadsCount; i++) {
+          // For the last part, we take the remaining files so we don't lose the extra ones.
+          const start = i * baseSize;
+          const end = (i === transpilationNativeThreadsCount - 1) ? jsFiles.length : start + baseSize;
+          jsChunks.push(jsFiles.slice(start, end));
+        }
+      }
+      const JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { transpilerNames, compilerId }));
 
-      results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId })));
+      results = (await Promise.all([...nonJsFilesTasks, ...JsFilesTasks])).flat();
     } else if (transpilationWorkerThreads) {
       results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbols, transpilerNames })));
     } else {
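
The chunking rule above: when there are fewer than threads² JS files the whole list goes into a single chunk (one bulk call), otherwise the list is cut into `threads` chunks of equal base size with the remainder folded into the last chunk. The same rule as a standalone helper, for illustration only (chunkForThreads is a hypothetical name, not part of the commit):

// Split `items` into at most `threads` chunks: equal base size, remainder folded into the last chunk.
function chunkForThreads<T>(items: T[], threads: number): T[][] {
  if (items.length < threads * threads) {
    return [items];
  }
  const baseSize = Math.floor(items.length / threads);
  const chunks: T[][] = [];
  for (let i = 0; i < threads; i++) {
    const start = i * baseSize;
    const end = i === threads - 1 ? items.length : start + baseSize;
    chunks.push(items.slice(start, end));
  }
  return chunks;
}

// E.g. 23 files across 4 threads -> chunk sizes [5, 5, 5, 8].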
@@ -157,17 +197,17 @@
       return results.filter(f => !!f);
     };
 
-    const compilePhase = async (compilers) => this.compileCubeFiles(compilers, await transpile(), errorsReport);
+    const compilePhase = async (compilers, stage) => this.compileCubeFiles(compilers, await transpile(stage), errorsReport);
 
-    return compilePhase({ cubeCompilers: this.cubeNameCompilers })
-      .then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) }))
+    return compilePhase({ cubeCompilers: this.cubeNameCompilers }, 0)
+      .then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) }, 1))
       .then(() => (this.viewCompilationGate.shouldCompileViews() ?
-        compilePhase({ cubeCompilers: this.viewCompilers })
+        compilePhase({ cubeCompilers: this.viewCompilers }, 2)
         : Promise.resolve()))
       .then(() => compilePhase({
         cubeCompilers: this.cubeCompilers,
         contextCompilers: this.contextCompilers,
-      }))
+      }, 3))
       .then(() => {
         if (transpilationNative) {
           // Clean up cache
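
The numeric second argument threaded through compilePhase is the transpilation stage, which ends up in TransformConfig.metaData.stage. Reading the chain above, the four literals line up with the four compile phases; a hypothetical enum just to name them (the code itself passes the raw numbers 0-3):

// Hypothetical names for the stage literals used above; not part of the commit.
enum TranspilationStage {
  CubeNames = 0,        // cubeNameCompilers
  PreTranspile = 1,     // preTranspileCubeCompilers + viewCompilationGate
  Views = 2,            // viewCompilers (only when shouldCompileViews() is true)
  CubesAndContexts = 3, // cubeCompilers + contextCompilers
}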
@@ -179,7 +219,7 @@
         return this.transpileJsFile(
           dummyFile,
           errorsReport,
-          { cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId }
+          { cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId, stage: 0 }
         );
       } else if (transpilationWorkerThreads && this.workerPool) {
         this.workerPool.terminate();
@@ -227,29 +267,69 @@
     }
   }
 
-  async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId }) {
+  /**
+   * Right now it is used only for transpilation in native,
+   * so no checks for transpilation type inside this method
+   */
+  async transpileJsFilesBulk(files, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
+    // for bulk processing this data may be optimized even more by passing transpilerNames, compilerId only once for a bulk
+    // but this requires more complex logic to be implemented in the native side.
+    // And comparing to the file content sizes, a few bytes of JSON data is not a big deal here
+    const reqDataArr = files.map(file => ({
+      fileName: file.fileName,
+      fileContent: file.content,
+      transpilers: transpilerNames,
+      compilerId,
+      ...(cubeNames && {
+        metaData: {
+          cubeNames,
+          cubeSymbols,
+          contextSymbols,
+          stage
+        },
+      }),
+    }));
+    const res = await transpileJs(reqDataArr);
+
+    return files.map((file, index) => {
+      errorsReport.inFile(file);
+      if (!res[index]) { // This should not happen in theory but just to be safe
+        errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
+        return undefined;
+      }
+      errorsReport.addErrors(res[index].errors);
+      errorsReport.addWarnings(res[index].warnings);
+      errorsReport.exitFile();
+
+      return { ...file, content: res[index].code };
+    });
+  }
+
+  async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
     try {
       if (getEnv('transpilationNative')) {
         const reqData = {
           fileName: file.fileName,
+          fileContent: file.content,
           transpilers: transpilerNames,
           compilerId,
           ...(cubeNames && {
             metaData: {
               cubeNames,
               cubeSymbols,
               contextSymbols,
+              stage
             },
           }),
         };
 
         errorsReport.inFile(file);
-        const res = await transpileJs(file.content, reqData);
-        errorsReport.addErrors(res.errors);
-        errorsReport.addWarnings(res.warnings);
+        const res = await transpileJs([reqData]);
+        errorsReport.addErrors(res[0].errors);
+        errorsReport.addWarnings(res[0].warnings);
         errorsReport.exitFile();
 
-        return { ...file, content: res.code };
+        return { ...file, content: res[0].code };
       } else if (getEnv('transpilationWorkerThreads')) {
         const data = {
           fileName: file.fileName,
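
transpileJsFilesBulk relies on the native side returning exactly one response per request, in request order; a missing entry is reported and yields undefined, which the caller later drops via results.filter(f => !!f). A minimal standalone sketch of that positional mapping (the type and function names here are illustrative, not from the codebase):

// Sketch: pair each input file with its positional response; missing results become undefined.
type FileLike = { fileName: string; content: string };
type TransformResponseLike = { code: string; errors: string[]; warnings: string[] };

function applyBulkResults(files: FileLike[], responses: TransformResponseLike[]): (FileLike | undefined)[] {
  return files.map((file, index) => {
    const res = responses[index];
    if (!res) {
      // Mirrors the "should not happen" guard above: no result for this file.
      return undefined;
    }
    return { ...file, content: res.code };
  });
}

// Example: the second response is missing, so the second entry becomes undefined.
console.log(applyBulkResults(
  [{ fileName: 'a.js', content: 'a' }, { fileName: 'b.js', content: 'b' }],
  [{ code: 'A', errors: [], warnings: [] }]
));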
