Skip to content

Commit 3d69558

Browse files
committed
wip: progressing and functional test worker
[ci skip]
1 parent 0cc20b5 commit 3d69558

File tree

6 files changed

+55
-40
lines changed

6 files changed

+55
-40
lines changed

package-lock.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
"bench": "tsc -p ./tsconfig.build.json && shx rm -rf ./benches/results && tsx ./benches/index.ts"
3838
},
3939
"dependencies": {
40-
"@matrixai/async-init": "^2.0.0",
41-
"@matrixai/errors": "^2.0.1",
42-
"@matrixai/logger": "^4.0.1",
40+
"@matrixai/async-init": "^2.1.2",
41+
"@matrixai/errors": "^2.1.3",
42+
"@matrixai/logger": "^4.0.3",
4343
"ts-node": "^10.9.1",
4444
"rxjs": "^7.8.2"
4545
},

src/WorkerPool.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ class WorkerPool {
5050

5151
protected addWorker() {
5252
const worker = this.workerFactory();
53-
const messageHandler = (result: unknown) => {
54-
worker[taskInfoSymbol](result, undefined);
53+
const messageHandler = (result: { result?: unknown; error?: Error }) => {
54+
if (result.error != null) worker[taskInfoSymbol](undefined, result.error);
55+
else worker[taskInfoSymbol](result.result);
5556
worker[taskInfoSymbol] = undefined;
5657
this.freeWorkers.push(worker);
5758
this.$workerFreed.next();
@@ -99,7 +100,6 @@ class WorkerPool {
99100

100101
public async terminate(force: boolean) {
101102
this.terminating = true;
102-
this.handleDestroySubscription.unsubscribe();
103103
if (!force) {
104104
// Prevent new tasks and wait for exising queue to drain
105105
await new Promise<void>((resolve) => {
@@ -122,6 +122,8 @@ class WorkerPool {
122122
});
123123
}
124124

125+
// Prevent terminations from creating new workers
126+
this.handleDestroySubscription.unsubscribe();
125127
const workerTerminatePs: Array<Promise<number>> = [];
126128
for (const worker of this.workers) {
127129
workerTerminatePs.push(worker.terminate());

src/expose.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,32 @@ function expose(workerManifest: WorkerManifest) {
1010
// We don't want to run this in the main thread since it's not acting as a worker
1111
if (isMainThread) return;
1212
const handleMessage = async (task: WorkerTask) => {
13-
console.log(task);
13+
console.log('got task!', task);
1414
const method = workerManifest[task.type];
1515
if (method == null) {
16-
const e = Error('TMP IMP missing handler');
17-
console.error(e);
18-
throw e;
16+
parentPort!.postMessage({ error: Error('TMP IMP missing handler') });
17+
return;
1918
}
20-
const result = await method(task);
21-
parentPort?.postMessage(result);
19+
let result: unknown;
20+
try {
21+
result = await method(task.data);
22+
} catch (error) {
23+
parentPort!.postMessage({ error });
24+
return;
25+
}
26+
parentPort!.postMessage({ result });
2227
};
2328
const handleMessageError = (e) => {
24-
console.error(`Worker ${threadId} got error`, e);
29+
console.log(`Worker ${threadId} got error`, e);
2530
};
31+
parentPort!.on('message', handleMessage);
32+
parentPort!.on('messageerror', handleMessageError);
2633
parentPort!.once('close', () => {
2734
console.log(`Worker ${threadId} closed!`);
2835
parentPort!.off('message', handleMessage);
2936
parentPort!.off('messageerror', handleMessageError);
3037
});
38+
console.log('expose done');
3139
}
3240

3341
export { expose };

src/worker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const worker = {
88
return 'hello world!';
99
},
1010
add: async (data: { a: number; b: number }): Promise<number> => {
11+
console.log(data);
1112
return data.a + data.b;
1213
},
1314
sub: async (data: { a: number; b: number }): Promise<number> => {

tests/WorkerPool.test.ts

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,53 @@
11
import type { WorkerFactory } from '#types.js';
22
import { Worker } from 'node:worker_threads';
3-
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
3+
// Import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
4+
import path from 'node:path';
45
import WorkerPool from '#WorkerPool.js';
56

7+
async function sleep(ms: number) {
8+
return new Promise((resolve) => {
9+
setTimeout(resolve, ms);
10+
});
11+
}
12+
613
describe('WorkerPool', () => {
7-
const logger = new Logger('WorkerPool Test', LogLevel.WARN, [
8-
new StreamHandler(),
9-
]);
14+
// Const logger = new Logger('WorkerPool Test', LogLevel.WARN, [
15+
// new StreamHandler(),
16+
// ]);
1017

11-
const delayedEchoScript = `
12-
const nodeWorkers = require("node:worker_threads");
13-
console.log("Hello from worker!: ", nodeWorkers.isMainThread, nodeWorkers.threadId);
14-
nodeWorkers.parentPort.on('message', async (v) => {
15-
// console.log('message', v);
16-
await new Promise(resolve => {
17-
setTimeout(resolve, 10);
18-
})
19-
nodeWorkers.parentPort.postMessage(v);
20-
});
21-
`;
18+
let pool: WorkerPool;
2219

2320
const workerFactory: WorkerFactory = () => {
24-
return new Worker(delayedEchoScript, { eval: true });
21+
return new Worker(path.join(__dirname, '../dist/worker.js'));
2522
};
2623

24+
afterEach(async () => {
25+
await pool.terminate(true);
26+
console.log('DONE!');
27+
});
28+
2729
test('asd', async () => {
28-
const pool = new WorkerPool(3, workerFactory);
29-
// Pool.$poolStatus.subscribe((v) => {
30-
// console.log('!queue state!', v);
31-
// });
30+
pool = new WorkerPool(3, workerFactory);
31+
pool.$poolStatus.subscribe((v) => {
32+
console.log('!queue state!', v);
33+
});
3234
let resolveP;
3335
const taskP = new Promise((resolve) => {
3436
resolveP = resolve;
3537
});
3638
let doneCount = 0;
3739
const numTasks = 20;
3840
for (let i = 0; i < numTasks; i++) {
39-
pool.runTask({ method: 'one', data: 1 }, (result, error) => {
40-
// Console.log(result, error);
41+
pool.runTask({ type: 'fac', data: 5 }, (result, error) => {
42+
console.log({ result, error });
4143
doneCount++;
4244
if (doneCount === numTasks) resolveP(result);
4345
});
4446
}
45-
const result = await taskP;
46-
// Console.log(result);
47+
console.log('waiting');
48+
await taskP;
49+
console.log('waiting done');
4750
await pool.terminate(false);
51+
console.log('term done');
4852
});
4953
});

0 commit comments

Comments
 (0)