Skip to content

Commit 3239b18

Browse files
committed
wip: fixes and adding transferable support
[ci skip]
1 parent ae049cd commit 3239b18

File tree

8 files changed

+274
-245
lines changed

8 files changed

+274
-245
lines changed

src/WorkerManager.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ class WorkerManager {
6363
}
6464

6565
@ready(new errors.ErrorWorkerManagerDestroyed())
66-
public async call<T>(task: WorkerTask): Promise<T> {
66+
public async call(task: WorkerTask): Promise<unknown> {
6767
return await this.queue(task);
6868
}
6969

7070
@ready(new errors.ErrorWorkerManagerDestroyed())
71-
public queue<T>(task: WorkerTask): Promise<T> {
72-
return new Promise<T>((resolve, reject) => {
71+
public queue(task: WorkerTask): Promise<unknown> {
72+
return new Promise((resolve, reject) => {
7373
this.pool.runTask(task, (result, error) => {
7474
if (error != null) return reject(error);
7575
return resolve(result);

src/WorkerPool.ts

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
import type { Worker } from 'node:worker_threads';
22
import type { Subscription } from 'rxjs';
3-
import type { WorkerFactory, WorkerTask } from './types.js';
3+
import type {
4+
TaskCallback,
5+
WorkerFactory,
6+
WorkerResultInternal,
7+
WorkerTask,
8+
WorkerTaskInput,
9+
} from './types.js';
410
import { Subject } from 'rxjs';
11+
import { WorkerFunction, WorkerResult } from './types.js';
12+
import * as errors from './errors.js';
513

614
const taskInfoSymbol = Symbol('Task Info Symbol');
7-
type TaskCallback = (result: any | undefined, error: Error | undefined) => void;
815
type PoolStatus = 'idle' | 'working' | 'queued';
916

1017
class WorkerPool {
@@ -23,7 +30,7 @@ class WorkerPool {
2330
public poolStatus: PoolStatus = 'idle';
2431

2532
constructor(workerNum: number, workerFactory: WorkerFactory) {
26-
if (workerNum < 1) throw Error('TMP IMP must be at least 1 worker');
33+
if (workerNum < 1) throw new errors.ErrorWorkerPoolInvalidWorkers();
2734
this.workerFactory = workerFactory;
2835
for (let i = 0; i < workerNum; i++) {
2936
this.addWorker();
@@ -50,9 +57,9 @@ class WorkerPool {
5057

5158
protected addWorker() {
5259
const worker = this.workerFactory();
53-
const messageHandler = (result: { result?: unknown; error?: Error }) => {
60+
const messageHandler = (result: WorkerResultInternal) => {
5461
if (result.error != null) worker[taskInfoSymbol](undefined, result.error);
55-
else worker[taskInfoSymbol](result.result);
62+
else worker[taskInfoSymbol](result.data, undefined);
5663
worker[taskInfoSymbol] = undefined;
5764
this.freeWorkers.push(worker);
5865
this.$workerFreed.next();
@@ -83,7 +90,7 @@ class WorkerPool {
8390
this.$workerFreed.next();
8491
}
8592

86-
public runTask(task: any, callback: any) {
93+
public runTask(task: WorkerTaskInput, callback: TaskCallback) {
8794
if (this.terminating) throw Error('TMP IMP terminating');
8895
if (this.freeWorkers.length === 0) {
8996
this.queue.push({ task, callback });
@@ -95,33 +102,13 @@ class WorkerPool {
95102
const worker = this.freeWorkers.pop()!;
96103
if (wasIdle) this.$poolStatus.next('working');
97104
worker[taskInfoSymbol] = callback;
98-
worker.postMessage(task);
105+
worker.postMessage({ type: task.type, data: task.data }, task.transferList);
99106
}
100107

101108
public async terminate(force: boolean) {
102109
this.terminating = true;
103-
if (!force) {
104-
// Prevent new tasks and wait for exising queue to drain
105-
await new Promise<void>((resolve) => {
106-
if (
107-
this.freeWorkers.length === this.workers.size &&
108-
this.queue.length === 0
109-
) {
110-
return resolve();
111-
}
112-
const subscription = this.$workerFreed.subscribe(() => {
113-
// Wait for the queue to drain and the workers to free
114-
if (
115-
this.freeWorkers.length === this.workers.size &&
116-
this.queue.length === 0
117-
) {
118-
subscription.unsubscribe();
119-
resolve();
120-
}
121-
});
122-
});
123-
}
124-
110+
// Prevent new tasks and wait for exising queue to drain
111+
if (!force) await this.settled();
125112
// Prevent terminations from creating new workers
126113
this.handleDestroySubscription.unsubscribe();
127114
const workerTerminatePs: Array<Promise<number>> = [];

src/errors.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ class ErrorWorkerManagerDestroyed<T> extends ErrorWorkerManager<T> {
88
static description = 'WorkerManager is destroyed';
99
}
1010

11+
class ErrorWorkerPool<T> extends AbstractError<T> {
12+
static description = 'WorkerPool error';
13+
}
14+
15+
class ErrorWorkerPoolInvalidWorkers<T> extends ErrorWorkerManager<T> {
16+
static description = 'You must have at minimum 1 worker';
17+
}
18+
1119
class ErrorWorker<T> extends AbstractError<T> {
1220
static description = 'Worker error';
1321
}
@@ -19,6 +27,8 @@ class ErrorWorkerHandlerMissing<T> extends ErrorWorkerManager<T> {
1927
export {
2028
ErrorWorkerManager,
2129
ErrorWorkerManagerDestroyed,
30+
ErrorWorkerPool,
31+
ErrorWorkerPoolInvalidWorkers,
2232
ErrorWorker,
2333
ErrorWorkerHandlerMissing,
2434
};

src/expose.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import type { WorkerTask, WorkerManifest } from '#types.js';
1+
import type {
2+
WorkerTask,
3+
WorkerManifest,
4+
WorkerResult,
5+
WorkerResultInternal,
6+
} from '#types.js';
27
import { isMainThread, parentPort, threadId } from 'node:worker_threads';
38
import * as workerErrors from '#errors.js';
49

@@ -18,14 +23,14 @@ function expose(workerManifest: WorkerManifest) {
1823
});
1924
return;
2025
}
21-
let result: unknown;
26+
let result: WorkerResultInternal;
2227
try {
2328
result = await method(task.data);
2429
} catch (error) {
2530
parentPort!.postMessage({ error });
2631
return;
2732
}
28-
parentPort!.postMessage({ result });
33+
parentPort!.postMessage({ data: result.data }, result.transferList);
2934
};
3035
const handleMessageError = (e_) => {
3136
// Do nothing for now

src/types.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,35 @@
1-
import type { Worker } from 'node:worker_threads';
1+
import type { TransferListItem, Worker } from 'node:worker_threads';
22

33
type WorkerFactory = () => Worker;
44

5-
type WorkerTask<T extends string = string, D extends any = any> = {
5+
type WorkerTask<T extends string = string, D = unknown> = {
66
type: T;
77
data: D;
88
};
99

10-
type WorkerFunction<I = any, O = any> = (data: I) => Promise<O>;
10+
type WorkerTaskInput = WorkerTask & {
11+
transferList?: Array<TransferListItem>;
12+
};
13+
14+
type WorkerFunction<I = unknown, O = unknown> = (
15+
data: I,
16+
) => Promise<WorkerResult<O>>;
17+
type WorkerResult<O = unknown> = { data: O };
18+
type WorkerResultInternal<O = unknown> = WorkerResult<O> & {
19+
error?: Error;
20+
transferList?: Array<TransferListItem>;
21+
};
1122
type WorkerManifest = Record<string, WorkerFunction>;
1223

13-
export type { WorkerFactory, WorkerTask, WorkerFunction, WorkerManifest };
24+
type TaskCallback<O = unknown> = (result: O, error?: Error) => void;
25+
26+
export type {
27+
WorkerFactory,
28+
WorkerTask,
29+
WorkerTaskInput,
30+
WorkerFunction,
31+
WorkerResult,
32+
WorkerResultInternal,
33+
WorkerManifest,
34+
TaskCallback,
35+
};

src/worker.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,28 @@ import { expose } from './expose.js';
55

66
const worker = {
77
test: async () => {
8-
return 'hello world!';
8+
return { data: 'hello world!' };
99
},
10-
add: async (data: { a: number; b: number }): Promise<number> => {
11-
return data.a + data.b;
10+
add: async (data: { a: number; b: number }) => {
11+
return { data: data.a + data.b };
1212
},
13-
sub: async (data: { a: number; b: number }): Promise<number> => {
14-
return data.a - data.b;
13+
sub: async (data: { a: number; b: number }) => {
14+
return { data: data.a - data.b };
1515
},
16-
fac: async (data: number): Promise<number> => {
16+
fac: async (data: number) => {
1717
let acc = 1;
1818
for (let i = 1; i <= data; i++) {
1919
acc = acc * i;
2020
}
21-
return acc;
21+
return { data: acc };
2222
},
2323
sleep: async (data: number) => {
2424
await new Promise<void>((resolve) => {
2525
const timer = setTimeout(resolve, data);
2626
// Prevent holding the process open
2727
timer.unref();
2828
});
29+
return { data: undefined };
2930
},
3031
} satisfies WorkerManifest;
3132

0 commit comments

Comments
 (0)