diff --git a/lib/lbt/utils/PoolDispatcher.js b/lib/lbt/utils/PoolDispatcher.js new file mode 100644 index 000000000..ae5c25670 --- /dev/null +++ b/lib/lbt/utils/PoolDispatcher.js @@ -0,0 +1,493 @@ +import workerpool from "workerpool"; +import os from "node:os"; +import {Buffer} from "node:buffer"; +import {fileURLToPath} from "node:url"; +import {getLogger} from "@ui5/logger"; +import {createResource} from "@ui5/fs/resourceFactory"; +import Resource from "@ui5/fs/Resource"; +import {setTimeout as setTimeoutPromise} from "node:timers/promises"; + +const MIN_WORKERS = 2; +const MAX_WORKERS = 4; +const osCpus = os.cpus().length || 1; +const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); + +export class PoolDispatcher { + #log = getLogger("builder:utils:PoolDispatcher"); + #projectBuilders = []; + #pool; + static #ensureSingleton = false; + static #instance; + + #getPool() { + if (!this.#pool) { + this.#log.verbose( + `Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})` + ); + const workerPath = fileURLToPath( + new URL("./TaskProcessorThread.js", import.meta.url) + ); + this.#pool = workerpool.pool(workerPath, { + workerType: "auto", + maxWorkers, + }); + } + return this.#pool; + } + + constructor() { + if (!PoolDispatcher.#ensureSingleton) { + throw new Error( + "Constructor must not be called! This is a singleton class. Use PoolDispatcher.getInstance()" + ); + } + } + + static getInstance() { + if (!PoolDispatcher.#instance) { + PoolDispatcher.#ensureSingleton = true; + PoolDispatcher.#instance = new PoolDispatcher(); + PoolDispatcher.#ensureSingleton = false; + } + + return PoolDispatcher.#instance; + } + + getProcessor(modulePath) { + return { + execute: async (methodName, args) => { + const {resources, fs, options} = args; + const buildUpArgs = {modulePath, methodName, args: {options: await serializeData(options)}}; + const useTransfers = !!fs; // TODO: Workaround- themeBuild uses fs, while minify- not + let toTransfer; + let threadMessageHandler; + let fsInterfaceMainPort; + + if (useTransfers) { + const {port1, port2} = new MessageChannel(); + fsInterfaceMainPort = port1; + buildUpArgs.args.fs = port2; + toTransfer = {transfer: [port2]}; + + threadMessageHandler = new FsMainThreadInterface(fs); + threadMessageHandler.startCommunication(fsInterfaceMainPort); + } + + if (resources) { + buildUpArgs.args.resources = await serializeResources(resources); + } + + const result = await this.#getPool().exec("execInThread", [buildUpArgs], toTransfer); + + if (useTransfers) { + threadMessageHandler.endCommunication(fsInterfaceMainPort); + } + + return deserializeData(result); + } + }; + } + + async cleanup(project) { + const attemptPoolTermination = async () => { + if (this.#projectBuilders.length) { + this.#log.verbose( + `Pool termination canceled. Still pending projects to build: ${this.#projectBuilders.length}` + ); + return; + } + + this.#log.verbose(`Attempt to terminate the workerpool...`); + + if (!this.#pool) { + this.#log.verbose( + "Pool termination requested, but a pool has not been initialized or already has been terminated." + ); + return; + } + + // There are many stats that could be used, but these ones seem the most + // convenient. When all the (available) workers are idle, then it's safe to terminate. + // There are many stats that could be used, but these ones seem the most + // convenient. When all the (available) workers are idle, then it's safe to terminate. + let {idleWorkers, totalWorkers} = this.#pool.stats(); + while (idleWorkers !== totalWorkers) { + await setTimeoutPromise(100); // Wait a bit workers to finish and try again + ({idleWorkers, totalWorkers} = this.#pool.stats()); + } + + return this.terminateTasks(/* terminate gracefully */); + }; + + if (project) { + const projectIndex = this.#projectBuilders.indexOf(project); + this.#projectBuilders.splice(projectIndex, 1); + } + + return attemptPoolTermination(); + } + + async terminateTasks(force) { + if (!this.#pool) { + this.#log.verbose("Pool termination requested, but a pool has not been initialized"); + return; + } + + this.#projectBuilders = []; + const pool = this.#pool; + this.#pool = null; + return pool.terminate(force); + } + + registerProjectBuilder(project) { + this.#projectBuilders.push(project); + } + + getQueuedProjectBuilders() { + return this.#projectBuilders; + } +} + +/** + * Casts @ui5/fs/Resource-s into an Uint8Array transferable object + * + * @param {@ui5/fs/Resource[]} resourceCollection + * @returns {Promise} + */ +export async function serializeResources(resourceCollection) { + return Promise.all( + resourceCollection.map(async (res) => ({ + buffer: await res.getBuffer(), + path: res.getPath() + })) + ); +} + +/** + * Casts Uint8Array into @ui5/fs/Resource-s transferable object + * + * @param {Promise} resources + * @returns {@ui5/fs/Resource[]} + */ +export function deserializeResources(resources) { + return resources.map((res) => { + // res.buffer is an Uint8Array object and needs to be cast + // to a Buffer in order to be read correctly. + return createResource({path: res.path, buffer: Buffer.from(res.buffer)}); + }); +} + +function isPojo(obj) { + const proto = Object.prototype; + const gpo = Object.getPrototypeOf; + + if (obj === null || typeof obj !== "object") { + return false; + } + return gpo(obj) === proto; +} + +function isFsResourceLikeTransfer(input) { + return isPojo(input) && + input["buffer"] && (Buffer.isBuffer(input.buffer) || ArrayBuffer.isView(input.buffer)) && + input["path"] && typeof input["path"] === "string"; +} + +export async function serializeData(input) { + if (Array.isArray(input) || isPojo(input)) { + for (const prop in input) { + if (Object.hasOwn(input, prop)) { + input[prop] = await serializeData(input[prop]); + } + } + } else if (input instanceof Resource) { + return (await serializeResources([input]))[0]; + } + + return input; +} + +export async function deserializeData(input) { + // Resource like transferrable object that could be converted to a @ui5/fs/Resource + if (isFsResourceLikeTransfer(input)) { + return (await deserializeResources([input]))[0]; + } else if (Array.isArray(input) || isPojo(input)) { + for (const prop in input) { + if (Object.hasOwn(input, prop)) { + input[prop] = await deserializeData(input[prop]); + } + } + } + + return input; +} + +class AbstractMain { + _comPorts = new Set(); + _collection = null; + _cache = Object.create(null); + + /** + * Constructor + * + * @param {object} collection + */ + constructor(collection) { + if (!collection) { + throw new Error("collection is mandatory argument"); + } + + this._collection = collection; + } + + /** + * Adds MessagePort and starts listening for requests on it. + * + * @param {MessagePort} comPort port1 from a {code}MessageChannel{/code} + */ + startCommunication(comPort) { + if (!comPort) { + throw new Error("Communication channel is mandatory argument"); + } + + this._comPorts.add(comPort); + comPort.on("message", (e) => this.#onMessage(e, comPort)); + comPort.on("close", () => comPort.close()); + } + + /** + * Ends MessagePort communication. + * + * @param {MessagePort} comPort port1 to remove from handling. + */ + endCommunication(comPort) { + comPort.close(); + this._comPorts.delete(comPort); + } + + /** + * Destroys the FsMainThreadInterface + */ + cleanup() { + this._comPorts.forEach((comPort) => comPort.close()); + this._cache = null; + this._collection = null; + } + + /** + * Handles messages from the MessagePort + * + * @param {object} e data to construct the request + * @param {string} e.action Action to perform. Corresponds to the names of + * the public methods of "@ui5/fs/fsInterface" + * @param {string} e.fsPath Path of the Resource + * @param {object} e.options Options for "readFile" action + * @param {MessagePort} comPort The communication channel + */ + async #onMessage(e, comPort) { + const {action, args, key: cacheKey} = e; + + if (!this._cache[cacheKey]) { + this._cache[cacheKey] = this.get(action, args); + } + + const fromCache = await this._cache[cacheKey]; + comPort.postMessage({action, key: cacheKey, ...fromCache}); + } + + get(method) { + throw new Error(`${method} method's handler has to be implemented`); + } +} + +class AbstractThread { + #comPort = null; + #callbacks = []; + #cache = Object.create(null); + + /** + * Constructor + * + * @param {MessagePort} comPort Communication port + */ + constructor(comPort) { + if (!comPort) { + throw new Error("Communication port is mandatory argument"); + } + + this.#comPort = comPort; + comPort.on("message", this.#onMessage.bind(this)); + comPort.on("close", this.#onClose.bind(this)); + } + + /** + * Handles messages from MessagePort + * + * @param {object} e + * @param {string} e.action Action to perform. Corresponds to the names of + * the public methods of "@ui5/fs/fsInterface" + * @param {string} e.fsPath Path of the Resource + * @param {*} e.result Response from the "action". + * @param {object} e.error Error from the "action". + */ + #onMessage(e) { + const cbObject = this.#callbacks.find((cb) => cb.key === e.key); + + if (cbObject) { + this.#cache[e.key] = { + error: e.error, + result: e.result, + }; + this.#callbacks.splice(this.#callbacks.indexOf(cbObject), 1); + cbObject.callback(e.error, e.result); + } else { + throw new Error( + "No callback found for this message! Possible hang for the thread!", + e + ); + } + } + + /** + * End communication + */ + #onClose() { + this.#comPort.close(); + this.#cache = null; + } + + /** + * Makes a request via the MessagePort + * + * @param {object} parameters + * @param {string} parameters.action Action to perform. Corresponds to the names of + * the public methods. + * @param {string} parameters.key + * @param {object} parameters.args + * @param {Function} callback Callback to call when the "action" is executed and ready. + */ + _doRequest({action, key, args}, callback) { + // fsPath, options + if (this.#cache[key]) { + const {result, error} = this.#cache[key]; + callback(error, result); + } else { + this.#callbacks.push({key, callback}); + this.#comPort.postMessage({action, key, args}); + } + } +} + +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * sends the results to a MessagePort. + * + * Used in the main thread in a combination with FsWorkerThreadInterface. + */ +export class FsMainThreadInterface extends AbstractMain { + constructor(fsInterfacePort) { + super(fsInterfacePort); + } + + #parseResults(method, result) { + // Stats object cannot be sent over postMessage. + // Cast it to simple object that is alike the stats. + if (method === "stat" && !!result) { + return JSON.parse(JSON.stringify(result)); + } else { + return result; + } + } + + get(method, args) { + const {fsPath, options} = args; + const composedArgs = [fsPath, options].filter(($) => $ !== undefined); + + return new Promise((resolve) => { + this._collection[method](...composedArgs, (error, result) => { + resolve({error, result: this.#parseResults(method, result)}); + }); + }); + } +} + +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * requests resources via MessagePort. + * + * Used in the worker thread in a combination with FsMainThreadInterface. + */ +export class FsWorkerThreadInterface extends AbstractThread { + readFile(fsPath, options, callback) { + const key = `${fsPath}-readFile`; + this._doRequest({action: "readFile", key, args: {fsPath, options}}, callback); + } + + stat(fsPath, callback) { + const key = `${fsPath}-stat`; + this._doRequest({action: "stat", key, args: {fsPath}}, callback); + } +} + +export class DuplexCollectionMainInterface extends AbstractMain { + constructor(collection) { + super(collection); + } + + get(method, args) { + const {virPattern, virPath, resource, options} = args; + const composedArgs = [virPattern, virPath, resource, options].filter(($) => $ !== undefined); + + return new Promise((resolve) => { + this._collection[method](...composedArgs, (error, result) => { + resolve({error, result}); + }); + }); + } +} + +export class DuplexCollectionThreadInterface extends AbstractThread { + #promisifyRequest(args) { + return new Promise((resolve, reject) => { + this._doRequest(args, (error, result) => { + if (error) { + reject(error); + } else { + resolve(result); + } + }); + }); + } + + byGlob(virPattern, options) { + const key = virPattern; + + return this.#promisifyRequest({ + action: "byGlob", + key, + args: {virPattern, options}, + }); + } + + byPath(virPath, options) { + const key = virPath; + + return this.#promisifyRequest({ + action: "byPath", + key, + args: {virPath, options}, + }); + } + + write(resource, options) { + const key = resource.getName(); + + return this.#promisifyRequest({ + action: "write", + key, + args: {resource, options}, + }); + } +} diff --git a/lib/lbt/utils/TaskProcessorThread.js b/lib/lbt/utils/TaskProcessorThread.js new file mode 100644 index 000000000..3e9c600e0 --- /dev/null +++ b/lib/lbt/utils/TaskProcessorThread.js @@ -0,0 +1,31 @@ +import workerpool from "workerpool"; +import {FsWorkerThreadInterface, deserializeResources, serializeData, deserializeData} from "./PoolDispatcher.js"; + +export default async function execInThread({modulePath, methodName, args}) { + const moduleToExecute = await import(modulePath); + const methodCall = moduleToExecute[methodName] || moduleToExecute["default"]; + const {options, resources, fs} = args; + + const buildUpArgs = {options: await deserializeData(options)}; + + if (resources) { + buildUpArgs.resources = await deserializeResources(resources); + } + if (fs) { + buildUpArgs.fs = new FsWorkerThreadInterface(fs); + } + + const result = await methodCall(buildUpArgs); + + return serializeData(result); +} + +// Test execution via ava is never done on the main thread +/* istanbul ignore else */ +if (!workerpool.isMainThread) { + // Script got loaded through workerpool + // => Create a worker and register public functions + workerpool.worker({ + execInThread, + }); +} diff --git a/lib/processors/minifier.js b/lib/processors/minifier.js index b70d4dd02..2a4c892d9 100644 --- a/lib/processors/minifier.js +++ b/lib/processors/minifier.js @@ -1,45 +1,20 @@ import {fileURLToPath} from "node:url"; import posixPath from "node:path/posix"; import {promisify} from "node:util"; -import os from "node:os"; -import workerpool from "workerpool"; +import {PoolDispatcher} from "../lbt/utils/PoolDispatcher.js"; import Resource from "@ui5/fs/Resource"; import {getLogger} from "@ui5/logger"; const log = getLogger("builder:processors:minifier"); const debugFileRegex = /((?:\.view|\.fragment|\.controller|\.designtime|\.support)?\.js)$/; -const MIN_WORKERS = 2; -const MAX_WORKERS = 4; -const osCpus = os.cpus().length || 1; -const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); - const sourceMappingUrlPattern = /\/\/# sourceMappingURL=(\S+)\s*$/; const httpPattern = /^https?:\/\//i; -// Shared workerpool across all executions until the taskUtil cleanup is triggered -let pool; - -function getPool(taskUtil) { - if (!pool) { - log.verbose(`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`); - const workerPath = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); - pool = workerpool.pool(workerPath, { - workerType: "auto", - maxWorkers - }); - taskUtil.registerCleanupTask(() => { - log.verbose(`Terminating workerpool`); - const poolToBeTerminated = pool; - pool = null; - poolToBeTerminated.terminate(); - }); - } - return pool; -} - -async function minifyInWorker(options, taskUtil) { - return getPool(taskUtil).exec("execMinification", [options]); +async function minifyInWorker(args) { + const url = fileURLToPath(new URL("./minifierWorker.js", import.meta.url)); + const processor = PoolDispatcher.getInstance().getProcessor(url); + return processor.execute("execMinification", args); } async function extractAndRemoveSourceMappingUrl(resource) { @@ -249,11 +224,13 @@ export default async function({ } const result = await minify({ - filename, - dbgFilename, - code, - sourceMapOptions - }, taskUtil); + options: { + filename, + dbgFilename, + code, + sourceMapOptions, + }, + }); resource.setString(result.code); const sourceMapResource = new Resource({ path: resource.getPath() + ".map", diff --git a/lib/processors/minifierWorker.js b/lib/processors/minifierWorker.js index 943422903..cc6fd7028 100644 --- a/lib/processors/minifierWorker.js +++ b/lib/processors/minifierWorker.js @@ -1,4 +1,3 @@ -import workerpool from "workerpool"; import {minify} from "terser"; /** @@ -27,53 +26,42 @@ const copyrightCommentsAndBundleCommentPattern = /copyright|\(c\)(?:[0-9]+|\s+[0 * @static * * @param {object} parameters Parameters - * @param {string} parameters.filename - * @param {string} parameters.dbgFilename - * @param {string} parameters.code - * @param {object} parameters.sourceMapOptions + * @param {string} parameters.options + * @param {string} parameters.options.filename + * @param {string} parameters.options.dbgFilename + * @param {string} parameters.options.code + * @param {object} parameters.options.sourceMapOptions * @returns {Promise} Promise resolving once minification of the resource has finished */ export default async function execMinification({ - filename, - dbgFilename, - code, - sourceMapOptions + options: {filename, dbgFilename, code, sourceMapOptions}, }) { try { - return await minify({ - // Use debug-name since this will be referenced in the source map "sources" - [dbgFilename]: code - }, { - output: { - comments: copyrightCommentsAndBundleCommentPattern, - wrap_func_args: false + return await minify( + { + // Use debug-name since this will be referenced in the source map "sources" + [dbgFilename]: code, }, - compress: false, - mangle: { - reserved: [ - "jQuery", - "jquery", - "sap", - ] - }, - sourceMap: sourceMapOptions - }); + { + output: { + comments: copyrightCommentsAndBundleCommentPattern, + wrap_func_args: false, + }, + compress: false, + mangle: { + reserved: ["jQuery", "jquery", "sap"], + }, + sourceMap: sourceMapOptions, + } + ); } catch (err) { // Note: err.filename contains the debug-name throw new Error( `Minification failed with error: ${err.message} in file ${filename} ` + - `(line ${err.line}, col ${err.col}, pos ${err.pos})`, { - cause: err - }); + `(line ${err.line}, col ${err.col}, pos ${err.pos})`, + { + cause: err, + } + ); } } - -// Test execution via ava is never done on the main thread -/* istanbul ignore else */ -if (!workerpool.isMainThread) { - // Script got loaded through workerpool - // => Create a worker and register public functions - workerpool.worker({ - execMinification - }); -} diff --git a/lib/processors/themeBuilderWorker.js b/lib/processors/themeBuilderWorker.js deleted file mode 100644 index 91956e566..000000000 --- a/lib/processors/themeBuilderWorker.js +++ /dev/null @@ -1,278 +0,0 @@ -import workerpool from "workerpool"; -import themeBuilder from "./themeBuilder.js"; -import {createResource} from "@ui5/fs/resourceFactory"; -import {Buffer} from "node:buffer"; - -/** - * Task to build library themes. - * - * @private - * @function default - * @static - * - * @param {object} parameters Parameters - * @param {MessagePort} parameters.fsInterfacePort - * @param {object[]} parameters.themeResources Input array of Uint8Array transferable objects - * that are the less sources to build upon. By nature those are @ui5/fs/Resource. - * @param {object} parameters.options Less compiler options - * @returns {Promise} Resulting array of Uint8Array transferable objects - */ -export default async function execThemeBuild({ - fsInterfacePort, - themeResources = [], - options = {} -}) { - const fsThemeResources = deserializeResources(themeResources); - const fsReader = new FsWorkerThreadInterface(fsInterfacePort); - - const result = await themeBuilder({ - resources: fsThemeResources, - fs: fsReader, - options - }); - - return serializeResources(result); -} - -/** - * Casts @ui5/fs/Resource-s into an Uint8Array transferable object - * - * @param {@ui5/fs/Resource[]} resourceCollection - * @returns {Promise} - */ -export async function serializeResources(resourceCollection) { - return Promise.all( - resourceCollection.map(async (res) => ({ - buffer: await res.getBuffer(), - path: res.getPath() - })) - ); -} - -/** - * Casts Uint8Array into @ui5/fs/Resource-s transferable object - * - * @param {Promise} resources - * @returns {@ui5/fs/Resource[]} - */ -export function deserializeResources(resources) { - return resources.map((res) => { - // res.buffer is an Uint8Array object and needs to be cast - // to a Buffer in order to be read correctly. - return createResource({path: res.path, buffer: Buffer.from(res.buffer)}); - }); -} - -/** - * "@ui5/fs/fsInterface" like class that uses internally - * "@ui5/fs/fsInterface", implements its methods, and - * sends the results to a MessagePort. - * - * Used in the main thread in a combination with FsWorkerThreadInterface. - */ -export class FsMainThreadInterface { - #comPorts = new Set(); - #fsInterfaceReader = null; - #cache = Object.create(null); - - /** - * Constructor - * - * @param {@ui5/fs/fsInterface} fsInterfaceReader Reader for the Resources - */ - constructor(fsInterfaceReader) { - if (!fsInterfaceReader) { - throw new Error("fsInterfaceReader is mandatory argument"); - } - - this.#fsInterfaceReader = fsInterfaceReader; - } - - /** - * Adds MessagePort and starts listening for requests on it. - * - * @param {MessagePort} comPort port1 from a {code}MessageChannel{/code} - */ - startCommunication(comPort) { - if (!comPort) { - throw new Error("Communication channel is mandatory argument"); - } - - this.#comPorts.add(comPort); - comPort.on("message", (e) => this.#onMessage(e, comPort)); - comPort.on("close", () => comPort.close()); - } - - /** - * Ends MessagePort communication. - * - * @param {MessagePort} comPort port1 to remove from handling. - */ - endCommunication(comPort) { - comPort.close(); - this.#comPorts.delete(comPort); - } - - /** - * Destroys the FsMainThreadInterface - */ - cleanup() { - this.#comPorts.forEach((comPort) => comPort.close()); - this.#cache = null; - this.#fsInterfaceReader = null; - } - - /** - * Handles messages from the MessagePort - * - * @param {object} e data to construct the request - * @param {string} e.action Action to perform. Corresponds to the names of - * the public methods of "@ui5/fs/fsInterface" - * @param {string} e.fsPath Path of the Resource - * @param {object} e.options Options for "readFile" action - * @param {MessagePort} comPort The communication channel - */ - #onMessage(e, comPort) { - switch (e.action) { - case "readFile": - this.#doRequest(comPort, {action: "readFile", fsPath: e.fsPath, options: e.options}); - break; - case "stat": - this.#doRequest(comPort, {action: "stat", fsPath: e.fsPath}); - break; - } - } - - /** - * Requests a Resource from the "@ui5/fs/fsInterface" and sends it to the worker threads - * via postMessage. - * - * @param {MessagePort} comPort The communication channel - * @param {object} parameters - * @param {string} parameters.action Action to perform. Corresponds to the names of - * the public methods of "@ui5/fs/fsInterface" and triggers this method of the - * "@ui5/fs/fsInterface" instance. - * @param {string} parameters.fsPath Path of the Resource - * @param {object} parameters.options Options for "readFile" action - */ - async #doRequest(comPort, {action, fsPath, options}) { - const cacheKey = `${fsPath}-${action}`; - if (!this.#cache[cacheKey]) { - this.#cache[cacheKey] = new Promise((res) => { - if (action === "readFile") { - this.#fsInterfaceReader.readFile(fsPath, options, (error, result) => res({error, result})); - } else if (action === "stat") { - this.#fsInterfaceReader.stat(fsPath, (error, result) => - // The Stat object has some special methods that sometimes cannot be serialized - // properly in the postMessage. In this scenario, we do not need those methods, - // but just to check whether stats has resolved to something. - res(JSON.parse(JSON.stringify({error, result}))) - ); - } else { - res({error: new Error(`Action "${action}" is not available.`), result: null}); - } - }); - } - - const fromCache = await this.#cache[cacheKey]; - comPort.postMessage({action, fsPath, ...fromCache}); - } -} - -/** - * "@ui5/fs/fsInterface" like class that uses internally - * "@ui5/fs/fsInterface", implements its methods, and - * requests resources via MessagePort. - * - * Used in the main thread in a combination with FsMainThreadInterface. - */ -export class FsWorkerThreadInterface { - #comPort = null; - #callbacks = []; - #cache = Object.create(null); - - /** - * Constructor - * - * @param {MessagePort} comPort Communication port - */ - constructor(comPort) { - if (!comPort) { - throw new Error("Communication port is mandatory argument"); - } - - this.#comPort = comPort; - comPort.on("message", this.#onMessage.bind(this)); - comPort.on("close", this.#onClose.bind(this)); - } - - /** - * Handles messages from MessagePort - * - * @param {object} e - * @param {string} e.action Action to perform. Corresponds to the names of - * the public methods of "@ui5/fs/fsInterface" - * @param {string} e.fsPath Path of the Resource - * @param {*} e.result Response from the "action". - * @param {object} e.error Error from the "action". - */ - #onMessage(e) { - const cbObject = this.#callbacks.find((cb) => cb.action === e.action && cb.fsPath === e.fsPath); - - if (cbObject) { - this.#cache[`${e.fsPath}-${e.action}`] = {error: e.error, result: e.result}; - this.#callbacks.splice(this.#callbacks.indexOf(cbObject), 1); - cbObject.callback(e.error, e.result); - } else { - throw new Error("No callback found for this message! Possible hang for the thread!", e); - } - } - - /** - * End communication - */ - #onClose() { - this.#comPort.close(); - this.#cache = null; - } - - /** - * Makes a request via the MessagePort - * - * @param {object} parameters - * @param {string} parameters.action Action to perform. Corresponds to the names of - * the public methods. - * @param {string} parameters.fsPath Path of the Resource - * @param {object} parameters.options Options for "readFile" action - * @param {Function} callback Callback to call when the "action" is executed and ready. - */ - #doRequest({action, fsPath, options}, callback) { - const cacheKey = `${fsPath}-${action}`; - - if (this.#cache[cacheKey]) { - const {result, error} = this.#cache[cacheKey]; - callback(error, result); - } else { - this.#callbacks.push({action, fsPath, callback}); - this.#comPort.postMessage({action, fsPath, options}); - } - } - - readFile(fsPath, options, callback) { - this.#doRequest({action: "readFile", fsPath, options}, callback); - } - - stat(fsPath, callback) { - this.#doRequest({action: "stat", fsPath}, callback); - } -} - -// Test execution via ava is never done on the main thread -/* istanbul ignore else */ -if (!workerpool.isMainThread) { - // Script got loaded through workerpool - // => Create a worker and register public functions - workerpool.worker({ - execThemeBuild - }); -} diff --git a/lib/tasks/buildThemes.js b/lib/tasks/buildThemes.js index 4660b1d15..5e37a1329 100644 --- a/lib/tasks/buildThemes.js +++ b/lib/tasks/buildThemes.js @@ -4,40 +4,7 @@ import ReaderCollectionPrioritized from "@ui5/fs/ReaderCollectionPrioritized"; import {getLogger} from "@ui5/logger"; const log = getLogger("builder:tasks:buildThemes"); import {fileURLToPath} from "node:url"; -import os from "node:os"; -import workerpool from "workerpool"; -import {deserializeResources, serializeResources, FsMainThreadInterface} from "../processors/themeBuilderWorker.js"; - -let pool; - -function getPool(taskUtil) { - if (!pool) { - const MIN_WORKERS = 2; - const MAX_WORKERS = 4; - const osCpus = os.cpus().length || 1; - const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); - - log.verbose(`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`); - const workerPath = fileURLToPath(new URL("../processors/themeBuilderWorker.js", import.meta.url)); - pool = workerpool.pool(workerPath, { - workerType: "thread", - maxWorkers - }); - taskUtil.registerCleanupTask(() => { - log.verbose(`Terminating workerpool`); - const poolToBeTerminated = pool; - pool = null; - poolToBeTerminated.terminate(); - }); - } - return pool; -} - -async function buildThemeInWorker(taskUtil, options, transferList) { - const toTransfer = transferList ? {transfer: transferList} : undefined; - - return getPool(taskUtil).exec("execThemeBuild", [options], toTransfer); -} +import {PoolDispatcher} from "../lbt/utils/PoolDispatcher.js"; /** @@ -172,29 +139,19 @@ export default async function({ let processedResources; const useWorkers = !!taskUtil; if (useWorkers) { - const threadMessageHandler = new FsMainThreadInterface(fsInterface(combo)); - + const modulePath = fileURLToPath(new URL("../processors/themeBuilder.js", import.meta.url)); + const processor = PoolDispatcher.getInstance().getProcessor(modulePath); processedResources = await Promise.all(themeResources.map(async (themeRes) => { - const {port1, port2} = new MessageChannel(); - threadMessageHandler.startCommunication(port1); - - const result = await buildThemeInWorker(taskUtil, { - fsInterfacePort: port2, - themeResources: await serializeResources([themeRes]), + return processor.execute("default", { + resources: [themeRes], + fs: fsInterface(combo), options: { compress, cssVariables: !!cssVariables, - }, - }, [port2]); - - threadMessageHandler.endCommunication(port1); - - return result; + } + }); })) - .then((resources) => Array.prototype.concat.apply([], resources)) - .then(deserializeResources); - - threadMessageHandler.cleanup(); + .then((resources) => Array.prototype.concat.apply([], resources)); } else { // Do not use workerpool const themeBuilder = (await import("../processors/themeBuilder.js")).default; diff --git a/test/lib/processors/minifier.js b/test/lib/processors/minifier.js index a2ebf6a3d..30fa4f087 100644 --- a/test/lib/processors/minifier.js +++ b/test/lib/processors/minifier.js @@ -104,9 +104,6 @@ ${SOURCE_MAPPING_URL}=test.controller.js.map`; "mappings": ";;;AAGC,SAASA,OAAOC,GACfC,OAAOC,IAAIC,QAAQ,aACnBC,QAAQC,IAAI,qBACb,CACDN" }); t.deepEqual(await sourceMapResource.getString(), expectedSourceMap, "Correct source map content"); - - // Call to registerCleanupTask indicates worker pool was used - t.is(taskUtilMock.registerCleanupTask.callCount, 1, "taskUtil#registerCleanupTask got called once"); }); test("minifier with useWorkers: true and missing taskUtil", async (t) => { diff --git a/test/lib/tasks/buildThemes.js b/test/lib/tasks/buildThemes.js index 48fd06a8f..84505d45d 100644 --- a/test/lib/tasks/buildThemes.js +++ b/test/lib/tasks/buildThemes.js @@ -1,7 +1,7 @@ import test from "ava"; import sinon from "sinon"; import esmock from "esmock"; -import {deserializeResources} from "../../../lib/processors/themeBuilderWorker.js"; +import {deserializeResources} from "../../../lib/lbt/utils/PoolDispatcher.js"; let buildThemes; test.before(async () => {