Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ The following options are available:
- `workerOpts: Object`: the `workerOpts` option of this pool
- `script: string`: the `script` option of this pool
Optionally, this callback can return an object containing one or more of the above properties. The provided properties will be used to override the Pool properties for the worker being created.
- `onCreatedWorker: Function`. A callback that is called whenever a worker is created. The callback is passed as argument the worker instance, which can be used to access properties like `pid` (for `process` type) or `threadId` (for `thread` type). This is useful for resource management, such as applying CPU limits to worker processes.
- `worker`: The created worker instance (`Web Worker`, `worker_threads.Worker`, or `ChildProcess` depending on the `workerType`).
- `onTerminateWorker: Function`. A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
- `emitStdStreams: boolean`. For `process` or `thread` worker type. If `true`, the worker will emit `stdout` and `stderr` events instead of passing it through to the parent streams. Default value is `false`.

Expand Down
48 changes: 48 additions & 0 deletions examples/cpulimit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const workerpool = require("..");
const limiter = require("cpulimit"); // need npm install cpulimit

// create a worker pool
const pool = workerpool.pool(__dirname + "/workers/cpuIntensiveWorker.js", {
// cpu limit is supported for process worker
workerType: "process",
workerTerminateTimeout: 1000,
onCreatedWorker: (worker) => {
const cpuLimitOption = {
limit: 50,
includeChildren: true,
pid: worker.pid,
};
limiter.createProcessFamily(cpuLimitOption, function (err, processFamily) {
if (err) {
console.error("Error:", err.message);
return;
}

limiter.limit(processFamily, cpuLimitOption, function (err) {
if (err) {
console.error("Error:", err.message);
} else {
console.log("Done.");
}
});
});
},
});

const main = async () => {
try {
await pool
.exec('cpuIntensive', [])
.timeout(10000);
} catch (err) {
if (err instanceof workerpool.Promise.TimeoutError) {
console.log('Timeout (expected)');
} else {
console.log('Unexpected error:', err);
}
}

await pool.terminate();
};

main();
16 changes: 16 additions & 0 deletions examples/workers/cpuIntensiveWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Example of a CPU-intensive worker that never shuts down

var workerpool = require('../..');

function cpuIntensive () {
return new Promise(function (resolve, reject) {
while(true) {
process.stdout.write(".");
for (let i = 0; i < 1e8; i++) {}
}
});
}

workerpool.worker({
cpuIntensive: cpuIntensive,
});
10 changes: 8 additions & 2 deletions src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ function Pool(script, options) {
/** @readonly */
this.onCreateWorker = options.onCreateWorker || (() => null);
/** @readonly */
this.onCreatedWorker = options.onCreatedWorker || (() => null);
/** @readonly */
this.onTerminateWorker = options.onTerminateWorker || (() => null);

/** @readonly */
Expand Down Expand Up @@ -430,8 +432,8 @@ Pool.prototype._createWorkerHandler = function () {
workerThreadOpts: this.workerThreadOpts,
script: this.script,
}) || {};

return new WorkerHandler(overriddenParams.script || this.script, {
const workerHandler = new WorkerHandler(overriddenParams.script || this.script, {
forkArgs: overriddenParams.forkArgs || this.forkArgs,
forkOpts: overriddenParams.forkOpts || this.forkOpts,
workerOpts: overriddenParams.workerOpts || this.workerOpts,
Expand All @@ -444,6 +446,10 @@ Pool.prototype._createWorkerHandler = function () {
workerTerminateTimeout: this.workerTerminateTimeout,
emitStdStreams: this.emitStdStreams,
});

this.onCreatedWorker(workerHandler.worker);

return workerHandler;
};

/**
Expand Down
5 changes: 5 additions & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
* @property {string} [script] The `script` option of this pool
*/

/**
* @typedef {Worker | import('worker_threads').Worker | import('child_process').ChildProcess} WorkerInstance
*/

/**
* @typedef {Object} Resolver
* @property {Promise} promise - The promise object
Expand Down Expand Up @@ -56,6 +60,7 @@
* @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options).
* @property {boolean} [emitStdStreams] Capture stdout and stderr from the worker and emit them via the `stdout` and `stderr` events. Not supported by the `web` worker type.
* @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created.
* @property { (worker: WorkerInstance) => void } [onCreatedWorker] A callback that is called whenever a worker is created. The callback is passed as argument the worker instance, which can be used to access properties like `pid` (for `process` type) or `threadId` (for `thread` type). This is useful for resource management, such as applying CPU limits to worker processes.
* @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
*/

Expand Down
93 changes: 93 additions & 0 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1904,4 +1904,97 @@ describe('Pool', function () {
after(() => { delete Object.prototype.env });
});
});
describe('onCreatedWorker callback', function() {
var WorkerThreads = tryRequire('worker_threads');

it('should call onCreatedWorker with worker instance for process type', function () {
var onCreatedWorkerCalled = false;
var workerPid = null;

var pool = createPool({
workerType: 'process',
onCreatedWorker: (worker) => {
onCreatedWorkerCalled = true;
workerPid = worker.pid;
}
});

return pool.exec(add, [3, 4])
.then((result) => {
assert.strictEqual(result, 7);
assert.strictEqual(onCreatedWorkerCalled, true);
assert.strictEqual(typeof workerPid, 'number');
});
});

if (WorkerThreads) {
it('should call onCreatedWorker with worker instance for thread type', function () {
var onCreatedWorkerCalled = false;
var threadId = null;

var pool = createPool({
workerType: 'thread',
onCreatedWorker: (worker) => {
onCreatedWorkerCalled = true;
threadId = worker.threadId;
}
});

return pool.exec(add, [3, 4])
.then((result) => {
assert.strictEqual(result, 7);
assert.strictEqual(onCreatedWorkerCalled, true);
assert.strictEqual(typeof threadId, 'number');
});
});
}

it('should call onCreatedWorker for each worker created', function () {
var workerPids = [];

var pool = createPool({
workerType: 'process',
maxWorkers: 4,
onCreatedWorker: (worker) => {
workerPids.push(worker.pid);
}
});

return Promise.all([
pool.exec(add, [1, 2]),
pool.exec(add, [3, 4]),
pool.exec(add, [5, 6])
]).then((results) => {
assert.deepStrictEqual(results, [3, 7, 11]);
assert.strictEqual(workerPids.length, 3);
var uniquePids = [...new Set(workerPids)];
assert.strictEqual(uniquePids.length, 3);
});
});

it('should call onCreatedWorker after onCreateWorker but before task completion', function () {
var callOrder = [];

var pool = createPool({
workerType: 'process',
onCreateWorker: (_opts) => {
callOrder.push('onCreateWorker');
},
onCreatedWorker: () => {
callOrder.push('onCreatedWorker');
}
});

function trackedAdd(a, b) {
return a + b;
}

return pool.exec(trackedAdd, [3, 4])
.then((result) => {
callOrder.push('taskComplete');
assert.strictEqual(result, 7);
assert.deepStrictEqual(callOrder, ['onCreateWorker', 'onCreatedWorker', 'taskComplete']);
});
});
});
});