From 62aa0147ccd3179f5201d09d1934c049f9d4864a Mon Sep 17 00:00:00 2001 From: "gwiyeong.j" Date: Thu, 5 Dec 2024 18:36:37 +0900 Subject: [PATCH 1/3] add onCreatedWorker callback --- src/Pool.js | 6 +++++- src/types.js | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Pool.js b/src/Pool.js index 3f762b2f..25853b93 100644 --- a/src/Pool.js +++ b/src/Pool.js @@ -51,6 +51,8 @@ function Pool(script, options) { /** @readonly */ this.onCreateWorker = options.onCreateWorker || (() => null); /** @readonly */ + this.onCreatedWorker = options.onCreatedWorker || (() => null); + /** @readonly */ this.onTerminateWorker = options.onTerminateWorker || (() => null); /** @readonly */ @@ -421,7 +423,7 @@ Pool.prototype._createWorkerHandler = function () { script: this.script }) || {}; - return new WorkerHandler(overriddenParams.script || this.script, { + const worker = new WorkerHandler(overriddenParams.script || this.script, { forkArgs: overriddenParams.forkArgs || this.forkArgs, forkOpts: overriddenParams.forkOpts || this.forkOpts, workerOpts: overriddenParams.workerOpts || this.workerOpts, @@ -431,6 +433,8 @@ Pool.prototype._createWorkerHandler = function () { workerTerminateTimeout: this.workerTerminateTimeout, emitStdStreams: this.emitStdStreams, }); + this.onCreatedWorker(worker) + return worker; } /** diff --git a/src/types.js b/src/types.js index c4709fe9..a09e5e58 100644 --- a/src/types.js +++ b/src/types.js @@ -24,6 +24,7 @@ * @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options). * @property {boolean} [emitStdStreams] Capture stdout and stderr from the worker and emit them via the `stdout` and `stderr` events. Not supported by the `web` worker type. * @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created. + * @property { (arg: WorkerHandler) => void } [onCreatedWorker] A callback that is called whenever a worker is created. For example, it can be used to add a limit, such as a cpu limit, for each created worker. * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. */ From 71b31be37fd7c662763b2225e3aff00de7b8fed0 Mon Sep 17 00:00:00 2001 From: "gwiyeong.j" Date: Thu, 5 Dec 2024 18:47:58 +0900 Subject: [PATCH 2/3] add readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 1d4c0727..fe952574 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,7 @@ The following options are available: - `workerOpts: Object`: the `workerOpts` option of this pool - `script: string`: the `script` option of this pool Optionally, this callback can return an object containing one or more of the above properties. The provided properties will be used to override the Pool properties for the worker being created. +- `onCreatedWorker: Function`. A callback that is called whenever a worker is created. For example, it can be used to add a limit, such as a cpu limit, for each created worker. - `onTerminateWorker: Function`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. - `emitStdStreams: boolean`. For `process` or `thread` worker type. If `true`, the worker will emit `stdout` and `stderr` events instead of passing it through to the parent streams. Default value is `false`. From ef978cb095ac31d66c2977aa94b7fbde2623aedd Mon Sep 17 00:00:00 2001 From: "gwiyeong.j" Date: Thu, 5 Dec 2024 19:00:07 +0900 Subject: [PATCH 3/3] add cpulimit example --- examples/cpulimit.js | 44 ++++++++++++++++++++++++++ examples/workers/cpuIntensiveWorker.js | 17 ++++++++++ 2 files changed, 61 insertions(+) create mode 100644 examples/cpulimit.js create mode 100644 examples/workers/cpuIntensiveWorker.js diff --git a/examples/cpulimit.js b/examples/cpulimit.js new file mode 100644 index 00000000..d61f876e --- /dev/null +++ b/examples/cpulimit.js @@ -0,0 +1,44 @@ +const workerpool = require(".."); +const limiter = require("cpulimit"); // need npm install cpulimit + +// create a worker pool +const pool = workerpool.pool(__dirname + "/workers/cpuIntensiveWorker.js", { + // cpu limit is supported for process worker + workerType: "process", + workerTerminateTimeout: 1000, + onCreatedWorker: (worker) => { + const cpuLimitOption = { + limit: 50, + includeChildren: true, + pid: worker.worker.pid + }; + limiter.createProcessFamily(cpuLimitOption, function(err, processFamily) { + if(err) { + console.error('Error:', err.message); + return; + } + + limiter.limit(processFamily, cpuLimitOption, function(err) { + if(err) { + console.error('Error:', err.message); + } + else { + console.log('Done.'); + } + }); + }); + } +}); + +const main = async () => { + try { + await pool + .exec("cpuIntensive", []) + .timeout(10000) + } catch(err) { + console.log('Timeout') + } + await pool.terminate() +}; + +main(); diff --git a/examples/workers/cpuIntensiveWorker.js b/examples/workers/cpuIntensiveWorker.js new file mode 100644 index 00000000..29fe5aea --- /dev/null +++ b/examples/workers/cpuIntensiveWorker.js @@ -0,0 +1,17 @@ +// Example of a CPU-intensive worker that never shuts down + +var workerpool = require('../..'); + +function cpuIntensive () { + return new Promise(function (resolve, reject) { + while(true) { + process.stdout.write("."); + for (let i = 0; i < 1e8; i++) {} + } + resolve({}) + }); +} + +workerpool.worker({ + cpuIntensive: cpuIntensive, +});