Skip to content

Commit 43f345e

Browse files
committed
wip: expanding docs
1 parent df05712 commit 43f345e

File tree

5 files changed

+75
-16
lines changed

5 files changed

+75
-16
lines changed

src/WorkerManager.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ class WorkerManager<Manifest extends WorkerManifest> {
5959
}) {
6060
this.logger = logger;
6161
this.pool = new WorkerPool(cores, workerFactory);
62-
this.methods = new Proxy(manifest, {
63-
get: (target: Manifest, prop: string | symbol) => {
62+
this.methods = new Proxy<Manifest>(manifest, {
63+
get: (_, prop: string | symbol) => {
6464
if (typeof prop === 'symbol') return;
6565
return async (
6666
data: WorkerResult,

src/WorkerPool.ts

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import { AsyncResource } from 'node:async_hooks';
1010
import { Subject } from 'rxjs';
1111
import * as errors from './errors.js';
1212

13+
/**
14+
* This defines a `WorkerTask` `AsyncResource` for tracking the life-cycle of a worker task
15+
* It works with the `Node:Async_hooks` for tracking async resources.
16+
*/
1317
class WorkerTask extends AsyncResource {
1418
protected callback: TaskCallback;
1519

@@ -27,6 +31,12 @@ class WorkerTask extends AsyncResource {
2731
const taskInfoSymbol = Symbol('Task Info Symbol');
2832
type PoolStatus = 'terminated' | 'idle' | 'working' | 'queued';
2933

34+
/**
35+
* A WorkerPool class that manages a pool of worker threads for parallel processing.
36+
* This class allows tasks to be distributed among a fixed number of workers, ensuring
37+
* efficient utilization of resources. It provides mechanisms to queue tasks when all workers
38+
* are occupied and handles error recovery and worker recreation.
39+
*/
3040
class WorkerPool {
3141
protected workerFactory: WorkerFactory;
3242
protected workers: Set<Worker> = new Set();
@@ -45,6 +55,14 @@ class WorkerPool {
4555
public $poolStatus = new Subject<PoolStatus>();
4656
public poolStatus: PoolStatus = 'idle';
4757

58+
/**
59+
* Constructs an instance of the Worker Pool class and initializes the specified number of workers.
60+
*
61+
* @param workerNum - The number of workers to be created in the pool. Must be at least 1.
62+
* @param workerFactory - A factory instance responsible for creating workers.
63+
* @return A Worker Pool instance initialized with the specified workers and configurations.
64+
* @throws {ErrorWorkerPoolInvalidWorkers} If the value of workerNum is less than 1.
65+
*/
4866
constructor(workerNum: number, workerFactory: WorkerFactory) {
4967
if (workerNum < 1) throw new errors.ErrorWorkerPoolInvalidWorkers();
5068
this.workerFactory = workerFactory;
@@ -71,6 +89,14 @@ class WorkerPool {
7189
});
7290
}
7391

92+
/**
93+
* Adds a new worker to the worker pool by initializing its lifecycle handlers.
94+
*
95+
* This method creates a worker instance using the `workerFactory`, sets up its message and error event handlers,
96+
* and handles worker initialization, error propagation, task completion, and cleanup when the worker exits.
97+
* The initialized worker is stored in the list of free workers upon success.
98+
*
99+
*/
74100
protected addWorker() {
75101
const worker = this.workerFactory();
76102
let workerError: Error;
@@ -133,6 +159,12 @@ class WorkerPool {
133159
this.$workerCreated.next();
134160
}
135161

162+
/**
163+
* Executes a task using an available worker. If no workers are free, the task is added to the queue.
164+
*
165+
* @param task - The task information to be executed by a worker, including type, data, and transfer list.
166+
* @param callback - The callback function to handle the outcome of the executed task.
167+
*/
136168
public runTask(task: WorkerTaskInformation, callback: TaskCallback) {
137169
if (this.terminatedError != null) throw this.terminatedError;
138170
if (this.freeWorkers.length === 0) {
@@ -151,6 +183,12 @@ class WorkerPool {
151183
);
152184
}
153185

186+
/**
187+
* Terminates the worker pool by stopping all workers, preventing new tasks, and cleaning up resources.
188+
*
189+
* @param force - Determines whether to force termination immediately (true) or wait for existing tasks to complete (false).
190+
* @return A promise that resolves when all workers are terminated and resources are cleaned up.
191+
*/
154192
public async terminate(force: boolean) {
155193
if (this.terminatedError != null) return;
156194
this.terminatedError = new errors.ErrorWorkerPoolWorkerTerminated();
@@ -184,42 +222,59 @@ class WorkerPool {
184222
}
185223

186224
/**
187-
* Returns a promise that will resolve once all tasks have been completed.
188-
* Will reject if a task failed.
225+
* Returns a promise that resolves when the pool status becomes 'idle',
226+
* or rejects if the pool status changes to 'terminated' or an error occurs.
227+
*
228+
* @return A promise that resolves when the pool is idle or rejects with an error.
189229
*/
190230
public completed(): Promise<void> {
191231
return new Promise<void>((resolve, reject) => {
192-
if (this.poolStatus === 'idle' || this.poolStatus === 'terminated') {
193-
return resolve();
232+
if (this.poolStatus === 'idle') return resolve();
233+
if (this.poolStatus === 'terminated') {
234+
return reject(this.terminatedError!);
194235
}
195236
const errorSubscription = this.$workerError.subscribe((e) => {
196237
errorSubscription.unsubscribe();
197238
stateSubscription.unsubscribe();
198239
reject(e);
199240
});
200241
const stateSubscription = this.$poolStatus.subscribe((v) => {
201-
if (v === 'idle' || this.poolStatus === 'terminated') {
242+
if (v === 'idle') {
202243
errorSubscription.unsubscribe();
203244
stateSubscription.unsubscribe();
204245
return resolve();
205246
}
247+
if (v === 'terminated') {
248+
errorSubscription.unsubscribe();
249+
stateSubscription.unsubscribe();
250+
return reject(this.terminatedError!);
251+
}
206252
});
207253
});
208254
}
209255

210256
/**
211-
* Returns a promise that will resolve once all tasks have been completed.
257+
* Returns a promise that resolves when the pool status becomes 'idle',
258+
* or rejects if the pool status becomes 'terminated'.
259+
*
260+
* @return A promise that resolves once the pool status is 'idle',
261+
* or rejects if the pool status becomes 'terminated'.
212262
*/
213263
public settled(): Promise<void> {
214-
return new Promise<void>((resolve) => {
215-
if (this.poolStatus === 'idle' || this.poolStatus === 'terminated') {
216-
return resolve();
264+
return new Promise<void>((resolve, reject) => {
265+
if (this.poolStatus === 'idle') return resolve();
266+
if (this.poolStatus === 'terminated') {
267+
return reject(this.terminatedError!);
217268
}
218269
const subscription = this.$poolStatus.subscribe((v) => {
219-
if (v === 'idle' || this.poolStatus === 'terminated') {
270+
if (v === 'idle') {
220271
subscription.unsubscribe();
221272
return resolve();
222273
}
274+
if (v === 'terminated') {
275+
subscription.unsubscribe();
276+
return reject(this.terminatedError!);
277+
}
223278
});
224279
});
225280
}

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
export { default as WorkerManager } from './WorkerManager.js';
2+
export { expose } from './expose.js';
23
export * as errors from './errors.js';
4+
export * from './types.js';

src/worker.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
// This is an example worker script
2-
31
import type { WorkerManifest } from './types.js';
42
import { expose } from './expose.js';
53

4+
/**
5+
* This is an example worker script used purely for example and internal testing
6+
*/
67
const worker = {
78
test: async (): Promise<{ data: 'hello world!' }> => {
89
return { data: 'hello world!' };

tests/WorkerPool.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ describe('WorkerPool', () => {
8989
return resolve(result);
9090
});
9191
});
92-
await expect(taskP).rejects.toThrow();
93-
await pool.settled();
92+
await expect(taskP).rejects.toThrow(errors.ErrorWorkerPoolWorkerCreationFailed);
93+
await expect(pool.settled()).rejects.toThrow(errors.ErrorWorkerPoolWorkerCreationFailed);
94+
await expect(pool.completed()).rejects.toThrow(errors.ErrorWorkerPoolWorkerCreationFailed);
9495
await pool.terminate(false);
9596
});
9697
});

0 commit comments

Comments
 (0)