Skip to content

Commit df5c5cf

Browse files
committed
Rework decompression implementations to not be Workers, but instead communicate to the host via a MessagePort. This doesn't address issue #44 directly yet (still need a way to swap out port connection with a non-Web Worker).
1 parent 1ab380c commit df5c5cf

File tree

6 files changed

+186
-108
lines changed

6 files changed

+186
-108
lines changed

archive/decompress-internal.js

Lines changed: 73 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export const UnarchiveEventType = {
3333
/**
3434
* An unarchive event.
3535
*/
36-
export class UnarchiveEvent extends Event {
36+
export class UnarchiveEvent extends Event {
3737
/**
3838
* @param {string} type The event type.
3939
*/
@@ -45,7 +45,7 @@ export const UnarchiveEventType = {
4545
/**
4646
* Updates all Archiver listeners that an append has occurred.
4747
*/
48-
export class UnarchiveAppendEvent extends UnarchiveEvent {
48+
export class UnarchiveAppendEvent extends UnarchiveEvent {
4949
/**
5050
* @param {number} numBytes The number of bytes appended.
5151
*/
@@ -167,18 +167,34 @@ export class UnarchiveExtractEvent extends UnarchiveEvent {
167167
* Base class for all Unarchivers.
168168
*/
169169
export class Unarchiver extends EventTarget {
170+
/**
171+
* A handle to the decompressor implementation context.
172+
* @type {Worker|*}
173+
* @private
174+
*/
175+
implRef_;
176+
177+
/**
178+
* The client-side port that sends messages to, and receives messages from the
179+
* decompressor implementation.
180+
* @type {MessagePort}
181+
* @private
182+
*/
183+
port_;
184+
170185
/**
171186
* @param {ArrayBuffer} arrayBuffer The Array Buffer. Note that this ArrayBuffer must not be
172187
* referenced once it is sent to the Unarchiver, since it is marked as Transferable and sent
173-
* to the Worker.
174-
* @param {Function(string):Worker} createWorkerFn A function that creates a Worker from a script file.
188+
* to the decompress implementation.
189+
* @param {Function(string, MessagePort):Promise<*>} connectPortFn A function that takes a path
190+
* to a JS decompression implementation (unzip.js) and connects it to a MessagePort.
175191
* @param {Object|string} options An optional object of options, or a string representing where
176192
* the BitJS files are located. The string version of this argument is deprecated.
177193
* Available options:
178194
* 'pathToBitJS': A string indicating where the BitJS files are located.
179195
* 'debug': A boolean where true indicates that the archivers should log debug output.
180196
*/
181-
constructor(arrayBuffer, createWorkerFn, options = {}) {
197+
constructor(arrayBuffer, connectPortFn, options = {}) {
182198
super();
183199

184200
if (typeof options === 'string') {
@@ -195,11 +211,11 @@ export class UnarchiveExtractEvent extends UnarchiveEvent {
195211
this.ab = arrayBuffer;
196212

197213
/**
198-
* A factory method that creates a Worker that does the unarchive work.
199-
* @type {Function(string): Worker}
214+
* A factory method that connects a port to the decompress implementation.
215+
* @type {Function(MessagePort): Promise<*>}
200216
* @private
201217
*/
202-
this.createWorkerFn_ = createWorkerFn;
218+
this.connectPortFn_ = connectPortFn;
203219

204220
/**
205221
* The path to the BitJS files.
@@ -213,13 +229,6 @@ export class UnarchiveExtractEvent extends UnarchiveEvent {
213229
* @type {boolean}
214230
*/
215231
this.debugMode_ = !!(options.debug);
216-
217-
/**
218-
* Private web worker initialized during start().
219-
* @private
220-
* @type {Worker}
221-
*/
222-
this.worker_ = null;
223232
}
224233

225234
/**
@@ -241,7 +250,7 @@ export class UnarchiveExtractEvent extends UnarchiveEvent {
241250
}
242251

243252
/**
244-
* Create an UnarchiveEvent out of the object sent back from the Worker.
253+
* Create an UnarchiveEvent out of the object sent back from the implementation.
245254
* @param {Object} obj
246255
* @returns {UnarchiveEvent}
247256
* @private
@@ -276,107 +285,115 @@ export class UnarchiveExtractEvent extends UnarchiveEvent {
276285
* @param {Object} obj
277286
* @private
278287
*/
279-
handleWorkerEvent_(obj) {
288+
handlePortEvent_(obj) {
280289
const type = obj.type;
281290
if (type && Object.values(UnarchiveEventType).includes(type)) {
282291
const evt = this.createUnarchiveEvent_(obj);
283292
this.dispatchEvent(evt);
284293
if (evt.type == UnarchiveEventType.FINISH) {
285-
this.worker_.terminate();
294+
this.stop();
286295
}
287296
} else {
288-
console.log(`Unknown object received from worker: ${obj}`);
297+
console.log(`Unknown object received from port: ${obj}`);
289298
}
290299
}
291300

292301
/**
293-
* Starts the unarchive in a separate Web Worker thread and returns immediately.
302+
* Starts the unarchive by connecting the ports and sending the first ArrayBuffer.
294303
*/
295304
start() {
296305
const me = this;
297-
const scriptFileName = this.pathToBitJS_ + this.getScriptFileName();
298-
if (scriptFileName) {
299-
this.worker_ = this.createWorkerFn_(scriptFileName);
300-
301-
this.worker_.onerror = function (e) {
302-
console.log('Worker error: message = ' + e.message);
306+
const messageChannel = new MessageChannel();
307+
this.port_ = messageChannel.port1;
308+
this.connectPortFn_(this.pathToBitJS_,
309+
this.getScriptFileName(),
310+
messageChannel.port2).then((implRef) => {
311+
this.implRef_ = implRef;
312+
313+
this.port_.onerror = function (e) {
314+
console.log('Impl error: message = ' + e.message);
303315
throw e;
304316
};
305-
306-
this.worker_.onmessage = function (e) {
317+
318+
this.port_.onmessage = function (e) {
307319
if (typeof e.data == 'string') {
308-
// Just log any strings the workers pump our way.
320+
// Just log any strings the port pumps our way.
309321
console.log(e.data);
310322
} else {
311-
me.handleWorkerEvent_(e.data);
323+
me.handlePortEvent_(e.data);
312324
}
313325
};
314-
326+
315327
const ab = this.ab;
316-
this.worker_.postMessage({
328+
this.port_.postMessage({
317329
file: ab,
318330
logToConsole: this.debugMode_,
319331
}, [ab]);
320-
this.ab = null;
321-
}
332+
this.ab = null;
333+
});
322334
}
323335

324-
// TODO: Create a startSync() method that does not use a worker for Node.
325-
326336
/**
327-
* Adds more bytes to the unarchiver's Worker thread.
337+
* Adds more bytes to the unarchiver.
328338
* @param {ArrayBuffer} ab The ArrayBuffer with more bytes in it. If opt_transferable is
329339
* set to true, this ArrayBuffer must not be referenced after calling update(), since it
330-
* is marked as Transferable and sent to the Worker.
340+
* is marked as Transferable and sent to the implementation.
331341
* @param {boolean=} opt_transferable Optional boolean whether to mark this ArrayBuffer
332342
* as a Tranferable object, which means it can no longer be referenced outside of
333-
* the Worker thread.
343+
* the implementation context.
334344
*/
335345
update(ab, opt_transferable = false) {
336346
const numBytes = ab.byteLength;
337-
if (this.worker_) {
347+
if (this.port_) {
338348
// Send the ArrayBuffer over, and mark it as a Transferable object if necessary.
339349
if (opt_transferable) {
340-
this.worker_.postMessage({ bytes: ab }, [ab]);
350+
this.port_.postMessage({ bytes: ab }, [ab]);
341351
} else {
342-
this.worker_.postMessage({ bytes: ab });
352+
this.port_.postMessage({ bytes: ab });
343353
}
344354
}
345355

346356
this.dispatchEvent(new UnarchiveAppendEvent(numBytes));
347357
}
348358

349359
/**
350-
* Terminates the Web Worker for this Unarchiver and returns immediately.
360+
* Closes the port to the decompressor implementation and terminates it.
351361
*/
352362
stop() {
353-
if (this.worker_) {
354-
this.worker_.terminate();
363+
if (this.port_) {
364+
this.port_.close();
365+
this.port_ = null;
366+
}
367+
if (this.implRef_) {
368+
if (this.implRef_ instanceof Worker) {
369+
this.implRef_.terminate();
370+
this.implRef_ = null;
371+
}
355372
}
356373
}
357374
}
358375

359376
export class UnzipperInternal extends Unarchiver {
360-
constructor(arrayBuffer, createWorkerFn, options) {
361-
super(arrayBuffer, createWorkerFn, options);
377+
constructor(arrayBuffer, connectPortFn, options) {
378+
super(arrayBuffer, connectPortFn, options);
362379
}
363380

364381
getMIMEType() { return 'application/zip'; }
365382
getScriptFileName() { return 'archive/unzip.js'; }
366383
}
367384

368385
export class UnrarrerInternal extends Unarchiver {
369-
constructor(arrayBuffer, createWorkerFn, options) {
370-
super(arrayBuffer, createWorkerFn, options);
386+
constructor(arrayBuffer, connectPortFn, options) {
387+
super(arrayBuffer, connectPortFn, options);
371388
}
372389

373390
getMIMEType() { return 'application/x-rar-compressed'; }
374391
getScriptFileName() { return 'archive/unrar.js'; }
375392
}
376393

377394
export class UntarrerInternal extends Unarchiver {
378-
constructor(arrayBuffer, createWorkerFn, options) {
379-
super(arrayBuffer, createWorkerFn, options);
395+
constructor(arrayBuffer, connectPortFn, options) {
396+
super(arrayBuffer, connectPortFn, options);
380397
}
381398

382399
getMIMEType() { return 'application/x-tar'; }
@@ -387,12 +404,12 @@ export class UntarrerInternal extends Unarchiver {
387404
* Factory method that creates an unarchiver based on the byte signature found
388405
* in the arrayBuffer.
389406
* @param {ArrayBuffer} ab
390-
* @param {Function(string):Worker} createWorkerFn A function that creates a Worker from a script file.
407+
* @param {Function(string):Promise<*>} connectPortFn A function that connects the impl port.
391408
* @param {Object|string} options An optional object of options, or a string representing where
392409
* the path to the unarchiver script files.
393410
* @returns {Unarchiver}
394411
*/
395-
export function getUnarchiverInternal(ab, createWorkerFn, options = {}) {
412+
export function getUnarchiverInternal(ab, connectPortFn, options = {}) {
396413
if (ab.byteLength < 10) {
397414
return null;
398415
}
@@ -401,11 +418,11 @@ export class UntarrerInternal extends Unarchiver {
401418
const mimeType = findMimeType(ab);
402419

403420
if (mimeType === 'application/x-rar-compressed') { // Rar!
404-
unarchiver = new UnrarrerInternal(ab, createWorkerFn, options);
421+
unarchiver = new UnrarrerInternal(ab, connectPortFn, options);
405422
} else if (mimeType === 'application/zip') { // PK (Zip)
406-
unarchiver = new UnzipperInternal(ab, createWorkerFn, options);
423+
unarchiver = new UnzipperInternal(ab, connectPortFn, options);
407424
} else { // Try with tar
408-
unarchiver = new UntarrerInternal(ab, createWorkerFn, options);
425+
unarchiver = new UntarrerInternal(ab, connectPortFn, options);
409426
}
410427
return unarchiver;
411428
}

archive/decompress.js

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,46 +39,50 @@ export {
3939
*/
4040

4141
/**
42-
* The goal is to make this testable - send getUnarchiver() an array buffer of
43-
* an archive, call start on the unarchiver, expect the returned result.
44-
*
45-
* Problem: It relies on Web Workers, and that won't work in a nodejs context.
46-
* Solution: Make archive.js very thin, have it feed web-specific things into
47-
* an internal module that is isomorphic JavaScript.
48-
*
49-
* TODO:
50-
* - write unit tests for archive-internal.js that use the nodejs Worker
51-
* equivalent.
52-
* - maybe use @pgriess/node-webworker or @audreyt/node-webworker-threads or
53-
* just node's worker_threads ?
54-
*/
42+
* Creates a WebWorker with the given decompressor implementation (i.e. unzip.js)
43+
* and transfers a MessagePort for communication. Returns a Promise to the Worker.
44+
* @param {string} pathToBitJS The path to the bitjs folder.
45+
* @param {string} implFilename The decompressor implementation filename
46+
* relative to the bitjs root (e.g. archive/unzip.js)
47+
* @param {MessagePort} implPort The MessagePort to connect to the decompressor
48+
* implementation.
49+
* @returns {Promise<*>} Returns a Promise that resolves to the Worker object.
50+
*/
51+
const connectPortFn = (pathToBitJS, implFilename, implPort) => {
52+
return new Promise((resolve, reject) => {
53+
const worker = new Worker(pathToBitJS + 'archive/unarchiver-webworker.js', {
54+
type: 'module'
55+
});
5556

56-
const createWorkerFn = (scriptFilename) => new Worker(scriptFilename, { type: 'module' });
57+
worker.postMessage({ implSrc: (pathToBitJS + implFilename), }, [implPort]);
58+
resolve(worker);
59+
});
60+
};
5761

58-
// Thin wrappers of compressors for clients who want to construct a specific
62+
// Thin wrappers of decompressors for clients who want to construct a specific
5963
// unarchiver themselves rather than use getUnarchiver().
6064
export class Unzipper extends UnzipperInternal {
61-
constructor(ab, options) { super(ab, createWorkerFn, options); }
65+
constructor(ab, options) { super(ab, connectPortFn, options); }
6266
}
6367

6468
export class Unrarrer extends UnrarrerInternal {
65-
constructor(ab, options) { super(ab, createWorkerFn, options); }
69+
constructor(ab, options) { super(ab, connectPortFn, options); }
6670
}
6771

6872
export class Untarrer extends UntarrerInternal {
69-
constructor(ab, options) { super(ab, createWorkerFn, options); }
73+
constructor(ab, options) { super(ab, connectPortFn, options); }
7074
}
7175

7276
/**
7377
* Factory method that creates an unarchiver based on the byte signature found
74-
* in the arrayBuffer.
78+
* in the ArrayBuffer.
7579
* @param {ArrayBuffer} ab The ArrayBuffer to unarchive. Note that this ArrayBuffer
76-
* must not be referenced after calling this method, as the ArrayBuffer is marked
77-
* as Transferable and sent to a Worker thread once start() is called.
80+
* must not be referenced after calling this method, as the ArrayBuffer may be
81+
* tranferred to a different JS context once start() is called.
7882
* @param {Object|string} options An optional object of options, or a string
7983
* representing where the path to the unarchiver script files.
8084
* @returns {Unarchiver}
8185
*/
8286
export function getUnarchiver(ab, options = {}) {
83-
return getUnarchiverInternal(ab, createWorkerFn, options);
87+
return getUnarchiverInternal(ab, connectPortFn, options);
8488
}

archive/unarchiver-webworker.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/**
2+
* unarchiver-webworker.js
3+
*
4+
* Licensed under the MIT License
5+
*
6+
* Copyright(c) 2023 Google Inc.
7+
*/
8+
9+
/**
10+
* A WebWorker wrapper for a decompress implementation. Upon creation and being
11+
* sent its first message, it dynamically loads the correct decompressor and
12+
* connects the message port
13+
*/
14+
15+
/** @type {MessagePort} */
16+
let implPort;
17+
18+
onmessage = async (evt) => {
19+
const module = await import(evt.data.implSrc);
20+
module.connect(evt.ports[0]);
21+
};

0 commit comments

Comments
 (0)