Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/cubejs-backend-native/js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,14 @@ export type SQLInterfaceOptions = {

/**
 * A single transpilation request handed to the native (Rust) transpiler.
 */
export interface TransformConfig {
  /** Path/name of the data-model file being transpiled. */
  fileName: string;
  /** Raw source text of the file. */
  fileContent: string;
  /** Names of the transpilers to run over this file. */
  transpilers: string[];
  // Presumably scopes native-side caches to one compiler instance — TODO(review) confirm
  compilerId: string;
  /** Optional compile-context metadata; when omitted the native side reuses prior state. */
  metaData?: {
    cubeNames: string[];
    cubeSymbols: Record<string, Record<string, boolean>>;
    contextSymbols: Record<string, string>;
    // Compile phase index (0-3); meaning of each stage is defined by the caller
    // (DataSchemaCompiler's compilePhase chain) — TODO(review) confirm semantics.
    stage: 0 | 1 | 2 | 3;
  }
}

Expand Down Expand Up @@ -495,11 +497,11 @@ export const getFinalQueryResultMulti = (transformDataArr: Object[], rows: any[]
return native.getFinalQueryResultMulti(transformDataArr, rows, responseData);
};

export const transpileJs = async (content: String, metadata: TransformConfig): Promise<TransformResponse> => {
export const transpileJs = async (transpileRequests: TransformConfig[]): Promise<TransformResponse[]> => {
const native = loadNative();

if (native.transpileJs) {
return native.transpileJs(content, metadata);
return native.transpileJs(transpileRequests);
}

throw new Error('TranspileJs native implementation not found!');
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-query-orchestrator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"@cubejs-backend/shared": "1.2.27",
"csv-write-stream": "^2.0.0",
"generic-pool": "^3.8.2",
"lru-cache": "^5.1.1",
"lru-cache": "^11.1.0",
"ramda": "^0.27.2"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getEnv, } from '@cubejs-backend/shared';

import { BaseDriver, InlineTable, } from '@cubejs-backend/base-driver';
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import LRUCache from 'lru-cache';
import { LRUCache } from 'lru-cache';

import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryWithParams } from './QueryCache';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
Expand Down Expand Up @@ -282,8 +282,8 @@ export class PreAggregations {
this.getQueueEventsBus = options.getQueueEventsBus;
this.touchCache = new LRUCache({
max: getEnv('touchPreAggregationCacheMaxCount'),
maxAge: getEnv('touchPreAggregationCacheMaxAge') * 1000,
stale: false,
ttl: getEnv('touchPreAggregationCacheMaxAge') * 1000,
allowStale: false,
updateAgeOnGet: false
});
}
Expand Down Expand Up @@ -330,7 +330,7 @@ export class PreAggregations {
this.touchTablePersistTime
);
} catch (e: unknown) {
this.touchCache.del(tableName);
this.touchCache.delete(tableName);

throw e;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import crypto from 'crypto';
import csvWriter from 'csv-write-stream';
import LRUCache from 'lru-cache';
import { LRUCache } from 'lru-cache';
import { pipeline } from 'stream';
import { getEnv, MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared';
import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
Expand Down Expand Up @@ -909,7 +909,7 @@ export class QueryCache {
inMemoryValue.renewalKey !== renewalKey
) || renewedAgo > expiration * 1000 || renewedAgo > inMemoryCacheDisablePeriod
) {
this.memoryCache.del(redisKey);
this.memoryCache.delete(redisKey);
} else {
this.logger('Found in memory cache entry', {
cacheKey,
Expand Down
3 changes: 1 addition & 2 deletions packages/cubejs-schema-compiler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"inflection": "^1.12.0",
"joi": "^17.8.3",
"js-yaml": "^4.1.0",
"lru-cache": "^5.1.1",
"lru-cache": "^11.1.0",
"moment-timezone": "^0.5.46",
"node-dijkstra": "^2.5.0",
"ramda": "^0.27.2",
Expand All @@ -66,7 +66,6 @@
"@types/babel__traverse": "^7.20.5",
"@types/inflection": "^1.5.28",
"@types/jest": "^27",
"@types/lru-cache": "^5.1.0",
"@types/node": "^18",
"@types/ramda": "^0.27.34",
"@types/sqlstring": "^2.3.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import LRUCache from 'lru-cache';
import { LRUCache } from 'lru-cache';
import { QueryCache } from '../adapter/QueryCache';

export class CompilerCache extends QueryCache {
Expand All @@ -11,13 +11,13 @@ export class CompilerCache extends QueryCache {

this.queryCache = new LRUCache({
max: maxQueryCacheSize || 10000,
maxAge: (maxQueryCacheAge * 1000) || 1000 * 60 * 10,
ttl: (maxQueryCacheAge * 1000) || 1000 * 60 * 10,
updateAgeOnGet: true
});

this.rbacCache = new LRUCache({
max: 10000,
maxAge: 1000 * 60 * 5, // 5 minutes
ttl: 1000 * 60 * 5, // 5 minutes
});
}

Expand Down
126 changes: 111 additions & 15 deletions packages/cubejs-schema-compiler/src/compiler/DataSchemaCompiler.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import crypto from 'crypto';
import vm from 'vm';
import fs from 'fs';
import os from 'os';
import path from 'path';
import syntaxCheck from 'syntax-error';
import { parse } from '@babel/parser';
Expand All @@ -20,6 +22,21 @@ const moduleFileCache = {};

const JINJA_SYNTAX = /{%|%}|{{|}}/ig;

/**
 * Picks how many native transpilation worker threads to use.
 * An explicit positive env override wins; otherwise derive from CPU count,
 * falling back to 3 when the core count cannot be determined.
 */
const getThreadsCount = () => {
  const fromEnv = getEnv('transpilationWorkerThreadsCount');
  if (fromEnv > 0) {
    return fromEnv;
  }

  const cores = os.cpus()?.length;
  if (!cores) {
    return 3; // Default (like the workerpool do)
  }

  // Leave one core free, and cap at 5: there's no practical boost above
  // 5 threads even if you have more cores.
  return Math.min(Math.max(1, cores - 1), 5);
};

export class DataSchemaCompiler {
constructor(repository, options = {}) {
this.repository = repository;
Expand Down Expand Up @@ -47,6 +64,7 @@ export class DataSchemaCompiler {
this.pythonContext = null;
this.workerPool = null;
this.compilerId = options.compilerId;
this.compiledScriptCache = options.compiledScriptCache;
}

compileObjects(compileServices, objects, errorsReport) {
Expand Down Expand Up @@ -98,6 +116,7 @@ export class DataSchemaCompiler {

const transpilationWorkerThreads = getEnv('transpilationWorkerThreads');
const transpilationNative = getEnv('transpilationNative');
const transpilationNativeThreadsCount = getThreadsCount();
const { compilerId } = this;

if (!transpilationNative && transpilationWorkerThreads) {
Expand All @@ -108,7 +127,11 @@ export class DataSchemaCompiler {
);
}

const transpile = async () => {
/**
 * @param {number} stage - compile phase (0-3) forwarded to the native transpiler's metadata
 * @returns {Promise<*>}
 */
const transpile = async (stage) => {
let cubeNames;
let cubeSymbols;
let transpilerNames;
Expand Down Expand Up @@ -143,9 +166,28 @@ export class DataSchemaCompiler {
content: ';',
};

await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId });
await this.transpileJsFile(dummyFile, errorsReport, { cubeNames, cubeSymbols, transpilerNames, contextSymbols: CONTEXT_SYMBOLS, compilerId, stage });

const nonJsFilesTasks = toCompile.filter(file => !file.fileName.endsWith('.js'))
.map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId }));

const jsFiles = toCompile.filter(file => file.fileName.endsWith('.js'));
let jsChunks;
if (jsFiles.length < transpilationNativeThreadsCount * transpilationNativeThreadsCount) {
jsChunks = [jsFiles];
} else {
const baseSize = Math.floor(jsFiles.length / transpilationNativeThreadsCount);
jsChunks = [];
for (let i = 0; i < transpilationNativeThreadsCount; i++) {
// For the last part, we take the remaining files so we don't lose the extra ones.
const start = i * baseSize;
const end = (i === transpilationNativeThreadsCount - 1) ? jsFiles.length : start + baseSize;
jsChunks.push(jsFiles.slice(start, end));
}
}
const JsFilesTasks = jsChunks.map(chunk => this.transpileJsFilesBulk(chunk, errorsReport, { transpilerNames, compilerId }));

results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { transpilerNames, compilerId })));
results = (await Promise.all([...nonJsFilesTasks, ...JsFilesTasks])).flat();
} else if (transpilationWorkerThreads) {
results = await Promise.all(toCompile.map(f => this.transpileFile(f, errorsReport, { cubeNames, cubeSymbols, transpilerNames })));
} else {
Expand All @@ -155,17 +197,17 @@ export class DataSchemaCompiler {
return results.filter(f => !!f);
};

const compilePhase = async (compilers) => this.compileCubeFiles(compilers, await transpile(), errorsReport);
const compilePhase = async (compilers, stage) => this.compileCubeFiles(compilers, await transpile(stage), errorsReport);

return compilePhase({ cubeCompilers: this.cubeNameCompilers })
.then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) }))
return compilePhase({ cubeCompilers: this.cubeNameCompilers }, 0)
.then(() => compilePhase({ cubeCompilers: this.preTranspileCubeCompilers.concat([this.viewCompilationGate]) }, 1))
.then(() => (this.viewCompilationGate.shouldCompileViews() ?
compilePhase({ cubeCompilers: this.viewCompilers })
compilePhase({ cubeCompilers: this.viewCompilers }, 2)
: Promise.resolve()))
.then(() => compilePhase({
cubeCompilers: this.cubeCompilers,
contextCompilers: this.contextCompilers,
}))
}, 3))
.then(() => {
if (transpilationNative) {
// Clean up cache
Expand All @@ -177,7 +219,7 @@ export class DataSchemaCompiler {
return this.transpileJsFile(
dummyFile,
errorsReport,
{ cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId }
{ cubeNames: [], cubeSymbols: {}, transpilerNames: [], contextSymbols: {}, compilerId: this.compilerId, stage: 0 }
);
} else if (transpilationWorkerThreads && this.workerPool) {
this.workerPool.terminate();
Expand Down Expand Up @@ -225,29 +267,69 @@ export class DataSchemaCompiler {
}
}

async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId }) {
/**
* Right now it is used only for transpilation in native,
* so no checks for transpilation type inside this method
*/
async transpileJsFilesBulk(files, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
// for bulk processing this data may be optimized even more by passing transpilerNames, compilerId only once for a bulk
// but this requires more complex logic to be implemented in the native side.
// And comparing to the file content sizes, a few bytes of JSON data is not a big deal here
const reqDataArr = files.map(file => ({
fileName: file.fileName,
fileContent: file.content,
transpilers: transpilerNames,
compilerId,
...(cubeNames && {
metaData: {
cubeNames,
cubeSymbols,
contextSymbols,
stage
},
}),
}));
const res = await transpileJs(reqDataArr);

return files.map((file, index) => {
errorsReport.inFile(file);
if (!res[index]) { // This should not happen in theory but just to be safe
errorsReport.error(`No transpilation result received for the file ${file.fileName}.`);
return undefined;
}
errorsReport.addErrors(res[index].errors);
errorsReport.addWarnings(res[index].warnings);
errorsReport.exitFile();

return { ...file, content: res[index].code };
});
}

async transpileJsFile(file, errorsReport, { cubeNames, cubeSymbols, contextSymbols, transpilerNames, compilerId, stage }) {
try {
if (getEnv('transpilationNative')) {
const reqData = {
fileName: file.fileName,
fileContent: file.content,
transpilers: transpilerNames,
compilerId,
...(cubeNames && {
metaData: {
cubeNames,
cubeSymbols,
contextSymbols,
stage
},
}),
};

errorsReport.inFile(file);
const res = await transpileJs(file.content, reqData);
errorsReport.addErrors(res.errors);
errorsReport.addWarnings(res.warnings);
const res = await transpileJs([reqData]);
errorsReport.addErrors(res[0].errors);
errorsReport.addWarnings(res[0].warnings);
errorsReport.exitFile();

return { ...file, content: res.code };
return { ...file, content: res[0].code };
} else if (getEnv('transpilationWorkerThreads')) {
const data = {
fileName: file.fileName,
Expand Down Expand Up @@ -370,6 +452,18 @@ export class DataSchemaCompiler {
}
}

getJsScript(file) {
const cacheKey = crypto.createHash('md5').update(JSON.stringify(file.content)).digest('hex');

if (this.compiledScriptCache.has(cacheKey)) {
return this.compiledScriptCache.get(cacheKey);
}

const script = new vm.Script(file.content, { filename: file.fileName, timeout: 15000 });
this.compiledScriptCache.set(cacheKey, script);
return script;
}

compileJsFile(file, errorsReport, cubes, contexts, exports, asyncModules, toCompile, compiledFiles, { doSyntaxCheck } = { doSyntaxCheck: false }) {
if (doSyntaxCheck) {
// There is no need to run syntax check for data model files
Expand All @@ -382,7 +476,9 @@ export class DataSchemaCompiler {
}

try {
vm.runInNewContext(file.content, {
const script = this.getJsScript(file);

script.runInNewContext({
view: (name, cube) => (
!cube ?
this.cubeFactory({ ...name, fileName: file.fileName, isView: true }) :
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { SchemaFileRepository } from '@cubejs-backend/shared';
import { NativeInstance } from '@cubejs-backend/native';
import { v4 as uuidv4 } from 'uuid';
import { LRUCache } from 'lru-cache';
import vm from 'vm';

import { CubeValidator } from './CubeValidator';
import { DataSchemaCompiler } from './DataSchemaCompiler';
Expand Down Expand Up @@ -32,6 +34,7 @@ export type PrepareCompilerOptions = {
standalone?: boolean;
headCommitId?: string;
adapter?: string;
compiledScriptCache?: LRUCache<string, vm.Script>;
};

export const prepareCompiler = (repo: SchemaFileRepository, options: PrepareCompilerOptions = {}) => {
Expand All @@ -49,6 +52,8 @@ export const prepareCompiler = (repo: SchemaFileRepository, options: PrepareComp
const compilerCache = new CompilerCache({ maxQueryCacheSize, maxQueryCacheAge });
const yamlCompiler = new YamlCompiler(cubeSymbols, cubeDictionary, nativeInstance, viewCompiler);

const compiledScriptCache = options.compiledScriptCache || new LRUCache<string, vm.Script>({ max: 250 });

const transpilers: TranspilerInterface[] = [
new ValidationTranspiler(),
new ImportExportTranspiler(),
Expand All @@ -66,6 +71,7 @@ export const prepareCompiler = (repo: SchemaFileRepository, options: PrepareComp
preTranspileCubeCompilers: [cubeSymbols, cubeValidator],
transpilers,
viewCompilationGate,
compiledScriptCache,
viewCompilers: [viewCompiler],
cubeCompilers: [cubeEvaluator, joinGraph, metaTransformer],
contextCompilers: [contextEvaluator],
Expand Down
Loading
Loading