Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ The following options are available:
- `workerOpts: Object`: the `workerOpts` option of this pool
- `script: string`: the `script` option of this pool
Optionally, this callback can return an object containing one or more of the above properties. The provided properties will be used to override the Pool properties for the worker being created.
- `onCreatedWorker: Function`. A callback that is called whenever a worker is created. For example, it can be used to add a limit, such as a cpu limit, for each created worker.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write a unit test for the new option please?

- `onTerminateWorker: Function`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
- `emitStdStreams: boolean`. For `process` or `thread` worker type. If `true`, the worker will emit `stdout` and `stderr` events instead of passing it through to the parent streams. Default value is `false`.

Expand Down
44 changes: 44 additions & 0 deletions examples/cpulimit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const workerpool = require("..");
const limiter = require("cpulimit"); // need npm install cpulimit

// create a worker pool
const pool = workerpool.pool(__dirname + "/workers/cpuIntensiveWorker.js", {
// cpu limit is supported for process worker
workerType: "process",
workerTerminateTimeout: 1000,
onCreatedWorker: (worker) => {
const cpuLimitOption = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is a nice example!

Can you please change the indentation to 2 spaces? (I know, I should set up a linter)

limit: 50,
includeChildren: true,
pid: worker.worker.pid
};
limiter.createProcessFamily(cpuLimitOption, function(err, processFamily) {
if(err) {
console.error('Error:', err.message);
return;
}

limiter.limit(processFamily, cpuLimitOption, function(err) {
if(err) {
console.error('Error:', err.message);
}
else {
console.log('Done.');
}
});
});
}
});

const main = async () => {
try {
await pool
.exec("cpuIntensive", [])
.timeout(10000)
} catch(err) {
console.log('Timeout')
}
await pool.terminate()
};

main();
17 changes: 17 additions & 0 deletions examples/workers/cpuIntensiveWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Example of a CPU-intensive worker that never shuts down

var workerpool = require('../..');

function cpuIntensive () {
return new Promise(function (resolve, reject) {
while(true) {
process.stdout.write(".");
for (let i = 0; i < 1e8; i++) {}
}
resolve({})
});
}

workerpool.worker({
cpuIntensive: cpuIntensive,
});
6 changes: 5 additions & 1 deletion src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ function Pool(script, options) {
/** @readonly */
this.onCreateWorker = options.onCreateWorker || (() => null);
/** @readonly */
this.onCreatedWorker = options.onCreatedWorker || (() => null);
/** @readonly */
this.onTerminateWorker = options.onTerminateWorker || (() => null);

/** @readonly */
Expand Down Expand Up @@ -421,7 +423,7 @@ Pool.prototype._createWorkerHandler = function () {
script: this.script
}) || {};

return new WorkerHandler(overriddenParams.script || this.script, {
const worker = new WorkerHandler(overriddenParams.script || this.script, {
forkArgs: overriddenParams.forkArgs || this.forkArgs,
forkOpts: overriddenParams.forkOpts || this.forkOpts,
workerOpts: overriddenParams.workerOpts || this.workerOpts,
Expand All @@ -431,6 +433,8 @@ Pool.prototype._createWorkerHandler = function () {
workerTerminateTimeout: this.workerTerminateTimeout,
emitStdStreams: this.emitStdStreams,
});
this.onCreatedWorker(worker)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pass the WorkerHandler to onCreatedWorker, we pass a fully undocumented class. How about passing the Worker instead (ie. this.onCreatedWorker(worker.worker)?

return worker;
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options).
* @property {boolean} [emitStdStreams] Capture stdout and stderr from the worker and emit them via the `stdout` and `stderr` events. Not supported by the `web` worker type.
* @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created.
* @property { (arg: WorkerHandler) => void } [onCreatedWorker] A callback that is called whenever a worker is created. For example, it can be used to add a limit, such as a cpu limit, for each created worker.
* @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
*/

Expand Down