Skip to content

Commit 9761519

Browse files
committed
wip: expanding tests
1 parent 4c36b86 commit 9761519

File tree

4 files changed

+83
-151
lines changed

4 files changed

+83
-151
lines changed

src/WorkerPool.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import type {
99
} from './types.js';
1010
import { AsyncResource } from 'node:async_hooks';
1111
import { Subject } from 'rxjs';
12-
import { WorkerFunction, WorkerResult } from './types.js';
1312
import * as errors from './errors.js';
1413

1514
class WorkerTask extends AsyncResource {

src/expose.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import type {
22
WorkerTaskInformation,
33
WorkerManifest,
4-
WorkerResult,
54
WorkerResultInternal,
65
} from '#types.js';
7-
import { isMainThread, parentPort, threadId } from 'node:worker_threads';
6+
import { isMainThread, parentPort } from 'node:worker_threads';
87
import * as workerErrors from '#errors.js';
98

109
/**
@@ -36,7 +35,8 @@ function expose(workerManifest: WorkerManifest) {
3635
}
3736
parentPort!.postMessage({ data: result.data }, result.transferList);
3837
};
39-
const handleMessageError = (e_) => {
38+
const handleMessageError = (e) => {
39+
console.error('Message error!', e);
4040
// Do nothing for now
4141
};
4242
parentPort!.on('message', handleMessage);

tests/WorkerManager.test.ts

Lines changed: 80 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ import url from 'url';
44
import path from 'node:path';
55
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
66
import { destroyed } from '@matrixai/async-init';
7-
import * as testUtils from './utils.js';
87
import WorkerManager from '#WorkerManager.js';
98
import * as errors from '#errors.js';
10-
import WorkerPool from '#WorkerPool.js';
119

1210
const dirname = url.fileURLToPath(new URL('.', import.meta.url));
1311

@@ -20,150 +18,97 @@ describe('WorkerManager', () => {
2018
return new Worker(path.join(dirname, '../dist/worker.js'));
2119
};
2220

21+
let workerManager: WorkerManager;
22+
23+
afterEach(async () => {
24+
await workerManager?.destroy();
25+
});
26+
2327
test('async construction and async destroy', async () => {
24-
const workerManager = await WorkerManager.createWorkerManager({
25-
workerFactory: () => new Worker('../src/worker'),
28+
workerManager = await WorkerManager.createWorkerManager({
29+
workerFactory,
30+
cores: 1,
2631
logger,
2732
});
28-
console.log('asd');
2933
expect(workerManager[destroyed]).toBe(false);
30-
console.log('asd');
31-
try {
32-
expect(await workerManager.call({ type: 'test', data: undefined })).toBe(
33-
'hello world!',
34-
);
35-
} catch (e) {
36-
console.error(e);
37-
throw e;
38-
}
39-
console.log('asd');
34+
expect(await workerManager.call({ type: 'test', data: undefined })).toBe(
35+
'hello world!',
36+
);
4037
await workerManager.destroy();
41-
console.log('asd');
4238
expect(workerManager[destroyed]).toBe(true);
43-
console.log('asd');
4439
await expect(
4540
workerManager.call({ type: 'test', data: undefined }),
4641
).rejects.toThrow(errors.ErrorWorkerManagerDestroyed);
47-
console.log('asd');
4842
});
49-
50-
// Test('starting with 0 worker cores is useless', async () => {
51-
// const workerManager = await WorkerManager.createWorkerManager<WorkerModule>(
52-
// {
53-
// workerFactory: () => spawn(new Worker('../src/worker')),
54-
// cores: 0,
55-
// logger,
56-
// },
57-
// );
58-
// // The call will never resolve, so we timeout in 1 second
59-
// expect(
60-
// await Promise.race([
61-
// workerManager.call(async () => 1),
62-
// testUtils.sleep(1000),
63-
// ]),
64-
// ).not.toBe(1);
65-
// // Force destory because of the pending call that never resolves
66-
// await workerManager.destroy({ force: true });
67-
// });
68-
69-
// test('start with 1 worker core', async () => {
70-
// const workerManager = await WorkerManager.createWorkerManager<WorkerModule>(
71-
// {
72-
// workerFactory: () => spawn(new Worker('../src/worker')),
73-
// cores: 1,
74-
// logger,
75-
// },
76-
// );
77-
// expect(await workerManager.call(async () => 1)).toBe(1);
78-
// await workerManager.destroy();
79-
// });
80-
81-
// test('call runs in the main thread', async () => {
82-
// const mainPid1 = process.pid;
83-
// const workerManager = await WorkerManager.createWorkerManager<WorkerModule>(
84-
// {
85-
// workerFactory: () => spawn(new Worker('../src/worker')),
86-
// cores: 1,
87-
// logger,
88-
// },
89-
// );
90-
// let mainPid2: number;
91-
// let mainPid3: number;
92-
// // Only `w.f()` functions are running in the worker threads
93-
// // the callback passed to `call` is still running in the main thread
94-
// expect(
95-
// await workerManager.call(async (w) => {
96-
// mainPid2 = process.pid;
97-
// const process2 = await import('node:process');
98-
// mainPid3 = process2.pid;
99-
// return await w.isRunningInWorker();
100-
// }),
101-
// ).toBe(true);
102-
// await workerManager.destroy();
103-
// expect(mainPid2!).toBe(mainPid1);
104-
// expect(mainPid3!).toBe(mainPid1);
105-
// });
106-
107-
// test('can await a subset of tasks', async () => {
108-
// // Use all possible cores
109-
// // if you only use 1 core, this test will be much slower
110-
// const workerManager = await WorkerManager.createWorkerManager<WorkerModule>(
111-
// {
112-
// workerFactory: () => spawn(new Worker('../src/worker')),
113-
// logger,
114-
// },
115-
// );
116-
// const task = workerManager.call(async (w) => {
117-
// return await w.sleep(500);
118-
// });
119-
// const taskCount = 5;
120-
// const tasks: Array<Promise<unknown>> = [];
121-
// for (let i = 0; i < taskCount; i++) {
122-
// tasks.push(
123-
// workerManager.call(async (w) => {
124-
// return await w.sleep(500);
125-
// }),
126-
// );
127-
// }
128-
// const rs = await Promise.all(tasks);
129-
// expect(rs.length).toBe(taskCount);
130-
// expect(rs.every((x) => x === undefined)).toBe(true);
131-
// const r = await task;
132-
// expect(r).toBeUndefined();
133-
// await workerManager.destroy();
134-
// });
135-
136-
// test('queueing up tasks', async () => {
137-
// // Use all possible cores
138-
// // if you only use 1 core, this test will be much slower
139-
// const workerManager = await WorkerManager.createWorkerManager<WorkerModule>(
140-
// {
141-
// workerFactory: () => spawn(new Worker('../src/worker')),
142-
// logger,
143-
// },
144-
// );
145-
// const t1 = workerManager.queue(async (w) => await w.sleep(500));
146-
// const t2 = workerManager.queue(async (w) => await w.sleep(500));
147-
// const t3 = workerManager.queue(async (w) => await w.sleep(500));
148-
// const t4 = workerManager.queue(async (w) => await w.sleep(500));
149-
// await workerManager.completed();
150-
// expect(await t1).toBeUndefined();
151-
// expect(await t2).toBeUndefined();
152-
// expect(await t3).toBeUndefined();
153-
// expect(await t4).toBeUndefined();
154-
// void workerManager.queue(async (w) => await w.sleep(500));
155-
// void workerManager.queue(async (w) => await w.sleep(500));
156-
// void workerManager.queue(async (w) => await w.sleep(500));
157-
// void workerManager.queue(async (w) => await w.sleep(500));
158-
// const es = await workerManager.settled();
159-
// expect(es.length).toBe(0);
160-
// await workerManager.destroy();
161-
// });
162-
163-
// test('zero-copy buffer transfer', async () => {
164-
// const workerManager = await WorkerManager.createWorkerManager<WorkerModule>(
43+
test('starting with 0 worker cores will throw', async () => {
44+
await expect(
45+
WorkerManager.createWorkerManager({
46+
workerFactory,
47+
cores: 0,
48+
logger,
49+
}),
50+
).rejects.toThrow(errors.ErrorWorkerPoolInvalidWorkers);
51+
});
52+
test('start with 1 worker core', async () => {
53+
workerManager = await WorkerManager.createWorkerManager({
54+
workerFactory,
55+
cores: 1,
56+
logger,
57+
});
58+
expect(await workerManager.call({ type: 'test', data: undefined })).toBe(
59+
'hello world!',
60+
);
61+
await workerManager.destroy();
62+
});
63+
test('can await a subset of tasks', async () => {
64+
// Use all possible cores
65+
// if you only use 1 core, this test will be much slower
66+
workerManager = await WorkerManager.createWorkerManager({
67+
workerFactory,
68+
cores: 1,
69+
logger,
70+
});
71+
const task = workerManager.call({ type: 'sleep', data: 500 });
72+
const taskCount = 5;
73+
const tasks: Array<Promise<unknown>> = [];
74+
for (let i = 0; i < taskCount; i++) {
75+
tasks.push(workerManager.call({ type: 'sleep', data: 500 }));
76+
}
77+
const rs = await Promise.all(tasks);
78+
expect(rs.length).toBe(taskCount);
79+
expect(rs.every((x) => x === undefined)).toBe(true);
80+
const r = await task;
81+
expect(r).toBeUndefined();
82+
await workerManager.destroy();
83+
});
84+
test('queueing up tasks', async () => {
85+
// Use all possible cores
86+
// if you only use 1 core, this test will be much slower
87+
workerManager = await WorkerManager.createWorkerManager({
88+
workerFactory,
89+
cores: 1,
90+
logger,
91+
});
92+
const t1 = workerManager.queue({ type: 'sleep', data: 500 });
93+
const t2 = workerManager.queue({ type: 'sleep', data: 500 });
94+
const t3 = workerManager.queue({ type: 'sleep', data: 500 });
95+
const t4 = workerManager.queue({ type: 'sleep', data: 500 });
96+
await workerManager.completed();
97+
expect(await t1).toBeUndefined();
98+
expect(await t2).toBeUndefined();
99+
expect(await t3).toBeUndefined();
100+
expect(await t4).toBeUndefined();
101+
void workerManager.queue({ type: 'sleep', data: 500 });
102+
void workerManager.queue({ type: 'sleep', data: 500 });
103+
void workerManager.queue({ type: 'sleep', data: 500 });
104+
void workerManager.queue({ type: 'sleep', data: 500 });
105+
await workerManager.settled();
106+
await workerManager.destroy();
107+
});
108+
// Test('zero-copy buffer transfer', async () => {
109+
// workerManager = await WorkerManager.createWorkerManager(
165110
// {
166-
// workerFactory: () => spawn(new Worker('../src/worker')),
111+
// workerFactory,
167112
// cores: 1,
168113
// logger,
169114
// },
@@ -194,7 +139,6 @@ describe('WorkerManager', () => {
194139
// expect(buffer).toEqual(Buffer.from('hello 2'));
195140
// await workerManager.destroy();
196141
// });
197-
198142
// test('scratch', async () => {
199143
// console.log('start');
200144
// console.log(

tests/WorkerPool.test.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,15 @@ import type { WorkerFactory } from '#types.js';
22
import { Worker } from 'node:worker_threads';
33
import path from 'node:path';
44
import * as url from 'url';
5-
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
65
import WorkerPool from '#WorkerPool.js';
76
import * as errors from '#errors.js';
87

98
const dirname = url.fileURLToPath(new URL('.', import.meta.url));
109

11-
async function sleep(ms: number) {
12-
return new Promise((resolve) => {
13-
setTimeout(resolve, ms);
14-
});
15-
}
16-
1710
// TODO: zero copy of data
1811
// TODO: apply the async resource.
1912

2013
describe('WorkerPool', () => {
21-
const logger = new Logger('WorkerPool Test', LogLevel.WARN, [
22-
new StreamHandler(),
23-
]);
24-
2514
let pool: WorkerPool;
2615
const workerFactory: WorkerFactory = () => {
2716
return new Worker(path.join(dirname, '../dist/worker.js'));

0 commit comments

Comments
 (0)