diff --git a/README.md b/README.md index a77d22b2..85108cf0 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,8 @@ The following options are available: - `workerOpts: Object`: the `workerOpts` option of this pool - `script: string`: the `script` option of this pool Optionally, this callback can return an object containing one or more of the above properties. The provided properties will be used to override the Pool properties for the worker being created. +- `onCreatedWorker: Function`. A callback that is called whenever a worker is created. The callback is passed as argument the worker instance, which can be used to access properties like `pid` (for `process` type) or `threadId` (for `thread` type). This is useful for resource management, such as applying CPU limits to worker processes. + - `worker`: The created worker instance (`Web Worker`, `worker_threads.Worker`, or `ChildProcess` depending on the `workerType`). - `onTerminateWorker: Function`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. - `emitStdStreams: boolean`. For `process` or `thread` worker type. If `true`, the worker will emit `stdout` and `stderr` events instead of passing it through to the parent streams. Default value is `false`. diff --git a/examples/cpulimit.js b/examples/cpulimit.js new file mode 100644 index 00000000..eb7e33c9 --- /dev/null +++ b/examples/cpulimit.js @@ -0,0 +1,48 @@ +const workerpool = require(".."); +const limiter = require("cpulimit"); // need npm install cpulimit + +// create a worker pool +const pool = workerpool.pool(__dirname + "/workers/cpuIntensiveWorker.js", { + // cpu limit is supported for process worker + workerType: "process", + workerTerminateTimeout: 1000, + onCreatedWorker: (worker) => { + const cpuLimitOption = { + limit: 50, + includeChildren: true, + pid: worker.pid, + }; + limiter.createProcessFamily(cpuLimitOption, function (err, processFamily) { + if (err) { + console.error("Error:", err.message); + return; + } + + limiter.limit(processFamily, cpuLimitOption, function (err) { + if (err) { + console.error("Error:", err.message); + } else { + console.log("Done."); + } + }); + }); + }, +}); + +const main = async () => { + try { + await pool + .exec('cpuIntensive', []) + .timeout(10000); + } catch (err) { + if (err instanceof workerpool.Promise.TimeoutError) { + console.log('Timeout (expected)'); + } else { + console.log('Unexpected error:', err); + } + } + + await pool.terminate(); +}; + +main(); diff --git a/examples/workers/cpuIntensiveWorker.js b/examples/workers/cpuIntensiveWorker.js new file mode 100644 index 00000000..f8413820 --- /dev/null +++ b/examples/workers/cpuIntensiveWorker.js @@ -0,0 +1,16 @@ +// Example of a CPU-intensive worker that never shuts down + +var workerpool = require('../..'); + +function cpuIntensive () { + return new Promise(function (resolve, reject) { + while(true) { + process.stdout.write("."); + for (let i = 0; i < 1e8; i++) {} + } + }); +} + +workerpool.worker({ + cpuIntensive: cpuIntensive, +}); \ No newline at end of file diff --git a/src/Pool.js b/src/Pool.js index ab842738..b816aee9 100644 --- a/src/Pool.js +++ b/src/Pool.js @@ -55,6 +55,8 @@ function Pool(script, options) { /** @readonly */ this.onCreateWorker = options.onCreateWorker || (() => null); /** @readonly */ + this.onCreatedWorker = options.onCreatedWorker || (() => null); + /** @readonly */ this.onTerminateWorker = options.onTerminateWorker || (() => null); /** @readonly */ @@ -430,8 +432,8 @@ Pool.prototype._createWorkerHandler = function () { workerThreadOpts: this.workerThreadOpts, script: this.script, }) || {}; - - return new WorkerHandler(overriddenParams.script || this.script, { + + const workerHandler = new WorkerHandler(overriddenParams.script || this.script, { forkArgs: overriddenParams.forkArgs || this.forkArgs, forkOpts: overriddenParams.forkOpts || this.forkOpts, workerOpts: overriddenParams.workerOpts || this.workerOpts, @@ -444,6 +446,10 @@ Pool.prototype._createWorkerHandler = function () { workerTerminateTimeout: this.workerTerminateTimeout, emitStdStreams: this.emitStdStreams, }); + + this.onCreatedWorker(workerHandler.worker); + + return workerHandler; }; /** diff --git a/src/types.js b/src/types.js index 96c51fc1..c8a46bf3 100644 --- a/src/types.js +++ b/src/types.js @@ -7,6 +7,10 @@ * @property {string} [script] The `script` option of this pool */ +/** + * @typedef {Worker | import('worker_threads').Worker | import('child_process').ChildProcess} WorkerInstance + */ + /** * @typedef {Object} Resolver * @property {Promise} promise - The promise object @@ -56,6 +60,7 @@ * @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options). * @property {boolean} [emitStdStreams] Capture stdout and stderr from the worker and emit them via the `stdout` and `stderr` events. Not supported by the `web` worker type. * @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created. + * @property { (worker: WorkerInstance) => void } [onCreatedWorker] A callback that is called whenever a worker is created. The callback is passed as argument the worker instance, which can be used to access properties like `pid` (for `process` type) or `threadId` (for `thread` type). This is useful for resource management, such as applying CPU limits to worker processes. * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. */ diff --git a/test/Pool.test.js b/test/Pool.test.js index 3f34da15..6ce9781d 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1904,4 +1904,97 @@ describe('Pool', function () { after(() => { delete Object.prototype.env }); }); }); + describe('onCreatedWorker callback', function() { + var WorkerThreads = tryRequire('worker_threads'); + + it('should call onCreatedWorker with worker instance for process type', function () { + var onCreatedWorkerCalled = false; + var workerPid = null; + + var pool = createPool({ + workerType: 'process', + onCreatedWorker: (worker) => { + onCreatedWorkerCalled = true; + workerPid = worker.pid; + } + }); + + return pool.exec(add, [3, 4]) + .then((result) => { + assert.strictEqual(result, 7); + assert.strictEqual(onCreatedWorkerCalled, true); + assert.strictEqual(typeof workerPid, 'number'); + }); + }); + + if (WorkerThreads) { + it('should call onCreatedWorker with worker instance for thread type', function () { + var onCreatedWorkerCalled = false; + var threadId = null; + + var pool = createPool({ + workerType: 'thread', + onCreatedWorker: (worker) => { + onCreatedWorkerCalled = true; + threadId = worker.threadId; + } + }); + + return pool.exec(add, [3, 4]) + .then((result) => { + assert.strictEqual(result, 7); + assert.strictEqual(onCreatedWorkerCalled, true); + assert.strictEqual(typeof threadId, 'number'); + }); + }); + } + + it('should call onCreatedWorker for each worker created', function () { + var workerPids = []; + + var pool = createPool({ + workerType: 'process', + maxWorkers: 4, + onCreatedWorker: (worker) => { + workerPids.push(worker.pid); + } + }); + + return Promise.all([ + pool.exec(add, [1, 2]), + pool.exec(add, [3, 4]), + pool.exec(add, [5, 6]) + ]).then((results) => { + assert.deepStrictEqual(results, [3, 7, 11]); + assert.strictEqual(workerPids.length, 3); + var uniquePids = [...new Set(workerPids)]; + assert.strictEqual(uniquePids.length, 3); + }); + }); + + it('should call onCreatedWorker after onCreateWorker but before task completion', function () { + var callOrder = []; + + var pool = createPool({ + workerType: 'process', + onCreateWorker: (_opts) => { + callOrder.push('onCreateWorker'); + }, + onCreatedWorker: () => { + callOrder.push('onCreatedWorker'); + } + }); + + function trackedAdd(a, b) { + return a + b; + } + + return pool.exec(trackedAdd, [3, 4]) + .then((result) => { + callOrder.push('taskComplete'); + assert.strictEqual(result, 7); + assert.deepStrictEqual(callOrder, ['onCreateWorker', 'onCreatedWorker', 'taskComplete']); + }); + }); + }); });