Skip to content

Commit 006a974

Browse files
committed
Improved Pool and Decoder workers
Decoder workers now return errors in their messages. Introduced WorkerWrapper, an internal class to manage a single worker and reliably submit jobs that now also properly handle errors. Check if the decoder prefers local execution and submit it locally if preferred.
1 parent dac745c commit 006a974

File tree

2 files changed

+99
-46
lines changed

2 files changed

+99
-46
lines changed

src/pool.js

Lines changed: 91 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,80 @@
1-
import { getDecoder } from './compression/index.js';
1+
import { getDecoder, preferWorker } from './compression/index.js';
2+
import create from './worker/create.js';
23

34
const defaultPoolSize = typeof navigator !== 'undefined' ? (navigator.hardwareConcurrency || 2) : 2;
45

56
/**
67
* @module pool
78
*/
89

10+
/**
11+
* Wrapper for a worker that can submit jobs to the worker and receive responses.
12+
*/
13+
class WorkerWrapper {
14+
/**
15+
* @param {Worker} worker the worker to wrap
16+
*/
17+
constructor(worker) {
18+
this.worker = worker;
19+
this.worker.addEventListener('message', (e) => this._onWorkerMessage(e));
20+
this.jobIdCounter = 0;
21+
this.jobs = new Map();
22+
}
23+
24+
/**
25+
* Get a new job id
26+
* @returns {Number} the new job id
27+
*/
28+
newJobId() {
29+
return this.jobIdCounter++;
30+
}
31+
32+
/**
33+
* Get the number of jobs currently running
34+
* @returns {Number} the number of jobs currently running
35+
*/
36+
getJobCount() {
37+
return this.jobs.size;
38+
}
39+
40+
_onWorkerMessage(e) {
41+
const { jobId, error, ...result } = e.data;
42+
const job = this.jobs.get(jobId);
43+
this.jobs.delete(jobId);
44+
45+
if (error) {
46+
job.reject(new Error(error));
47+
} else {
48+
job.resolve(result);
49+
}
50+
}
51+
52+
/**
53+
* Submit a job to the worker
54+
* @param {Object} message the message to send to the worker. A "jobId" property will be added to this object.
55+
* @param {Object[]} [transferables] an optional array of transferable objects to transfer to the worker.
56+
* @returns {Promise} a promise that gets resolved/rejected when a message with the same jobId is received from the worker.
57+
*/
58+
submitJob(message, transferables = undefined) {
59+
const jobId = this.newJobId();
60+
let resolve;
61+
let reject;
62+
63+
const promise = new Promise((_resolve, _reject) => {
64+
resolve = _resolve;
65+
reject = _reject;
66+
});
67+
68+
this.jobs.set(jobId, { resolve, reject });
69+
this.worker.postMessage({ ...message, jobId }, transferables);
70+
return promise;
71+
}
72+
73+
terminate() {
74+
this.worker.terminate();
75+
}
76+
}
77+
978
/**
1079
* Pool for workers to decode chunks of the images.
1180
*/
@@ -39,24 +108,16 @@ class Pool {
39108
* }
40109
* ```
41110
*/
42-
constructor(size = defaultPoolSize, createWorker) {
43-
this.workers = null;
44-
this._awaitingDecoder = null;
45-
this.size = size;
46-
this.messageId = 0;
111+
constructor(size = defaultPoolSize, createWorker = create) {
112+
this.workerWrappers = null;
47113
if (size) {
48-
this._awaitingDecoder = createWorker ? Promise.resolve(createWorker) : new Promise((resolve) => {
49-
import('./worker/decoder.js').then((module) => {
50-
resolve(module.create);
51-
});
52-
});
53-
this._awaitingDecoder.then((create) => {
54-
this._awaitingDecoder = null;
55-
this.workers = [];
114+
this.workerWrappers = (async () => {
115+
const workerWrappers = [];
56116
for (let i = 0; i < size; i++) {
57-
this.workers.push({ worker: create(), idle: true });
117+
workerWrappers.push(new WorkerWrapper(createWorker()));
58118
}
59-
});
119+
return workerWrappers;
120+
})();
60121
}
61122
}
62123

@@ -66,34 +127,24 @@ class Pool {
66127
* @returns {Promise<ArrayBuffer>} the decoded result as a `Promise`
67128
*/
68129
async decode(fileDirectory, buffer) {
69-
if (this._awaitingDecoder) {
70-
await this._awaitingDecoder;
71-
}
72-
return this.size === 0
73-
? getDecoder(fileDirectory).then((decoder) => decoder.decode(fileDirectory, buffer))
74-
: new Promise((resolve) => {
75-
const worker = this.workers.find((candidate) => candidate.idle)
76-
|| this.workers[Math.floor(Math.random() * this.size)];
77-
worker.idle = false;
78-
const id = this.messageId++;
79-
const onMessage = (e) => {
80-
if (e.data.id === id) {
81-
worker.idle = true;
82-
resolve(e.data.decoded);
83-
worker.worker.removeEventListener('message', onMessage);
84-
}
85-
};
86-
worker.worker.addEventListener('message', onMessage);
87-
worker.worker.postMessage({ fileDirectory, buffer, id }, [buffer]);
130+
if (preferWorker(fileDirectory) && this.workerWrappers) {
131+
// select the worker with the lowest jobCount
132+
const workerWrapper = (await this.workerWrappers).reduce((a, b) => {
133+
return a.getJobCount() < b.getJobCount() ? a : b;
88134
});
135+
const { decoded } = await workerWrapper.submitJob({ fileDirectory, buffer }, [buffer]);
136+
return decoded;
137+
} else {
138+
return getDecoder(fileDirectory).then((decoder) => decoder.decode(fileDirectory, buffer));
139+
}
89140
}
90141

91-
destroy() {
92-
if (this.workers) {
93-
this.workers.forEach((worker) => {
94-
worker.worker.terminate();
142+
async destroy() {
143+
if (this.workerWrappers) {
144+
(await this.workerWrappers).forEach((worker) => {
145+
worker.terminate();
95146
});
96-
this.workers = null;
147+
this.workerWrappers = null;
97148
}
98149
}
99150
}

src/worker/decoder.js

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import { getDecoder } from '../compression/index.js';
55
const worker = globalThis;
66

77
worker.addEventListener('message', async (e) => {
8-
const { id, fileDirectory, buffer } = e.data;
9-
const decoder = await getDecoder(fileDirectory);
10-
const decoded = await decoder.decode(fileDirectory, buffer);
11-
worker.postMessage({ decoded, id }, [decoded]);
8+
const { fileDirectory, buffer, ...extra } = e.data;
9+
try {
10+
const decoder = await getDecoder(fileDirectory);
11+
const decoded = await decoder.decode(fileDirectory, buffer);
12+
worker.postMessage({ decoded, ...extra }, [decoded]);
13+
} catch (error) {
14+
worker.postMessage({ error: error.message, ...extra });
15+
}
1216
});
13-
14-
export let create;

0 commit comments

Comments
 (0)