Skip to content

Commit 69f890d

Browse files
committed
wip: working methods proxy
1 parent d567774 commit 69f890d

File tree

7 files changed

+88
-49
lines changed

7 files changed

+88
-49
lines changed

src/WorkerManager.ts

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import type { WorkerFactory, WorkerManifest } from './types.js';
1+
import type { WorkerFactory, WorkerManifest, WorkerResult } from './types.js';
2+
import type { TransferListItem } from 'node:worker_threads';
23
import Logger from '@matrixai/logger';
34
import { CreateDestroy, ready } from '@matrixai/async-init/CreateDestroy.js';
4-
import { type WorkerTaskInput } from './types.js';
5+
import { type WorkerTaskInformation } from './types.js';
56
import * as errors from './errors.js';
67
import WorkerPool from './WorkerPool.js';
78

@@ -42,9 +43,8 @@ class WorkerManager<T extends WorkerManifest> {
4243
}
4344

4445
protected pool: WorkerPool;
45-
protected manifest: T;
46-
public proxy: T;
4746
protected logger: Logger;
47+
public methods: T;
4848

4949
public constructor({
5050
workerFactory,
@@ -58,17 +58,18 @@ class WorkerManager<T extends WorkerManifest> {
5858
logger: Logger;
5959
}) {
6060
this.logger = logger;
61-
this.manifest = manifest;
6261
this.pool = new WorkerPool(cores, workerFactory);
63-
this.proxy = new Proxy(this.manifest, {
64-
apply: () => {
65-
throw Error('TMP IMP NEVER');
66-
},
67-
get: (target: T, prop) => {
62+
// TODO: Add support for running functions main thread?
63+
this.methods = new Proxy(manifest, {
64+
get: (target: T, prop: string | symbol) => {
6865
if (typeof prop === 'symbol') return;
69-
return (data, transferList) => {
70-
console.log(`${prop} called with `, data, transferList);
71-
return this.call({ type: prop, data, transferList });
66+
return async (
67+
data: WorkerResult,
68+
transferList: Array<TransferListItem>,
69+
) => {
70+
const result = await this.call({ type: prop, data, transferList });
71+
if (result.transferList == null) return { data: result.data };
72+
return result;
7273
};
7374
},
7475
});
@@ -83,12 +84,12 @@ class WorkerManager<T extends WorkerManifest> {
8384
}
8485

8586
@ready(new errors.ErrorWorkerManagerDestroyed())
86-
public async call(task: WorkerTaskInput): Promise<unknown> {
87+
public async call(task: WorkerTaskInformation): Promise<WorkerResult> {
8788
return await this.queue(task);
8889
}
8990

9091
@ready(new errors.ErrorWorkerManagerDestroyed())
91-
public queue(task: WorkerTaskInput): Promise<unknown> {
92+
public queue(task: WorkerTaskInformation): Promise<WorkerResult> {
9293
return new Promise((resolve, reject) => {
9394
this.pool.runTask(task, (result, error) => {
9495
if (error != null) return reject(error);

src/WorkerPool.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import type {
55
WorkerFactory,
66
WorkerResultInternal,
77
WorkerTaskInformation,
8-
WorkerTaskInput,
98
} from './types.js';
109
import { AsyncResource } from 'node:async_hooks';
1110
import { Subject } from 'rxjs';
@@ -90,7 +89,10 @@ class WorkerPool {
9089
if (result.error != null) {
9190
worker[taskInfoSymbol].done(undefined, result.error);
9291
} else {
93-
worker[taskInfoSymbol].done(result.data, undefined);
92+
worker[taskInfoSymbol].done(
93+
{ data: result.data, transferList: result.transferList },
94+
undefined,
95+
);
9496
}
9597
worker[taskInfoSymbol] = undefined;
9698
this.freeWorkers.push(worker);
@@ -136,7 +138,7 @@ class WorkerPool {
136138
this.$workerCreated.next();
137139
}
138140

139-
public runTask(task: WorkerTaskInput, callback: TaskCallback) {
141+
public runTask(task: WorkerTaskInformation, callback: TaskCallback) {
140142
if (this.terminatedError != null) throw this.terminatedError;
141143
if (this.freeWorkers.length === 0) {
142144
this.queue.push({ task, callback });
@@ -148,7 +150,10 @@ class WorkerPool {
148150
const worker = this.freeWorkers.pop()!;
149151
if (wasIdle) this.$poolStatus.next('working');
150152
worker[taskInfoSymbol] = new WorkerTask(callback);
151-
worker.postMessage({ type: task.type, data: task.data }, task.transferList);
153+
worker.postMessage(
154+
{ type: task.type, data: task.data, transferList: task.transferList },
155+
task.transferList,
156+
);
152157
}
153158

154159
public async terminate(force: boolean) {

src/expose.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ function expose(workerManifest: WorkerManifest) {
3333
parentPort!.postMessage({ error });
3434
return;
3535
}
36-
parentPort!.postMessage({ data: result.data }, result.transferList);
36+
parentPort!.postMessage(
37+
{ data: result.data, transferList: result.transferList },
38+
result.transferList,
39+
);
3740
};
3841
parentPort!.on('message', handleMessage);
3942
parentPort!.once('close', () => {

src/types.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,32 @@ type WorkerFactory = () => Worker;
55
type WorkerTaskInformation<T extends string = string, D = unknown> = {
66
type: T;
77
data: D;
8-
};
9-
10-
type WorkerTaskInput = WorkerTaskInformation & {
118
transferList?: Array<TransferListItem>;
129
};
1310

11+
// TODO: structure this like a normal function as much as possible.
12+
// TODO: limit types to primitives and transferrables.
1413
type WorkerFunction<I = unknown, O = unknown> = (
1514
data: I,
1615
transferList?: Array<TransferListItem>,
1716
) => Promise<WorkerResult<O>>;
18-
type WorkerResult<O = unknown> = { data: O };
17+
type WorkerResult<O = unknown> = {
18+
data: O;
19+
transferList?: Array<TransferListItem>;
20+
};
1921
type WorkerResultInternal<O = unknown> = WorkerResult<O> & {
2022
error?: Error;
21-
transferList?: Array<TransferListItem>;
2223
};
2324
type WorkerManifest = Record<string, WorkerFunction>;
2425

25-
type TaskCallback<O = unknown> = (result: O, error?: Error) => void;
26+
type TaskCallback<O = unknown> = (
27+
result: WorkerResult<O>,
28+
error?: Error,
29+
) => void;
2630

2731
export type {
2832
WorkerFactory,
2933
WorkerTaskInformation,
30-
WorkerTaskInput,
3134
WorkerFunction,
3235
WorkerResult,
3336
WorkerResultInternal,

src/worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { WorkerManifest } from '#types.js';
44
import { expose } from './expose.js';
55

66
const worker = {
7-
test: async () => {
7+
test: async (): Promise<{ data: 'hello world!' }> => {
88
return { data: 'hello world!' };
99
},
1010
add: async (data: { a: number; b: number }) => {

tests/WorkerManager.test.ts

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { WorkerFactory } from '#types.js';
1+
import type { WorkerFactory, WorkerResult } from '#types.js';
22
import { Worker } from 'node:worker_threads';
33
import url from 'url';
44
import path from 'node:path';
@@ -19,7 +19,7 @@ describe('WorkerManager', () => {
1919
return new Worker(path.join(dirname, '../dist/worker.js'));
2020
};
2121

22-
let workerManager: WorkerManager;
22+
let workerManager: WorkerManager<typeof workerManifest>;
2323

2424
afterEach(async () => {
2525
await workerManager?.destroy();
@@ -28,12 +28,13 @@ describe('WorkerManager', () => {
2828
test('async construction and async destroy', async () => {
2929
workerManager = await WorkerManager.createWorkerManager({
3030
workerFactory,
31+
manifest: workerManifest,
3132
cores: 1,
3233
logger,
3334
});
3435
expect(workerManager[destroyed]).toBe(false);
35-
expect(await workerManager.call({ type: 'test', data: undefined })).toBe(
36-
'hello world!',
36+
expect(await workerManager.call({ type: 'test', data: undefined })).toEqual(
37+
{ data: 'hello world!' },
3738
);
3839
await workerManager.destroy();
3940
expect(workerManager[destroyed]).toBe(true);
@@ -45,6 +46,7 @@ describe('WorkerManager', () => {
4546
await expect(
4647
WorkerManager.createWorkerManager({
4748
workerFactory,
49+
manifest: workerManifest,
4850
cores: 0,
4951
logger,
5052
}),
@@ -53,11 +55,12 @@ describe('WorkerManager', () => {
5355
test('start with 1 worker core', async () => {
5456
workerManager = await WorkerManager.createWorkerManager({
5557
workerFactory,
58+
manifest: workerManifest,
5659
cores: 1,
5760
logger,
5861
});
59-
expect(await workerManager.call({ type: 'test', data: undefined })).toBe(
60-
'hello world!',
62+
expect(await workerManager.call({ type: 'test', data: undefined })).toEqual(
63+
{ data: 'hello world!' },
6164
);
6265
await workerManager.destroy();
6366
});
@@ -66,6 +69,7 @@ describe('WorkerManager', () => {
6669
// if you only use 1 core, this test will be much slower
6770
workerManager = await WorkerManager.createWorkerManager({
6871
workerFactory,
72+
manifest: workerManifest,
6973
cores: 1,
7074
logger,
7175
});
@@ -77,16 +81,17 @@ describe('WorkerManager', () => {
7781
}
7882
const rs = await Promise.all(tasks);
7983
expect(rs.length).toBe(taskCount);
80-
expect(rs.every((x) => x === undefined)).toBe(true);
84+
expect(rs.every((x: WorkerResult) => x.data === undefined)).toBe(true);
8185
const r = await task;
82-
expect(r).toBeUndefined();
86+
expect(r).toEqual({ data: undefined });
8387
await workerManager.destroy();
8488
});
8589
test('queueing up tasks', async () => {
8690
// Use all possible cores
8791
// if you only use 1 core, this test will be much slower
8892
workerManager = await WorkerManager.createWorkerManager({
8993
workerFactory,
94+
manifest: workerManifest,
9095
cores: 1,
9196
logger,
9297
});
@@ -95,10 +100,10 @@ describe('WorkerManager', () => {
95100
const t3 = workerManager.queue({ type: 'sleep', data: 500 });
96101
const t4 = workerManager.queue({ type: 'sleep', data: 500 });
97102
await workerManager.completed();
98-
expect(await t1).toBeUndefined();
99-
expect(await t2).toBeUndefined();
100-
expect(await t3).toBeUndefined();
101-
expect(await t4).toBeUndefined();
103+
expect(await t1).toEqual({ data: undefined });
104+
expect(await t2).toEqual({ data: undefined });
105+
expect(await t3).toEqual({ data: undefined });
106+
expect(await t4).toEqual({ data: undefined });
102107
void workerManager.queue({ type: 'sleep', data: 500 });
103108
void workerManager.queue({ type: 'sleep', data: 500 });
104109
void workerManager.queue({ type: 'sleep', data: 500 });
@@ -109,6 +114,7 @@ describe('WorkerManager', () => {
109114
test('zero-copy buffer transfer', async () => {
110115
workerManager = await WorkerManager.createWorkerManager({
111116
workerFactory,
117+
manifest: workerManifest,
112118
cores: 1,
113119
logger,
114120
});
@@ -128,7 +134,7 @@ describe('WorkerManager', () => {
128134
// The input ArrayBuffer is detached so the length is now 0
129135
expect(input.byteLength).toBe(0);
130136
// The output should be filled with 0xF
131-
expect(Buffer.from(output as ArrayBuffer)).toEqual(
137+
expect(Buffer.from(output.data as ArrayBuffer)).toEqual(
132138
Buffer.alloc(inputBuffer.byteLength, 0xf),
133139
);
134140

@@ -144,11 +150,23 @@ describe('WorkerManager', () => {
144150
manifest: workerManifest,
145151
logger,
146152
});
153+
expect(await workerManager.methods.test()).toEqual({
154+
data: 'hello world!',
155+
});
156+
expect(await workerManager.methods.add({ a: 1, b: 2 })).toEqual({
157+
data: 3,
158+
});
159+
expect(await workerManager.methods.sub({ a: 1, b: 2 })).toEqual({
160+
data: -1,
161+
});
162+
expect(await workerManager.methods.fac(5)).toEqual({ data: 120 });
163+
expect(await workerManager.methods.sleep(10)).toEqual({ data: undefined });
147164
const arrayBuffer = new ArrayBuffer(100);
148-
const result = await workerManager.proxy.transferBuffer(arrayBuffer, [
165+
const result = await workerManager.methods.transferBuffer(arrayBuffer, [
149166
arrayBuffer,
150167
]);
151-
console.log(result);
168+
expect(new Uint8Array(result.data)).toEqual(new Uint8Array(100).fill(0xf));
169+
152170
await workerManager.destroy();
153171
});
154172
});

tests/WorkerPool.test.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,38 +46,47 @@ describe('WorkerPool', () => {
4646
return resolve(result);
4747
});
4848
});
49-
await expect(task1).resolves.toBe('hello world!');
49+
await expect(task1).resolves.toEqual({
50+
data: 'hello world!',
51+
transferList: undefined,
52+
});
5053

5154
const task2 = new Promise((resolve, reject) => {
5255
pool.runTask({ type: 'add', data: { a: 2, b: 2 } }, (result, error) => {
5356
if (error != null) return reject(error);
5457
return resolve(result);
5558
});
5659
});
57-
await expect(task2).resolves.toBe(4);
60+
await expect(task2).resolves.toEqual({ data: 4, transferList: undefined });
5861

5962
const task3 = new Promise((resolve, reject) => {
6063
pool.runTask({ type: 'fac', data: 5 }, (result, error) => {
6164
if (error != null) return reject(error);
6265
return resolve(result);
6366
});
6467
});
65-
await expect(task3).resolves.toBe(120);
68+
await expect(task3).resolves.toEqual({
69+
data: 120,
70+
transferList: undefined,
71+
});
6672

6773
const task4 = new Promise((resolve, reject) => {
6874
pool.runTask({ type: 'sleep', data: 10 }, (result, error) => {
6975
if (error != null) return reject(error);
7076
return resolve(result);
7177
});
7278
});
73-
await expect(task4).resolves.toBe(undefined);
79+
await expect(task4).resolves.toEqual({
80+
data: undefined,
81+
transferList: undefined,
82+
});
7483
});
7584
test('WorkerPool handles failure to create worker', async () => {
7685
pool = new WorkerPool(3, () => new Worker('./badPath'));
77-
const taskP = new Promise<string>((resolve, reject) => {
86+
const taskP = new Promise<unknown>((resolve, reject) => {
7887
pool.runTask({ type: 'test', data: undefined }, (result, error) => {
7988
if (error != null) return reject(error);
80-
return resolve(result as string);
89+
return resolve(result);
8190
});
8291
});
8392
await expect(taskP).rejects.toThrow();

0 commit comments

Comments
 (0)