Skip to content

Commit ae049cd

Browse files
committed
wip: expanding WorkerPool tests
[ci skip]
1 parent 3d69558 commit ae049cd

File tree

5 files changed

+82
-32
lines changed

5 files changed

+82
-32
lines changed

src/errors.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,17 @@ class ErrorWorkerManagerDestroyed<T> extends ErrorWorkerManager<T> {
88
static description = 'WorkerManager is destroyed';
99
}
1010

11-
export { ErrorWorkerManager, ErrorWorkerManagerDestroyed };
11+
class ErrorWorker<T> extends AbstractError<T> {
12+
static description = 'Worker error';
13+
}
14+
15+
class ErrorWorkerHandlerMissing<T> extends ErrorWorkerManager<T> {
16+
static description = 'Worker has not exposed a handler of this name';
17+
}
18+
19+
export {
20+
ErrorWorkerManager,
21+
ErrorWorkerManagerDestroyed,
22+
ErrorWorker,
23+
ErrorWorkerHandlerMissing,
24+
};

src/expose.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { WorkerTask, WorkerManifest } from '#types.js';
22
import { isMainThread, parentPort, threadId } from 'node:worker_threads';
3+
import * as workerErrors from '#errors.js';
34

45
/**
56
* Call inside a worker to set it up and expose the supported worker functions.
@@ -10,10 +11,11 @@ function expose(workerManifest: WorkerManifest) {
1011
// We don't want to run this in the main thread since it's not acting as a worker
1112
if (isMainThread) return;
1213
const handleMessage = async (task: WorkerTask) => {
13-
console.log('got task!', task);
1414
const method = workerManifest[task.type];
1515
if (method == null) {
16-
parentPort!.postMessage({ error: Error('TMP IMP missing handler') });
16+
parentPort!.postMessage({
17+
error: new workerErrors.ErrorWorkerHandlerMissing(),
18+
});
1719
return;
1820
}
1921
let result: unknown;
@@ -25,17 +27,15 @@ function expose(workerManifest: WorkerManifest) {
2527
}
2628
parentPort!.postMessage({ result });
2729
};
28-
const handleMessageError = (e) => {
29-
console.log(`Worker ${threadId} got error`, e);
30+
const handleMessageError = (e_) => {
31+
// Do nothing for now
3032
};
3133
parentPort!.on('message', handleMessage);
3234
parentPort!.on('messageerror', handleMessageError);
3335
parentPort!.once('close', () => {
34-
console.log(`Worker ${threadId} closed!`);
3536
parentPort!.off('message', handleMessage);
3637
parentPort!.off('messageerror', handleMessageError);
3738
});
38-
console.log('expose done');
3939
}
4040

4141
export { expose };

src/types.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ type WorkerTask<T extends string = string, D extends any = any> = {
77
data: D;
88
};
99

10-
type WorkerFunction<I extends any = any, O extends any = any> = (
11-
data: I,
12-
) => Promise<O>;
10+
type WorkerFunction<I = any, O = any> = (data: I) => Promise<O>;
1311
type WorkerManifest = Record<string, WorkerFunction>;
1412

1513
export type { WorkerFactory, WorkerTask, WorkerFunction, WorkerManifest };

src/worker.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,29 @@ import type { WorkerManifest } from '#types.js';
44
import { expose } from './expose.js';
55

66
const worker = {
7-
test: async (data: void) => {
7+
test: async () => {
88
return 'hello world!';
99
},
1010
add: async (data: { a: number; b: number }): Promise<number> => {
11-
console.log(data);
1211
return data.a + data.b;
1312
},
1413
sub: async (data: { a: number; b: number }): Promise<number> => {
1514
return data.a - data.b;
1615
},
1716
fac: async (data: number): Promise<number> => {
1817
let acc = 1;
19-
for (let i = 1; i < data; i++) {
18+
for (let i = 1; i <= data; i++) {
2019
acc = acc * i;
2120
}
2221
return acc;
2322
},
23+
sleep: async (data: number) => {
24+
await new Promise<void>((resolve) => {
25+
const timer = setTimeout(resolve, data);
26+
// Prevent holding the process open
27+
timer.unref();
28+
});
29+
},
2430
} satisfies WorkerManifest;
2531

2632
expose(worker);

tests/WorkerPool.test.ts

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { Worker } from 'node:worker_threads';
33
// Import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
44
import path from 'node:path';
55
import WorkerPool from '#WorkerPool.js';
6+
import * as errors from '#errors.js';
67

78
async function sleep(ms: number) {
89
return new Promise((resolve) => {
@@ -22,32 +23,64 @@ describe('WorkerPool', () => {
2223
};
2324

2425
afterEach(async () => {
25-
await pool.terminate(true);
26-
console.log('DONE!');
26+
await pool?.terminate(true);
2727
});
2828

29-
test('asd', async () => {
29+
test('worker pool must have atleast 1 worker', async () => {
30+
expect(() => new WorkerPool(0, workerFactory)).toThrow(
31+
errors.ErrorWorkerHandlerMissing,
32+
);
33+
expect(() => new WorkerPool(-1, workerFactory)).toThrow(
34+
errors.ErrorWorkerHandlerMissing,
35+
);
36+
pool = new WorkerPool(1, workerFactory);
37+
await pool.terminate(true);
38+
});
39+
test('can run tasks', async () => {
3040
pool = new WorkerPool(3, workerFactory);
31-
pool.$poolStatus.subscribe((v) => {
32-
console.log('!queue state!', v);
33-
});
34-
let resolveP;
35-
const taskP = new Promise((resolve) => {
36-
resolveP = resolve;
37-
});
38-
let doneCount = 0;
3941
const numTasks = 20;
4042
for (let i = 0; i < numTasks; i++) {
41-
pool.runTask({ type: 'fac', data: 5 }, (result, error) => {
42-
console.log({ result, error });
43-
doneCount++;
44-
if (doneCount === numTasks) resolveP(result);
43+
pool.runTask({ type: 'test', data: undefined }, () => {
44+
// Do nothing
4545
});
4646
}
47-
console.log('waiting');
48-
await taskP;
49-
console.log('waiting done');
47+
await pool.settled();
5048
await pool.terminate(false);
51-
console.log('term done');
5249
});
50+
test('tasks return expected results', async () => {
51+
pool = new WorkerPool(1, workerFactory);
52+
const task1 = new Promise((resolve, reject) => {
53+
pool.runTask({ type: 'test', data: undefined }, (result, error) => {
54+
if (error != null) return reject(error);
55+
return resolve(result);
56+
});
57+
});
58+
await expect(task1).resolves.toBe('hello world!');
59+
60+
const task2 = new Promise((resolve, reject) => {
61+
pool.runTask({ type: 'add', data: { a: 2, b: 2 } }, (result, error) => {
62+
if (error != null) return reject(error);
63+
return resolve(result);
64+
});
65+
});
66+
await expect(task2).resolves.toBe(4);
67+
68+
const task3 = new Promise((resolve, reject) => {
69+
pool.runTask({ type: 'fac', data: 5 }, (result, error) => {
70+
if (error != null) return reject(error);
71+
return resolve(result);
72+
});
73+
});
74+
await expect(task3).resolves.toBe(120);
75+
76+
const task4 = new Promise((resolve, reject) => {
77+
pool.runTask({ type: 'sleep', data: 10 }, (result, error) => {
78+
if (error != null) return reject(error);
79+
return resolve(result);
80+
});
81+
});
82+
await expect(task4).resolves.toBe(undefined);
83+
});
84+
// TODO: zero copy of data
85+
// TODO: apply the async resource.
5386
});

0 commit comments

Comments
 (0)