Skip to content

Commit b5881cd

Browse files
committed
return back previous transpilationWorkerThreads flow
1 parent 49f49a5 commit b5881cd

File tree

7 files changed

+60
-12
lines changed

7 files changed

+60
-12
lines changed

packages/cubejs-backend-native/src/transpilers.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ pub fn transpile_js(mut cx: FunctionContext) -> JsResult<JsPromise> {
3939
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_js_object);
4040
let transform_request_config = TransformRequestConfig::deserialize(deserializer);
4141

42-
println!("\ntransform_config {:?}\n", transform_config);
43-
4442
let promise = cx
4543
.task(move || {
4644
let transform_config: TransformConfig = match transform_request_config {

packages/cubejs-backend-shared/src/env.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ const variables: Record<string, (...args: any) => any> = {
229229
transpilationWorkerThreadsCount: () => get('CUBEJS_TRANSPILATION_WORKER_THREADS_COUNT')
230230
.default('0')
231231
.asInt(),
232+
// This one takes precedence over CUBEJS_TRANSPILATION_WORKER_THREADS
233+
transpilationNative: () => get('CUBEJS_TRANSPILATION_NATIVE')
234+
.default('false')
235+
.asBoolStrict(),
232236

233237
/** ****************************************************************
234238
* Common db options *

packages/cubejs-schema-compiler/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
"node-dijkstra": "^2.5.0",
5555
"ramda": "^0.27.2",
5656
"syntax-error": "^1.3.0",
57-
"uuid": "^8.3.2"
57+
"uuid": "^8.3.2",
58+
"workerpool": "^9.2.0"
5859
},
5960
"devDependencies": {
6061
"@clickhouse/client": "^1.7.0",

packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { parse } from '@babel/parser';
66
import babelGenerator from '@babel/generator';
77
import babelTraverse from '@babel/traverse';
88
import R from 'ramda';
9+
import workerpool from 'workerpool';
910

1011
import { getEnv, isNativeSupported } from '@cubejs-backend/shared';
1112
import { transpileJs } from '@cubejs-backend/native';
@@ -94,16 +95,32 @@ export class DataSchemaCompiler {
9495
const errorsReport = new ErrorReporter(null, [], this.errorReport);
9596
this.errorsReport = errorsReport;
9697

98+
const transpilationWorkerThreads = getEnv('transpilationWorkerThreads');
99+
const transpilationNative = getEnv('transpilationNative');
100+
101+
if (!transpilationNative && transpilationWorkerThreads) {
102+
const wc = getEnv('transpilationWorkerThreadsCount');
103+
this.workerPool = workerpool.pool(
104+
path.join(__dirname, 'transpilers/transpiler_worker'),
105+
wc > 0 ? { maxWorkers: wc } : undefined,
106+
);
107+
}
108+
97109
const transpile = async () => {
98110
let cubeNames;
99111
let cubeSymbols;
100112
let transpilerNames;
113+
let results;
101114

102-
if (getEnv('transpilationWorkerThreads')) {
115+
if (transpilationNative || transpilationWorkerThreads) {
103116
cubeNames = Object.keys(this.cubeDictionary.byId);
104117
// We need only cubes and all its member names for transpiling.
105118
// Cubes doesn't change during transpiling, but are changed during compilation phase,
106119
// so we can prepare them once for every phase.
120+
// Communication between main and worker threads uses
121+
// The structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
122+
// which doesn't allow passing any function objects, so we need to sanitize the symbols.
123+
// Communication with native backend also involves deserialization.
107124
cubeSymbols = Object.fromEntries(
108125
Object.entries(this.cubeSymbols.symbols)
109126
.map(
@@ -112,7 +129,9 @@ export class DataSchemaCompiler {
112129
)],
113130
),
114131
);
132+
}
115133

134+
if (transpilationNative) {
116135
// Transpilers are the same for all files within phase.
117136
transpilerNames = this.transpilers.map(t => t.constructor.name);
118137

@@ -123,9 +142,14 @@ export class DataSchemaCompiler {
123142
};
124143

125144
await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS });
145+
146+
results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { transpilerNames })));
147+
} else if (transpilationWorkerThreads) {
148+
results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbols })));
149+
} else {
150+
results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, {})));
126151
}
127152

128-
const results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { transpilerNames })));
129153
return results.filter(f => !!f);
130154
};
131155

@@ -141,14 +165,16 @@ export class DataSchemaCompiler {
141165
contextCompilers: this.contextCompilers,
142166
}))
143167
.then(() => {
144-
if (getEnv('transpilationWorkerThreads')) {
168+
if (transpilationNative) {
145169
// Clean up cache
146170
const dummyFile = {
147171
fileName: 'terminate.js',
148172
content: ';',
149173
};
150174

151175
return this.transpileJsFile(dummyFile, errorsReport, { cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {} });
176+
} else if (transpilationWorkerThreads && this.workerPool) {
177+
this.workerPool.terminate();
152178
}
153179

154180
return Promise.resolve();
@@ -195,7 +221,7 @@ export class DataSchemaCompiler {
195221

196222
async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames }) {
197223
try {
198-
if (getEnv('transpilationWorkerThreads')) {
224+
if (getEnv('transpilationNative')) {
199225
const reqData = {
200226
fileName: file.fileName,
201227
transpilers: transpilerNames,
@@ -215,6 +241,20 @@ export class DataSchemaCompiler {
215241
errorsReport.exitFile();
216242

217243
return Object.assign({}, file, { content: res.code });
244+
} else if (getEnv('transpilationWorkerThreads')) {
245+
const data = {
246+
fileName: file.fileName,
247+
content: file.content,
248+
transpilers: this.transpilers.map(t => t.constructor.name),
249+
cubeNames,
250+
cubeSymbols,
251+
};
252+
253+
const res = await this.workerPool.exec('transpile', [data]);
254+
errorsReport.addErrors(res.errors);
255+
errorsReport.addWarnings(res.warnings);
256+
257+
return Object.assign({}, file, { content: res.content });
218258
} else {
219259
const ast = parse(
220260
file.content,

packages/cubejs-schema-compiler/src/compiler/ErrorReporter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export class ErrorReporter {
108108
}
109109

110110
public error(e: any, fileName?: any, lineNumber?: any, position?: any) {
111-
const message = `${this.context.length ? `${this.context.join(' -> ')}: ` : ''}${e instanceof UserError ? e.message : (e.stack || e)}`;
111+
const message = `${this.context.length ? `${this.context.join(' -> ')}: ` : ''}${e.message ? e.message : (e.stack || e)}`;
112112
if (this.rootReporter().errors.find(m => (m.message || m) === message)) {
113113
return;
114114
}

packages/cubejs-schema-compiler/src/compiler/transpilers/transpiler_worker.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type TransferContent = {
1616
content: string;
1717
transpilers: string[];
1818
cubeNames: string[];
19-
cubeSymbolsNames: Record<string, Record<string, boolean>>;
19+
cubeSymbols: Record<string, Record<string, boolean>>;
2020
};
2121

2222
const cubeDictionary = new LightweightNodeCubeDictionary();
@@ -32,7 +32,7 @@ const transpilers = {
3232

3333
const transpile = (data: TransferContent) => {
3434
cubeDictionary.setCubeNames(data.cubeNames);
35-
cubeSymbols.setSymbols(data.cubeSymbolsNames);
35+
cubeSymbols.setSymbols(data.cubeSymbols);
3636

3737
const ast = parse(
3838
data.content,
@@ -43,15 +43,15 @@ const transpile = (data: TransferContent) => {
4343
},
4444
);
4545

46+
errorsReport.inFile(data);
4647
data.transpilers.forEach(transpilerName => {
4748
if (transpilers[transpilerName]) {
48-
errorsReport.inFile(data);
4949
babelTraverse(ast, transpilers[transpilerName].traverseObject(errorsReport));
50-
errorsReport.exitFile();
5150
} else {
5251
throw new Error(`Transpiler ${transpilerName} not supported`);
5352
}
5453
});
54+
errorsReport.exitFile();
5555

5656
const content = babelGenerator(ast, {}, data.content).code;
5757

yarn.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29554,6 +29554,11 @@ wordwrap@^1.0.0:
2955429554
resolved "https://registry.yarnpkg.com/wordwrap/-/wordwrap-1.0.0.tgz#27584810891456a4171c8d0226441ade90cbcaeb"
2955529555
integrity sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus=
2955629556

29557+
workerpool@^9.2.0:
29558+
version "9.2.0"
29559+
resolved "https://registry.yarnpkg.com/workerpool/-/workerpool-9.2.0.tgz#f74427cbb61234708332ed8ab9cbf56dcb1c4371"
29560+
integrity sha512-PKZqBOCo6CYkVOwAxWxQaSF2Fvb5Iv2fCeTP7buyWI2GiynWr46NcXSgK/idoV6e60dgCBfgYc+Un3HMvmqP8w==
29561+
2955729562
"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0":
2955829563
version "7.0.0"
2955929564
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43"

0 commit comments

Comments
 (0)