Skip to content

Commit 9223f85

Browse files
committed
use workerpool pkg
1 parent e2c6177 commit 9223f85

File tree

4 files changed

+76
-64
lines changed

4 files changed

+76
-64
lines changed

packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { parse } from '@babel/parser';
77
import babelGenerator from '@babel/generator';
88
import babelTraverse from '@babel/traverse';
99
import R from 'ramda';
10+
import workerpool from 'workerpool';
1011

1112
import { getEnv, isNativeSupported } from '@cubejs-backend/shared';
1213
import { UserError } from './UserError';
@@ -43,6 +44,7 @@ export class DataSchemaCompiler {
4344
this.yamlCompiler = options.yamlCompiler;
4445
this.yamlCompiler.dataSchemaCompiler = this;
4546
this.pythonContext = null;
47+
this.workerPool = null;
4648
}
4749

4850
compileObjects(compileServices, objects, errorsReport) {
@@ -92,13 +94,19 @@ export class DataSchemaCompiler {
9294
const errorsReport = new ErrorReporter(null, [], this.errorReport);
9395
this.errorsReport = errorsReport;
9496

97+
if (getEnv('workerThreadsTranspilation')) {
98+
this.workerPool = workerpool.pool(path.join(__dirname, 'transpilers/transpiler_worker'));
99+
}
100+
95101
const transpile = async () => {
96102
let cubeNames;
97103
let cubeSymbolsNames;
98104

99105
if (getEnv('workerThreadsTranspilation')) {
100106
cubeNames = Object.keys(this.cubeDictionary.byId);
101107
// We need only cubes and all its member names for transpiling.
108+
// Cubes doesn't change during transpiling, but are changed during compilation phase,
109+
// so we can prepare them once for every phase.
102110
// Communication between main and worker threads uses
103111
// The structured clone algorithm (@see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
104112
// which doesn't allow passing any function objects, so we need to sanitize the symbols.
@@ -133,7 +141,12 @@ export class DataSchemaCompiler {
133141
.then(() => compilePhase({
134142
cubeCompilers: this.cubeCompilers,
135143
contextCompilers: this.contextCompilers,
136-
}));
144+
}))
145+
.then(() => {
146+
if (this.workerPool) {
147+
this.workerPool.terminate();
148+
}
149+
});
137150
}
138151

139152
compile() {
@@ -174,34 +187,18 @@ export class DataSchemaCompiler {
174187
}
175188
}
176189

177-
async transpileJsFile(file, errorsReport, options) {
190+
async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbolsNames }) {
178191
try {
179192
if (getEnv('workerThreadsTranspilation')) {
180-
const res = await new Promise((resolve, reject) => {
181-
const { cubeNames, cubeSymbolsNames } = options;
182-
183-
const worker = new Worker(
184-
path.join(__dirname, 'transpilers/transpiler_worker'),
185-
{ workerData: { cubeNames, cubeSymbolsNames } },
186-
);
187-
188-
const data = {
189-
fileName: file.fileName,
190-
content: file.content,
191-
transpilers: this.transpilers.map(t => t.constructor.name),
192-
};
193-
194-
worker.postMessage(data);
195-
196-
worker.on('message', resolve);
197-
worker.on('error', reject);
198-
worker.on('exit', (code) => {
199-
if (code !== 0) {
200-
reject(new Error(`Worker thread exited with code ${code}`));
201-
}
202-
});
203-
});
204-
193+
const data = {
194+
fileName: file.fileName,
195+
content: file.content,
196+
transpilers: this.transpilers.map(t => t.constructor.name),
197+
cubeNames,
198+
cubeSymbolsNames,
199+
};
200+
201+
const res = await this.workerPool.exec('transpile', [data]);
205202
errorsReport.addErrors(res.errors);
206203
errorsReport.addWarnings(res.warnings);
207204

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import { TranspilerCubeResolver } from './transpiler.interface';
22

33
export class LightweightNodeCubeDictionary implements TranspilerCubeResolver {
4-
public constructor(private readonly cubeNames: string[] = []) {
4+
public constructor(private cubeNames: string[] = []) {
55
}
66

77
public resolveCube(name: string): boolean {
88
return this.cubeNames.includes(name);
99
}
10+
11+
public setCubeNames(cubeNames: string[]): void {
12+
this.cubeNames = cubeNames;
13+
}
1014
}

packages/cubejs-schema-compiler/src/compiler/transpilers/LightweightSymbolResolver.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
import { TranspilerSymbolResolver } from './transpiler.interface';
22
import { CONTEXT_SYMBOLS, CURRENT_CUBE_CONSTANTS } from '../CubeSymbols';
33

4+
type CubeSymbols = Record<string, Record<string, boolean>>;
5+
46
export class LightweightSymbolResolver implements TranspilerSymbolResolver {
5-
public constructor(private readonly symbols: any) {
7+
public constructor(private symbols: CubeSymbols = {}) {
8+
}
9+
10+
public setSymbols(symbols: CubeSymbols) {
11+
this.symbols = symbols;
612
}
713

814
public isCurrentCube(name): boolean {
Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import { parentPort, workerData } from 'worker_threads';
2-
1+
import workerpool from 'workerpool';
32
import { parse } from '@babel/parser';
43
import babelGenerator from '@babel/generator';
54
import babelTraverse from '@babel/traverse';
@@ -12,14 +11,16 @@ import { ErrorReporter } from '../ErrorReporter';
1211
import { LightweightSymbolResolver } from './LightweightSymbolResolver';
1312
import { LightweightNodeCubeDictionary } from './LightweightNodeCubeDictionary';
1413

15-
type FileContent = {
14+
type TransferContent = {
1615
fileName: string;
1716
content: string;
1817
transpilers: string[];
18+
cubeNames: string[];
19+
cubeSymbolsNames: Record<string, Record<string, boolean>>;
1920
};
2021

21-
const cubeDictionary = new LightweightNodeCubeDictionary(workerData.cubeNames);
22-
const cubeSymbols = new LightweightSymbolResolver(workerData.cubeSymbolsNames);
22+
const cubeDictionary = new LightweightNodeCubeDictionary();
23+
const cubeSymbols = new LightweightSymbolResolver();
2324
const errorsReport = new ErrorReporter(null, []);
2425

2526
const transpilers = {
@@ -29,34 +30,38 @@ const transpilers = {
2930
CubePropContextTranspiler: new CubePropContextTranspiler(cubeSymbols, cubeDictionary, cubeSymbols),
3031
};
3132

32-
if (parentPort) {
33-
parentPort.on('message', (file: FileContent) => {
34-
const ast = parse(
35-
file.content,
36-
{
37-
sourceFilename: file.fileName,
38-
sourceType: 'module',
39-
plugins: ['objectRestSpread']
40-
},
41-
);
42-
43-
file.transpilers.forEach(transpilerName => {
44-
if (transpilers[transpilerName]) {
45-
errorsReport.inFile(file);
46-
babelTraverse(ast, transpilers[transpilerName].traverseObject(errorsReport));
47-
errorsReport.exitFile();
48-
} else {
49-
throw new Error(`Transpiler ${transpilerName} not supported`);
50-
}
51-
});
52-
53-
const content = babelGenerator(ast, {}, file.content).code;
54-
55-
// @ts-ignore
56-
parentPort.postMessage({
57-
content,
58-
errors: errorsReport.getErrors(),
59-
warnings: errorsReport.getWarnings()
60-
});
33+
const transpile = (data: TransferContent) => {
34+
cubeDictionary.setCubeNames(data.cubeNames);
35+
cubeSymbols.setSymbols(data.cubeSymbolsNames);
36+
37+
const ast = parse(
38+
data.content,
39+
{
40+
sourceFilename: data.fileName,
41+
sourceType: 'module',
42+
plugins: ['objectRestSpread']
43+
},
44+
);
45+
46+
data.transpilers.forEach(transpilerName => {
47+
if (transpilers[transpilerName]) {
48+
errorsReport.inFile(data);
49+
babelTraverse(ast, transpilers[transpilerName].traverseObject(errorsReport));
50+
errorsReport.exitFile();
51+
} else {
52+
throw new Error(`Transpiler ${transpilerName} not supported`);
53+
}
6154
});
62-
}
55+
56+
const content = babelGenerator(ast, {}, data.content).code;
57+
58+
return {
59+
content,
60+
errors: errorsReport.getErrors(),
61+
warnings: errorsReport.getWarnings()
62+
};
63+
};
64+
65+
workerpool.worker({
66+
transpile,
67+
});

0 commit comments

Comments
 (0)