Skip to content
Draft
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
a00b452
chore: Add a stub for shared worker code
bebraw Oct 5, 2021
9dea04c
chore: Refine shared worker implementation
bebraw Oct 5, 2021
98555a8
chore: Add initial docs
bebraw Oct 5, 2021
f224972
test: Add initial tests
bebraw Oct 5, 2021
2f5d1c0
chore: Sketch out spawn for shared workers
bebraw Oct 6, 2021
3e74bd8
chore: Refine type check
bebraw Oct 6, 2021
e807f5b
fix: Use instanceof for shared worker checks
bebraw Oct 6, 2021
9a0c7ab
fix: Start shared worker when events are added
bebraw Oct 6, 2021
66d9e2d
chore: Add a todo
bebraw Oct 6, 2021
dc8f1df
fix: Make shared optional
bebraw Oct 11, 2021
c742173
fix: Add a check against possibly missing port
bebraw Oct 11, 2021
bebb1af
fix: Include shared worker to ts
bebraw Oct 11, 2021
5a9b2a9
fix: Add a missing browser field
bebraw Oct 12, 2021
aed0dda
fix: Drop @types/sharedworker
bebraw Oct 12, 2021
ad356b1
fix: Fix test paths
bebraw Oct 12, 2021
6a70a09
chore: Add a todo
bebraw Oct 12, 2021
cf06ee9
fix: Implement terminate for shared workers
bebraw Oct 12, 2021
ebecb9e
chore: Improve spacing
bebraw Oct 12, 2021
e373b20
chore: Clean up formatting
bebraw Oct 18, 2021
b661764
chore: Remove unnecessary formatting from readme
bebraw Oct 18, 2021
1c22039
chore: Drop ;'s
bebraw Oct 18, 2021
9828320
chore: Drop ;'s
bebraw Oct 18, 2021
f594466
chore: Remove unnecessary formatting from readme
bebraw Oct 18, 2021
bdee01a
chore: Drop ,'s
bebraw Oct 18, 2021
8073c52
chore: Revert formatting
bebraw Oct 18, 2021
2d004ab
chore: Revert formatting
bebraw Oct 18, 2021
5c1f8ac
chore: Revert formatting
bebraw Oct 18, 2021
e232ee6
chore: Revert formatting
bebraw Oct 18, 2021
1594e2c
chore: Drop semicolons
bebraw Oct 20, 2021
127c55d
chore: Simplify code
bebraw Oct 20, 2021
2b7d881
refactor: Simplify implementation
bebraw Oct 20, 2021
04554a8
chore: Drop formatting
bebraw Oct 20, 2021
a906d77
chore: Drop formatting
bebraw Oct 20, 2021
42d15bf
chore: Drop formatting
bebraw Oct 20, 2021
9138fd5
chore: Drop formatting
bebraw Oct 20, 2021
fdf90a3
chore: Drop formatting
bebraw Oct 20, 2021
a1b837b
chore: Drop formatting
bebraw Oct 20, 2021
4606977
chore: Drop formatting
bebraw Oct 20, 2021
7f1a625
chore: Drop formatting
bebraw Oct 20, 2021
8663a6d
chore: Drop formatting
bebraw Oct 20, 2021
cf5cf41
chore: Mark a todo
bebraw Oct 20, 2021
2ea1d55
chore: Drop a reference
bebraw Oct 20, 2021
4c9d072
chore: Drop formatting
bebraw Oct 20, 2021
5aef0bd
chore: Drop formatting
bebraw Oct 20, 2021
3c00490
chore: Drop formatting
bebraw Oct 20, 2021
8e008d5
refactor: Merge shared worker tests with spawn tests
bebraw Oct 22, 2021
de025a1
fix: Solve the proxy type issue
bebraw Oct 22, 2021
b3dfead
fix: Fix shared worker checks within a worker
bebraw Oct 25, 2021
aceed4b
chore: Add a debug print
bebraw Oct 25, 2021
cffac58
chore: Sketch out onconnect
bebraw Oct 25, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,40 @@ Use `expose()` to make a function or an object containing methods callable from

In case of exposing an object, `spawn()` will asynchronously return an object exposing all the object's functions. If you `expose()` a function, `spawn` will also return a callable function, not an object.

## Shared Workers

[Shared Workers](https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker) can be accessed from multiple browsing contexts (windows, iframes, other workers) making them useful for sharing tasks such as synchronization and avoiding redundant work.

In threads.js, the functionality is exposed as follows:

```js
// master.js
import { spawn, Thread, SharedWorker } from "threads"

const auth = await spawn(new SharedWorker("./workers/auth"))
const hashed = await auth.hashPassword("Super secret password", "1234")

console.log("Hashed password:", hashed)

await Thread.terminate(auth)
```

```js
// workers/auth.js
import sha256 from "js-sha256"
import { expose } from "threads/shared-worker"

exposeShared({
hashPassword(password, salt) {
return sha256(password + salt)
},
})
```

As you might notice, compared to the original example, only the imports (`Worker` -> `SharedWorker` and `expose` path) have changed.

Note that as the functionality makes sense only in the browser, it's available only there. Based on [caniuse](https://caniuse.com/sharedworkers), the functionality is widely supported [Safari being a notable exception](https://bugs.webkit.org/show_bug.cgi?id=149850).

## Usage

<p>
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
"build:es": "tsc -p tsconfig-esm.json",
"postbuild": "npm run bundle",
"bundle": "rollup -c -f umd --file=bundle/worker.js --name=threads --silent -- dist-esm/worker/bundle-entry.js",
"test": "npm run test:library && npm run test:tooling && npm run test:puppeteer:basic && npm run test:puppeteer:webpack",
"test": "npm run test:library && npm run test:tooling && npm run test:puppeteer:basic && test:puppeteer:shared && npm run test:puppeteer:webpack",
"test:library": "cross-env TS_NODE_FILES=true ava ./test/**/*.test.ts",
"test:tooling": "cross-env TS_NODE_FILES=true ava ./test-tooling/**/*.test.ts",
"test:puppeteer:basic": "puppet-run --plugin=mocha --bundle=./test/workers/:workers/ --serve=./bundle/worker.js:/worker.js ./test/*.chromium*.ts",
"test:puppeteer:shared": "puppet-run --plugin=mocha --bundle=./test/shared-workers/:shared-workers/ --serve=./bundle/worker.js:/worker.js ./test/shared.ts",
"test:puppeteer:webpack": "puppet-run --serve ./test-tooling/webpack/dist/app.web/0.worker.js --serve ./test-tooling/webpack/dist/app.web/1.worker.js --plugin=mocha ./test-tooling/webpack/webpack.chromium.mocha.ts",
"posttest": "tslint --project .",
"prepare": "npm run build"
Expand Down Expand Up @@ -128,6 +129,7 @@
"./dist/worker/implementation.js": "./dist/worker/implementation.browser.js",
"./dist/worker/implementation.tiny-worker.js": false,
"./dist/worker/implementation.worker_threads.js": false,
"./dist/shared-worker/implementation.js": "./dist/shared-worker/implementation.js",
"./dist-esm/master/implementation.js": "./dist-esm/master/implementation.browser.js",
"./dist-esm/master/implementation.node.js": false,
"./dist-esm/worker/implementation.js": "./dist-esm/worker/implementation.browser.js",
Expand Down
273 changes: 273 additions & 0 deletions src/get-expose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
import isSomeObservable from "is-observable"
import { Observable, Subscription } from "observable-fns"
import { deserialize, serialize } from "./common"
import { isTransferDescriptor, TransferDescriptor } from "./transferable"
import {
MasterJobCancelMessage,
MasterJobRunMessage,
MasterMessageType,
SerializedError,
WorkerInitMessage,
WorkerJobErrorMessage,
WorkerJobResultMessage,
WorkerJobStartMessage,
WorkerMessageType,
WorkerUncaughtErrorMessage,
} from "./types/messages"
import {
AbstractedWorkerAPI,
WorkerFunction,
WorkerModule,
} from "./types/worker"

function getExpose(Implementation: AbstractedWorkerAPI) {
let exposeCalled = false

const activeSubscriptions = new Map<number, Subscription<any>>()

const isMasterJobCancelMessage = (
thing: any
): thing is MasterJobCancelMessage =>
thing && thing.type === MasterMessageType.cancel
const isMasterJobRunMessage = (thing: any): thing is MasterJobRunMessage =>
thing && thing.type === MasterMessageType.run

/**
* There are issues with `is-observable` not recognizing zen-observable's instances.
* We are using `observable-fns`, but it's based on zen-observable, too.
*/
const isObservable = (thing: any): thing is Observable<any> =>
isSomeObservable(thing) || isZenObservable(thing)

function isZenObservable(thing: any): thing is Observable<any> {
return (
thing &&
typeof thing === "object" &&
typeof thing.subscribe === "function"
)
}

function deconstructTransfer(thing: any) {
return isTransferDescriptor(thing)
? { payload: thing.send, transferables: thing.transferables }
: { payload: thing, transferables: undefined }
}

function postFunctionInitMessage() {
const initMessage: WorkerInitMessage = {
type: WorkerMessageType.init,
exposed: {
type: "function",
},
}
Implementation.postMessageToMaster(initMessage)
}

function postModuleInitMessage(methodNames: string[]) {
const initMessage: WorkerInitMessage = {
type: WorkerMessageType.init,
exposed: {
type: "module",
methods: methodNames,
},
}
Implementation.postMessageToMaster(initMessage)
}

function postJobErrorMessage(
uid: number,
rawError: Error | TransferDescriptor<Error>
) {
const { payload: error, transferables } = deconstructTransfer(rawError)
const errorMessage: WorkerJobErrorMessage = {
type: WorkerMessageType.error,
uid,
error: serialize(error) as any as SerializedError,
}
Implementation.postMessageToMaster(errorMessage, transferables)
}

function postJobResultMessage(
uid: number,
completed: boolean,
resultValue?: any
) {
const { payload, transferables } = deconstructTransfer(resultValue)
const resultMessage: WorkerJobResultMessage = {
type: WorkerMessageType.result,
uid,
complete: completed ? true : undefined,
payload,
}
Implementation.postMessageToMaster(resultMessage, transferables)
}

function postJobStartMessage(
uid: number,
resultType: WorkerJobStartMessage["resultType"]
) {
const startMessage: WorkerJobStartMessage = {
type: WorkerMessageType.running,
uid,
resultType,
}
Implementation.postMessageToMaster(startMessage)
}

function postUncaughtErrorMessage(error: Error) {
try {
const errorMessage: WorkerUncaughtErrorMessage = {
type: WorkerMessageType.uncaughtError,
error: serialize(error) as any as SerializedError,
}
Implementation.postMessageToMaster(errorMessage)
} catch (subError) {
// tslint:disable-next-line no-console
console.error(
"Not reporting uncaught error back to master thread as it " +
"occured while reporting an uncaught error already." +
"\nLatest error:",
subError,
"\nOriginal error:",
error
)
}
}

async function runFunction(jobUID: number, fn: WorkerFunction, args: any[]) {
let syncResult: any

try {
syncResult = fn(...args)
} catch (error) {
return postJobErrorMessage(jobUID, error)
}

const resultType = isObservable(syncResult) ? "observable" : "promise"
postJobStartMessage(jobUID, resultType)

if (isObservable(syncResult)) {
const subscription = syncResult.subscribe(
(value) => postJobResultMessage(jobUID, false, serialize(value)),
(error) => {
postJobErrorMessage(jobUID, serialize(error) as any)
activeSubscriptions.delete(jobUID)
},
() => {
postJobResultMessage(jobUID, true)
activeSubscriptions.delete(jobUID)
}
)
activeSubscriptions.set(jobUID, subscription)
} else {
try {
const result = await syncResult
postJobResultMessage(jobUID, true, serialize(result))
} catch (error) {
postJobErrorMessage(jobUID, serialize(error) as any)
}
}
}

/**
* Expose a function or a module (an object whose values are functions)
* to the main thread. Must be called exactly once in every worker thread
* to signal its API to the main thread.
*
* @param exposed Function or object whose values are functions
*/
function expose(exposed: WorkerFunction | WorkerModule<any>) {
if (!Implementation.isWorkerRuntime()) {
throw Error("expose() called in the master thread.")
}
if (exposeCalled) {
throw Error(
"expose() called more than once. This is not possible. Pass an object to expose() if you want to expose multiple functions."
)
}
exposeCalled = true

if (typeof exposed === "function") {
Implementation.subscribeToMasterMessages((messageData) => {
if (isMasterJobRunMessage(messageData) && !messageData.method) {
runFunction(
messageData.uid,
exposed,
messageData.args.map(deserialize)
)
}
})
postFunctionInitMessage()
} else if (typeof exposed === "object" && exposed) {
Implementation.subscribeToMasterMessages((messageData) => {
if (isMasterJobRunMessage(messageData) && messageData.method) {
runFunction(
messageData.uid,
exposed[messageData.method],
messageData.args.map(deserialize)
)
}
})

const methodNames = Object.keys(exposed).filter(
(key) => typeof exposed[key] === "function"
)
postModuleInitMessage(methodNames)
} else {
throw Error(
`Invalid argument passed to expose(). Expected a function or an object, got: ${exposed}`
)
}

Implementation.subscribeToMasterMessages((messageData) => {
if (isMasterJobCancelMessage(messageData)) {
const jobUID = messageData.uid
const subscription = activeSubscriptions.get(jobUID)

if (subscription) {
subscription.unsubscribe()
activeSubscriptions.delete(jobUID)
}
}
})
}

if (
typeof self !== "undefined" &&
typeof self.addEventListener === "function" &&
Implementation.isWorkerRuntime()
) {
self.addEventListener("error", (event) => {
// Post with some delay, so the master had some time to subscribe to messages
setTimeout(() => postUncaughtErrorMessage(event.error || event), 250)
})
self.addEventListener("unhandledrejection", (event) => {
const error = (event as any).reason
if (error && typeof (error as any).message === "string") {
// Post with some delay, so the master had some time to subscribe to messages
setTimeout(() => postUncaughtErrorMessage(error), 250)
}
})
}

if (
typeof process !== "undefined" &&
typeof process.on === "function" &&
Implementation.isWorkerRuntime()
) {
process.on("uncaughtException", (error) => {
// Post with some delay, so the master had some time to subscribe to messages
setTimeout(() => postUncaughtErrorMessage(error), 250)
})
process.on("unhandledRejection", (error) => {
if (error && typeof (error as any).message === "string") {
// Post with some delay, so the master had some time to subscribe to messages
setTimeout(() => postUncaughtErrorMessage(error as any), 250)
}
})
}

return expose
}

export default getExpose
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
export { registerSerializer } from "./common"
export * from "./master/index"
export { expose } from "./worker/index"
export { DefaultSerializer, JsonSerializable, Serializer, SerializerImplementation } from "./serializers"
export { expose as exposeShared } from "./shared-worker/index"
export {
DefaultSerializer,
JsonSerializable,
Serializer,
SerializerImplementation,
} from "./serializers"
export { Transfer, TransferDescriptor } from "./transferable"
1 change: 1 addition & 0 deletions src/master/implementation.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ function selectWorkerImplementation(): ImplementationExport {

return {
blob: BlobWorker,
shared: SharedWorker,
default: WebWorker
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/master/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ export const BlobWorker = getWorkerImplementation().blob

/** Worker implementation. Either web worker or a node.js Worker class. */
export const Worker = getWorkerImplementation().default

/** Shared Worker implementation. Available only in the web. */
export const SharedWorker = getWorkerImplementation().shared
Loading