Skip to content

Commit 91161db

Browse files
committed
wip: prototyping
[ci skip]
1 parent 1f03c63 commit 91161db

File tree

10 files changed

+1166
-407
lines changed

10 files changed

+1166
-407
lines changed

package-lock.json

Lines changed: 862 additions & 354 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
"@matrixai/async-init": "^2.0.0",
4141
"@matrixai/errors": "^2.0.1",
4242
"@matrixai/logger": "^4.0.1",
43-
"threads": "^1.7.0",
44-
"ts-node": "^10.9.1"
43+
"ts-node": "^10.9.1",
44+
"rxjs": "^7.8.2"
4545
},
4646
"devDependencies": {
4747
"@swc/core": "^1.3.76",

src/WorkerManager.ts

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,12 @@
1-
import type WorkerManagerInterface from './WorkerManagerInterface.js';
2-
import type {
3-
ModuleMethods,
4-
ModuleThread,
5-
QueuedTask,
6-
} from './types.js';
1+
import type { WorkerFactory } from './types';
72
import Logger from '@matrixai/logger';
8-
import { CreateDestroy, ready } from '@matrixai/async-init/CreateDestroy.js';
9-
import { Pool } from 'threads';
10-
import * as errors from './errors.js';
11-
3+
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
4+
import { WorkerTask } from './types';
5+
import * as errors from './errors';
6+
import WorkerPool from './WorkerPool';
127

138
@CreateDestroy()
14-
class WorkerManager<W extends ModuleMethods>
15-
implements WorkerManagerInterface<W>
16-
{
9+
class WorkerManager {
1710
/**
1811
* Creates the WorkerManager
1912
* The workerFactory needs to be a callback:
@@ -26,15 +19,15 @@ class WorkerManager<W extends ModuleMethods>
2619
* If `cores` is set to 0, this creates a useless worker pool
2720
* Use `undefined` to mean using all cores
2821
*/
29-
public static async createWorkerManager<W extends ModuleMethods>({
22+
public static async createWorkerManager({
3023
workerFactory,
31-
cores,
24+
cores = 1,
3225
logger = new Logger(this.name),
3326
}: {
34-
workerFactory: () => Promise<ModuleThread<W>>;
27+
workerFactory: WorkerFactory;
3528
cores?: number;
3629
logger?: Logger;
37-
}): Promise<WorkerManager<W>> {
30+
}): Promise<WorkerManager> {
3831
logger.info('Creating WorkerManager');
3932
const workerManager = new this({
4033
workerFactory,
@@ -45,20 +38,20 @@ class WorkerManager<W extends ModuleMethods>
4538
return workerManager;
4639
}
4740

48-
protected pool: Pool<ModuleThread<W>>;
41+
protected pool: WorkerPool;
4942
protected logger: Logger;
5043

5144
public constructor({
5245
workerFactory,
5346
cores,
5447
logger,
5548
}: {
56-
workerFactory: () => Promise<ModuleThread<W>>;
57-
cores?: number;
49+
workerFactory: WorkerFactory;
50+
cores: number;
5851
logger: Logger;
5952
}) {
6053
this.logger = logger;
61-
this.pool = Pool(workerFactory, cores);
54+
this.pool = new WorkerPool(cores, workerFactory);
6255
}
6356

6457
public async destroy({
@@ -70,15 +63,18 @@ class WorkerManager<W extends ModuleMethods>
7063
}
7164

7265
@ready(new errors.ErrorWorkerManagerDestroyed())
73-
public async call<T>(f: (worker: ModuleThread<W>) => Promise<T>): Promise<T> {
74-
return await this.pool.queue(f);
66+
public async call<T>(task: WorkerTask): Promise<T> {
67+
return await this.queue(task);
7568
}
7669

7770
@ready(new errors.ErrorWorkerManagerDestroyed())
78-
public queue<T>(
79-
f: (worker: ModuleThread<W>) => Promise<T>,
80-
): QueuedTask<ModuleThread<W>, T> {
81-
return this.pool.queue(f);
71+
public queue<T>(task: WorkerTask): Promise<T> {
72+
return new Promise<T>((resolve, reject) => {
73+
this.pool.runTask(task, (result, error) => {
74+
if (error != null) return reject(error);
75+
return resolve(result);
76+
});
77+
});
8278
}
8379

8480
@ready(new errors.ErrorWorkerManagerDestroyed())

src/WorkerManagerInterface.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
import type {
2-
ModuleMethods,
3-
ModuleThread,
4-
QueuedTask,
5-
} from './types.js';
1+
import type { ModuleMethods, ModuleThread, QueuedTask } from './types.js';
62

73
interface WorkerManagerInterface<W extends ModuleMethods> {
84
destroy(options?: { force?: boolean }): Promise<void>;

src/WorkerPool.ts

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import type { WorkerFactory, WorkerTask } from './types';
2+
import type { Worker } from 'node:worker_threads';
3+
import type { Subscription } from 'rxjs';
4+
import { Subject } from 'rxjs';
5+
6+
const taskInfoSymbol = Symbol('Task Info Symbol');
7+
type TaskCallback = (result: any | undefined, error: Error | undefined) => void;
8+
type PoolStatus = 'idle' | 'working' | 'queued';
9+
10+
class WorkerPool {
11+
protected workerFactory: WorkerFactory;
12+
protected workers: Set<Worker> = new Set();
13+
protected freeWorkers: Array<Worker> = [];
14+
protected queue: Array<{ task: WorkerTask; callback: TaskCallback }> = [];
15+
protected terminating: boolean = false;
16+
protected handleDestroySubscription: Subscription;
17+
18+
public $workerCreated = new Subject<void>();
19+
public $workerDestroyed = new Subject<void>();
20+
public $workerFreed = new Subject<void>();
21+
public $workerError = new Subject<Error>();
22+
public $poolStatus = new Subject<PoolStatus>();
23+
public poolStatus: PoolStatus = 'idle';
24+
25+
constructor(workerNum: number, workerFactory: WorkerFactory) {
26+
if (workerNum < 1) throw Error('TMP IMP must be at least 1 worker');
27+
this.workerFactory = workerFactory;
28+
for (let i = 0; i < workerNum; i++) {
29+
this.addWorker();
30+
}
31+
32+
this.$workerFreed.subscribe(() => {
33+
if (this.queue.length > 0) {
34+
const queuedTask = this.queue.shift()!;
35+
this.runTask(queuedTask.task, queuedTask.callback);
36+
if (this.queue.length === 0) this.$poolStatus.next('working');
37+
} else if (this.freeWorkers.length === this.workers.size) {
38+
this.$poolStatus.next('idle');
39+
}
40+
});
41+
42+
this.$poolStatus.subscribe((v) => {
43+
this.poolStatus = v;
44+
});
45+
46+
this.handleDestroySubscription = this.$workerDestroyed.subscribe(() => {
47+
this.addWorker();
48+
});
49+
}
50+
51+
protected addWorker() {
52+
const worker = this.workerFactory();
53+
const messageHandler = (result: unknown) => {
54+
worker[taskInfoSymbol](result, undefined);
55+
worker[taskInfoSymbol] = undefined;
56+
this.freeWorkers.push(worker);
57+
this.$workerFreed.next();
58+
};
59+
const errorHandler = (e) => {
60+
if (worker[taskInfoSymbol]) worker[taskInfoSymbol](undefined, e);
61+
else this.$workerError.next(e);
62+
};
63+
worker.on('message', messageHandler);
64+
worker.on('error', errorHandler);
65+
worker.once('exit', () => {
66+
worker.off('message', messageHandler);
67+
worker.off('error', errorHandler);
68+
this.workers.delete(worker);
69+
this.$workerDestroyed.next();
70+
});
71+
72+
// TODO: debugging
73+
// worker.on('message', (...args) => console.log('DEBUG message: ', args));
74+
// worker.on('messageerror', (...args) => console.log('DEBUG messageerror: ', args));
75+
// worker.on('error', (...args) => console.log('DEBUG error: ', args));
76+
// worker.on('exit', (...args) => console.log('DEBUG exit: ', args));
77+
// worker.on('online', (...args) => console.log('DEBUG online: ', args));
78+
79+
this.workers.add(worker);
80+
this.$workerCreated.next();
81+
this.freeWorkers.push(worker);
82+
this.$workerFreed.next();
83+
}
84+
85+
public runTask(task: any, callback: any) {
86+
if (this.terminating) throw Error('TMP IMP terminating');
87+
if (this.freeWorkers.length === 0) {
88+
this.queue.push({ task, callback });
89+
if (this.queue.length === 1) this.$poolStatus.next('queued');
90+
return;
91+
}
92+
93+
const wasIdle = this.freeWorkers.length === this.workers.size;
94+
const worker = this.freeWorkers.pop()!;
95+
if (wasIdle) this.$poolStatus.next('working');
96+
worker[taskInfoSymbol] = callback;
97+
worker.postMessage(task);
98+
}
99+
100+
public async terminate(force: boolean) {
101+
this.terminating = true;
102+
this.handleDestroySubscription.unsubscribe();
103+
if (!force) {
104+
// Prevent new tasks and wait for exising queue to drain
105+
await new Promise<void>((resolve) => {
106+
if (
107+
this.freeWorkers.length === this.workers.size &&
108+
this.queue.length === 0
109+
) {
110+
return resolve();
111+
}
112+
const subscription = this.$workerFreed.subscribe(() => {
113+
// Wait for the queue to drain and the workers to free
114+
if (
115+
this.freeWorkers.length === this.workers.size &&
116+
this.queue.length === 0
117+
) {
118+
subscription.unsubscribe();
119+
resolve();
120+
}
121+
});
122+
});
123+
}
124+
125+
const workerTerminatePs: Array<Promise<number>> = [];
126+
for (const worker of this.workers) {
127+
workerTerminatePs.push(worker.terminate());
128+
}
129+
await Promise.all(workerTerminatePs);
130+
// Console.log('workers', this.workers.size);
131+
// console.log('queue', this.queue.length);
132+
// console.log('freeWorkers', this.freeWorkers.length);
133+
// Cleaning up subjects
134+
this.$workerCreated.complete();
135+
this.$workerDestroyed.complete();
136+
this.$workerFreed.complete();
137+
this.$workerError.complete();
138+
this.$poolStatus.complete();
139+
}
140+
141+
/**
142+
* Returns a promise that will resolve once all tasks have been completed.
143+
* Will reject if a task failed.
144+
*/
145+
public completed(): Promise<void> {
146+
return new Promise<void>((resolve, reject) => {
147+
if (this.poolStatus === 'idle') return resolve();
148+
const errorSubscription = this.$workerError.subscribe((e) => {
149+
errorSubscription.unsubscribe();
150+
stateSubscription.unsubscribe();
151+
reject(e);
152+
});
153+
const stateSubscription = this.$poolStatus.subscribe((v) => {
154+
if (v === 'idle') {
155+
errorSubscription.unsubscribe();
156+
stateSubscription.unsubscribe();
157+
return resolve();
158+
}
159+
});
160+
});
161+
}
162+
163+
/**
164+
* Returns a promise that will resolve once all tasks have been completed.
165+
*/
166+
public settled(): Promise<void> {
167+
return new Promise<void>((resolve) => {
168+
if (this.poolStatus === 'idle') return resolve();
169+
const subscription = this.$poolStatus.subscribe((v) => {
170+
if (v === 'idle') {
171+
subscription.unsubscribe();
172+
return resolve();
173+
}
174+
});
175+
});
176+
}
177+
}
178+
179+
export default WorkerPool;

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ export { default as workerModule } from './workerModule.js';
33
export * as errors from './errors.js';
44

55
export type { default as WorkerManagerInterface } from './WorkerManagerInterface.js';
6-
// export type { WorkerModule } from './workerModule.js';
6+
// Export type { WorkerModule } from './workerModule.js';
77
export type { ModuleMethods, ModuleThread, QueuedTask } from './types.js';

src/types.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
type ModuleMethods = {
2-
[methodName: string]: (...args: any) => any;
3-
};
4-
type ModuleThread<Methods = any> = any;
5-
type QueuedTask<ThreadType, Return> = any;
1+
import type { Worker } from 'node:worker_threads';
2+
3+
type WorkerFactory = () => Worker;
64

7-
export type {
8-
ModuleMethods,
9-
ModuleThread,
10-
QueuedTask
5+
type WorkerTask<T extends string = string, D extends any = any> = {
6+
type: T;
7+
data: D;
118
};
9+
10+
export type { WorkerFactory, WorkerTask };

src/workerModule.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
// import { Transfer } from 'threads';
1+
// Import { Transfer } from 'threads';
22
// import { isWorkerRuntime } from 'threads';
33

44
import * as threads from 'threads';
55

66
const { Transfer, isWorkerRuntime } = threads;
77

8-
98
/**
109
* Worker object that contains all functions that will be executed in parallel
1110
* Functions should be using CPU-parallelism not IO-parallelism

tests/WorkerManager.test.ts

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
// import type { WorkerModule } from '#worker.js';
1+
import type { WorkerFactory } from '@';
2+
import { Worker } from 'node:worker_threads';
23
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
34
import { destroyed } from '@matrixai/async-init';
4-
import * as testUtils from './utils.js';
5-
import { spawn, Worker, Transfer } from 'threads';
6-
import WorkerManager from '#WorkerManager.js';
7-
import * as errors from '#errors.js';
8-
9-
type WorkerModule = any;
5+
import WorkerManager from '@/WorkerManager';
6+
import * as errors from '@/errors';
7+
import WorkerPool from '@/WorkerPool';
8+
import * as testUtils from './utils';
109

1110
describe('WorkerManager', () => {
1211
const logger = new Logger('WorkerManager Test', LogLevel.WARN, [
@@ -169,4 +168,33 @@ describe('WorkerManager', () => {
169168
expect(buffer).toEqual(Buffer.from('hello 2'));
170169
await workerManager.destroy();
171170
});
171+
172+
test('scratch', async () => {
173+
console.log('start');
174+
console.log(
175+
'Hello from main',
176+
nodeWorkers.isMainThread,
177+
nodeWorkers.threadId,
178+
);
179+
180+
const script = `
181+
const nodeWorkers = require("node:worker_threads");
182+
console.log("Hello from worker!: ", nodeWorkers.isMainThread, nodeWorkers.threadId);
183+
nodeWorkers.parentPort.on('message', v => {
184+
console.log('message', v);
185+
nodeWorkers.parentPort.postMessage(v);
186+
});
187+
`;
188+
await new Promise((resolve, reject) => {
189+
const worker = new nodeWorkers.Worker(script, { eval: true });
190+
worker.on('message', (v) => {
191+
logger.warn(`message: ${v}`);
192+
worker.terminate();
193+
});
194+
worker.on('error', reject);
195+
worker.on('exit', resolve);
196+
worker.postMessage('some message!');
197+
});
198+
console.log('stop');
199+
});
172200
});

0 commit comments

Comments
 (0)