Skip to content

Commit 4c36b86

Browse files
committed
wip: handling workers that fail to create properly and test expanding
1 parent 3239b18 commit 4c36b86

File tree

6 files changed

+325
-221
lines changed

6 files changed

+325
-221
lines changed

src/WorkerManager.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { WorkerFactory } from './types.js';
22
import Logger from '@matrixai/logger';
33
import { CreateDestroy, ready } from '@matrixai/async-init/CreateDestroy.js';
4-
import { WorkerTask } from './types.js';
4+
import { WorkerTaskInformation } from './types.js';
55
import * as errors from './errors.js';
66
import WorkerPool from './WorkerPool.js';
77

@@ -63,12 +63,12 @@ class WorkerManager {
6363
}
6464

6565
@ready(new errors.ErrorWorkerManagerDestroyed())
66-
public async call(task: WorkerTask): Promise<unknown> {
66+
public async call(task: WorkerTaskInformation): Promise<unknown> {
6767
return await this.queue(task);
6868
}
6969

7070
@ready(new errors.ErrorWorkerManagerDestroyed())
71-
public queue(task: WorkerTask): Promise<unknown> {
71+
public queue(task: WorkerTaskInformation): Promise<unknown> {
7272
return new Promise((resolve, reject) => {
7373
this.pool.runTask(task, (result, error) => {
7474
if (error != null) return reject(error);

src/WorkerPool.ts

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,40 @@ import type {
44
TaskCallback,
55
WorkerFactory,
66
WorkerResultInternal,
7-
WorkerTask,
7+
WorkerTaskInformation,
88
WorkerTaskInput,
99
} from './types.js';
10+
import { AsyncResource } from 'node:async_hooks';
1011
import { Subject } from 'rxjs';
1112
import { WorkerFunction, WorkerResult } from './types.js';
1213
import * as errors from './errors.js';
1314

15+
class WorkerTask extends AsyncResource {
16+
protected callback: TaskCallback;
17+
18+
constructor(callback: TaskCallback, triggerAsyncId?: number) {
19+
super('WorkerTask', triggerAsyncId);
20+
this.callback = callback;
21+
}
22+
23+
public done(result, error: Error) {
24+
this.runInAsyncScope(this.callback, null, result, error);
25+
this.emitDestroy();
26+
}
27+
}
28+
1429
const taskInfoSymbol = Symbol('Task Info Symbol');
15-
type PoolStatus = 'idle' | 'working' | 'queued';
30+
type PoolStatus = 'terminated' | 'idle' | 'working' | 'queued';
1631

1732
class WorkerPool {
1833
protected workerFactory: WorkerFactory;
1934
protected workers: Set<Worker> = new Set();
2035
protected freeWorkers: Array<Worker> = [];
21-
protected queue: Array<{ task: WorkerTask; callback: TaskCallback }> = [];
22-
protected terminating: boolean = false;
36+
protected queue: Array<{
37+
task: WorkerTaskInformation;
38+
callback: TaskCallback;
39+
}> = [];
40+
protected terminatedError: Error | undefined = undefined;
2341
protected handleDestroySubscription: Subscription;
2442

2543
public $workerCreated = new Subject<void>();
@@ -57,15 +75,31 @@ class WorkerPool {
5775

5876
protected addWorker() {
5977
const worker = this.workerFactory();
78+
let workerError: Error;
79+
let initializing = true;
6080
const messageHandler = (result: WorkerResultInternal) => {
61-
if (result.error != null) worker[taskInfoSymbol](undefined, result.error);
62-
else worker[taskInfoSymbol](result.data, undefined);
81+
if (initializing) {
82+
// @ts-ignore: ignoring type here
83+
if (result !== 'initialized') {
84+
throw Error('TMP IMP failed to initialize properly');
85+
}
86+
initializing = false;
87+
this.freeWorkers.push(worker);
88+
this.$workerFreed.next();
89+
return;
90+
}
91+
if (result.error != null) {
92+
worker[taskInfoSymbol].done(undefined, result.error);
93+
} else {
94+
worker[taskInfoSymbol].done(result.data, undefined);
95+
}
6396
worker[taskInfoSymbol] = undefined;
6497
this.freeWorkers.push(worker);
6598
this.$workerFreed.next();
6699
};
67100
const errorHandler = (e) => {
68-
if (worker[taskInfoSymbol]) worker[taskInfoSymbol](undefined, e);
101+
workerError = e;
102+
if (worker[taskInfoSymbol]) worker[taskInfoSymbol].done(undefined, e);
69103
else this.$workerError.next(e);
70104
};
71105
worker.on('message', messageHandler);
@@ -74,24 +108,37 @@ class WorkerPool {
74108
worker.off('message', messageHandler);
75109
worker.off('error', errorHandler);
76110
this.workers.delete(worker);
111+
if (
112+
workerError != null &&
113+
`${workerError.message}`.includes('Cannot find module')
114+
) {
115+
// If the worker errored then we want to check if it was a failure to load error
116+
if (this.workers.size === 0 && this.terminatedError == null) {
117+
this.terminatedError = new Error(
118+
'TMP IMP Workers failed to load modules',
119+
);
120+
this.cleanUp(this.terminatedError);
121+
}
122+
return;
123+
}
77124
this.$workerDestroyed.next();
78125
});
126+
worker.once('online', async () => {
127+
worker.postMessage({ type: 'initialize', data: undefined });
128+
});
79129

80130
// TODO: debugging
81131
// worker.on('message', (...args) => console.log('DEBUG message: ', args));
82132
// worker.on('messageerror', (...args) => console.log('DEBUG messageerror: ', args));
83133
// worker.on('error', (...args) => console.log('DEBUG error: ', args));
84134
// worker.on('exit', (...args) => console.log('DEBUG exit: ', args));
85135
// worker.on('online', (...args) => console.log('DEBUG online: ', args));
86-
87136
this.workers.add(worker);
88137
this.$workerCreated.next();
89-
this.freeWorkers.push(worker);
90-
this.$workerFreed.next();
91138
}
92139

93140
public runTask(task: WorkerTaskInput, callback: TaskCallback) {
94-
if (this.terminating) throw Error('TMP IMP terminating');
141+
if (this.terminatedError != null) throw this.terminatedError;
95142
if (this.freeWorkers.length === 0) {
96143
this.queue.push({ task, callback });
97144
if (this.queue.length === 1) this.$poolStatus.next('queued');
@@ -101,12 +148,13 @@ class WorkerPool {
101148
const wasIdle = this.freeWorkers.length === this.workers.size;
102149
const worker = this.freeWorkers.pop()!;
103150
if (wasIdle) this.$poolStatus.next('working');
104-
worker[taskInfoSymbol] = callback;
151+
worker[taskInfoSymbol] = new WorkerTask(callback);
105152
worker.postMessage({ type: task.type, data: task.data }, task.transferList);
106153
}
107154

108155
public async terminate(force: boolean) {
109-
this.terminating = true;
156+
if (this.terminatedError != null) return;
157+
this.terminatedError = Error('TMP IMP terminating');
110158
// Prevent new tasks and wait for exising queue to drain
111159
if (!force) await this.settled();
112160
// Prevent terminations from creating new workers
@@ -116,9 +164,18 @@ class WorkerPool {
116164
workerTerminatePs.push(worker.terminate());
117165
}
118166
await Promise.all(workerTerminatePs);
119-
// Console.log('workers', this.workers.size);
120-
// console.log('queue', this.queue.length);
121-
// console.log('freeWorkers', this.freeWorkers.length);
167+
this.cleanUp(this.terminatedError);
168+
}
169+
170+
protected cleanUp(terminatedError: Error): void {
171+
// Cleaning up remaining queue and observables
172+
let task = this.queue.pop();
173+
while (task != null) {
174+
const workerTask = new WorkerTask(task.callback);
175+
workerTask.done(undefined, terminatedError);
176+
task = this.queue.pop();
177+
}
178+
this.$poolStatus.next('terminated');
122179
// Cleaning up subjects
123180
this.$workerCreated.complete();
124181
this.$workerDestroyed.complete();
@@ -133,14 +190,16 @@ class WorkerPool {
133190
*/
134191
public completed(): Promise<void> {
135192
return new Promise<void>((resolve, reject) => {
136-
if (this.poolStatus === 'idle') return resolve();
193+
if (this.poolStatus === 'idle' || this.poolStatus === 'terminated') {
194+
return resolve();
195+
}
137196
const errorSubscription = this.$workerError.subscribe((e) => {
138197
errorSubscription.unsubscribe();
139198
stateSubscription.unsubscribe();
140199
reject(e);
141200
});
142201
const stateSubscription = this.$poolStatus.subscribe((v) => {
143-
if (v === 'idle') {
202+
if (v === 'idle' || this.poolStatus === 'terminated') {
144203
errorSubscription.unsubscribe();
145204
stateSubscription.unsubscribe();
146205
return resolve();
@@ -154,9 +213,11 @@ class WorkerPool {
154213
*/
155214
public settled(): Promise<void> {
156215
return new Promise<void>((resolve) => {
157-
if (this.poolStatus === 'idle') return resolve();
216+
if (this.poolStatus === 'idle' || this.poolStatus === 'terminated') {
217+
return resolve();
218+
}
158219
const subscription = this.$poolStatus.subscribe((v) => {
159-
if (v === 'idle') {
220+
if (v === 'idle' || this.poolStatus === 'terminated') {
160221
subscription.unsubscribe();
161222
return resolve();
162223
}

src/expose.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type {
2-
WorkerTask,
2+
WorkerTaskInformation,
33
WorkerManifest,
44
WorkerResult,
55
WorkerResultInternal,
@@ -15,7 +15,11 @@ import * as workerErrors from '#errors.js';
1515
function expose(workerManifest: WorkerManifest) {
1616
// We don't want to run this in the main thread since it's not acting as a worker
1717
if (isMainThread) return;
18-
const handleMessage = async (task: WorkerTask) => {
18+
const handleMessage = async (task: WorkerTaskInformation) => {
19+
if (task.type === 'initialize') {
20+
parentPort!.postMessage('initialized');
21+
return;
22+
}
1923
const method = workerManifest[task.type];
2024
if (method == null) {
2125
parentPort!.postMessage({

src/types.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ import type { TransferListItem, Worker } from 'node:worker_threads';
22

33
type WorkerFactory = () => Worker;
44

5-
type WorkerTask<T extends string = string, D = unknown> = {
5+
type WorkerTaskInformation<T extends string = string, D = unknown> = {
66
type: T;
77
data: D;
88
};
99

10-
type WorkerTaskInput = WorkerTask & {
10+
type WorkerTaskInput = WorkerTaskInformation & {
1111
transferList?: Array<TransferListItem>;
1212
};
1313

@@ -25,7 +25,7 @@ type TaskCallback<O = unknown> = (result: O, error?: Error) => void;
2525

2626
export type {
2727
WorkerFactory,
28-
WorkerTask,
28+
WorkerTaskInformation,
2929
WorkerTaskInput,
3030
WorkerFunction,
3131
WorkerResult,

0 commit comments

Comments
 (0)