diff --git a/README.md b/README.md index 1d4c0727..fe952574 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,7 @@ The following options are available: - `workerOpts: Object`: the `workerOpts` option of this pool - `script: string`: the `script` option of this pool Optionally, this callback can return an object containing one or more of the above properties. The provided properties will be used to override the Pool properties for the worker being created. +- `onCreatedWorker: Function`. A callback that is called whenever a worker is created. For example, it can be used to add a limit, such as a cpu limit, for each created worker. - `onTerminateWorker: Function`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. - `emitStdStreams: boolean`. For `process` or `thread` worker type. If `true`, the worker will emit `stdout` and `stderr` events instead of passing it through to the parent streams. Default value is `false`. diff --git a/examples/cpulimit.js b/examples/cpulimit.js new file mode 100644 index 00000000..d61f876e --- /dev/null +++ b/examples/cpulimit.js @@ -0,0 +1,44 @@ +const workerpool = require(".."); +const limiter = require("cpulimit"); // need npm install cpulimit + +// create a worker pool +const pool = workerpool.pool(__dirname + "/workers/cpuIntensiveWorker.js", { + // cpu limit is supported for process worker + workerType: "process", + workerTerminateTimeout: 1000, + onCreatedWorker: (worker) => { + const cpuLimitOption = { + limit: 50, + includeChildren: true, + pid: worker.worker.pid + }; + limiter.createProcessFamily(cpuLimitOption, function(err, processFamily) { + if(err) { + console.error('Error:', err.message); + return; + } + + limiter.limit(processFamily, cpuLimitOption, function(err) { + if(err) { + console.error('Error:', err.message); + } + else { + console.log('Done.'); + } + }); + }); + } +}); + +const main = async () => { + try { + await pool + .exec("cpuIntensive", []) + .timeout(10000) + } catch(err) { + console.log('Timeout') + } + await pool.terminate() +}; + +main(); diff --git a/examples/workers/cpuIntensiveWorker.js b/examples/workers/cpuIntensiveWorker.js new file mode 100644 index 00000000..29fe5aea --- /dev/null +++ b/examples/workers/cpuIntensiveWorker.js @@ -0,0 +1,17 @@ +// Example of a CPU-intensive worker that never shuts down + +var workerpool = require('../..'); + +function cpuIntensive () { + return new Promise(function (resolve, reject) { + while(true) { + process.stdout.write("."); + for (let i = 0; i < 1e8; i++) {} + } + resolve({}) + }); +} + +workerpool.worker({ + cpuIntensive: cpuIntensive, +}); diff --git a/src/Pool.js b/src/Pool.js index 3f762b2f..25853b93 100644 --- a/src/Pool.js +++ b/src/Pool.js @@ -51,6 +51,8 @@ function Pool(script, options) { /** @readonly */ this.onCreateWorker = options.onCreateWorker || (() => null); /** @readonly */ + this.onCreatedWorker = options.onCreatedWorker || (() => null); + /** @readonly */ this.onTerminateWorker = options.onTerminateWorker || (() => null); /** @readonly */ @@ -421,7 +423,7 @@ Pool.prototype._createWorkerHandler = function () { script: this.script }) || {}; - return new WorkerHandler(overriddenParams.script || this.script, { + const worker = new WorkerHandler(overriddenParams.script || this.script, { forkArgs: overriddenParams.forkArgs || this.forkArgs, forkOpts: overriddenParams.forkOpts || this.forkOpts, workerOpts: overriddenParams.workerOpts || this.workerOpts, @@ -431,6 +433,8 @@ Pool.prototype._createWorkerHandler = function () { workerTerminateTimeout: this.workerTerminateTimeout, emitStdStreams: this.emitStdStreams, }); + this.onCreatedWorker(worker) + return worker; } /** diff --git a/src/types.js b/src/types.js index c4709fe9..a09e5e58 100644 --- a/src/types.js +++ b/src/types.js @@ -24,6 +24,7 @@ * @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options). * @property {boolean} [emitStdStreams] Capture stdout and stderr from the worker and emit them via the `stdout` and `stderr` events. Not supported by the `web` worker type. * @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created. + * @property { (arg: WorkerHandler) => void } [onCreatedWorker] A callback that is called whenever a worker is created. For example, it can be used to add a limit, such as a cpu limit, for each created worker. * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. */