Skip to content

Commit a409c2a

Browse files
committed
worker_threads: add everysync
Signed-off-by: Matteo Collina <[email protected]>
1 parent ca74d64 commit a409c2a

File tree

10 files changed

+420
-0
lines changed

10 files changed

+420
-0
lines changed

LICENSE

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2639,3 +2639,28 @@ The externally maintained libraries used by Node.js are:
26392639
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26402640
SOFTWARE.
26412641
"""
2642+
2643+
- everysync, located at lib/internal/worker/everysync, is licensed as follows:
2644+
"""
2645+
MIT License
2646+
2647+
Copyright (c) 2024 Matteo Collina
2648+
2649+
Permission is hereby granted, free of charge, to any person obtaining a copy
2650+
of this software and associated documentation files (the "Software"), to deal
2651+
in the Software without restriction, including without limitation the rights
2652+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
2653+
copies of the Software, and to permit persons to whom the Software is
2654+
furnished to do so, subject to the following conditions:
2655+
2656+
The above copyright notice and this permission notice shall be included in all
2657+
copies or substantial portions of the Software.
2658+
2659+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
2660+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
2661+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
2662+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
2663+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2664+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2665+
SOFTWARE.
2666+
"""

doc/api/worker_threads.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1527,6 +1527,78 @@ Calling `unref()` on a worker allows the thread to exit if this is the only
15271527
active handle in the event system. If the worker is already `unref()`ed calling
15281528
`unref()` again has no effect.
15291529

1530+
## `worker.makeSync(buffer[, options])`
1531+
1532+
<!-- YAML
1533+
added: REPLACEME
1534+
-->
1535+
1536+
* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication.
1537+
* `options` {Object}
1538+
* `timeout` {number} The timeout in milliseconds for synchronous calls. **Default:** `5000`.
1539+
* `expandable` {boolean} Whether the buffer can be resized. **Default:** `true` if the buffer supports `growable` option.
1540+
* Returns: {Object} An object with synchronous methods mirroring those exposed through `wire()`.
1541+
1542+
Creates a synchronous API facade that communicates with a worker thread over a shared memory buffer.
1543+
The worker thread must call `wire()` on the same buffer to register the methods that can be called.
1544+
1545+
This function enables making synchronous calls to a worker thread, which is particularly useful
1546+
when code requires blocking operations but still wants to benefit from the worker thread's isolation.
1547+
1548+
```js
1549+
const { Worker, makeSync } = require('node:worker_threads');
1550+
1551+
// Create a SharedArrayBuffer for communication
1552+
const buffer = new SharedArrayBuffer(1024, {
1553+
maxByteLength: 64 * 1024 * 1024
1554+
});
1555+
1556+
// Create a worker, passing the buffer
1557+
const worker = new Worker('worker-script.js', {
1558+
workerData: { buffer }
1559+
});
1560+
1561+
// Create a synchronous API facade
1562+
const api = makeSync(buffer);
1563+
1564+
// Call a method synchronously - this will block until the worker responds
1565+
const result = api.methodName(arg1, arg2);
1566+
```
1567+
1568+
## `worker.wire(buffer, methods)`
1569+
1570+
<!-- YAML
1571+
added: REPLACEME
1572+
-->
1573+
1574+
* `buffer` {SharedArrayBuffer} A shared memory buffer to use for communication.
1575+
* `methods` {Object} An object whose properties are methods to expose to the main thread.
1576+
1577+
Exposes methods to the main thread that can be called synchronously using `makeSync()`.
1578+
The methods can be async functions or return promises, and the main thread will wait
1579+
for the promise to resolve or reject.
1580+
1581+
```js
1582+
const { workerData, wire } = require('node:worker_threads');
1583+
1584+
// Expose methods synchronously to the main thread
1585+
wire(workerData.buffer, {
1586+
async methodName(arg1, arg2) {
1587+
// Do work asynchronously
1588+
return result;
1589+
},
1590+
1591+
syncMethod(arg) {
1592+
// Do synchronous work
1593+
return result;
1594+
}
1595+
});
1596+
```
1597+
1598+
The `wire()` function should be called early in the worker's lifecycle to register
1599+
the methods before the main thread attempts to call them. Any values returned by
1600+
these methods are serialized and passed back to the main thread.
1601+
15301602
## Notes
15311603
15321604
### Synchronous blocking of stdio
@@ -1637,6 +1709,8 @@ thread spawned will spawn another until the application crashes.
16371709
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist
16381710
[`worker.terminate()`]: #workerterminate
16391711
[`worker.threadId`]: #workerthreadid_1
1712+
[`worker.makeSync()`]: #workermakesyncbuffer-options
1713+
[`worker.wire()`]: #workerwirebuffer-methods
16401714
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool
16411715
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
16421716
[child processes]: child_process.md
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
'use strict';
2+
3+
const { read, write } = require('internal/worker/everysync/objects');
4+
const {
5+
OFFSET,
6+
TO_MAIN,
7+
TO_WORKER,
8+
} = require('internal/worker/everysync/indexes');
9+
10+
/**
11+
* Creates a synchronous API facade from a shared memory buffer.
12+
* This function is meant to be used in the main thread to communicate with
13+
* a worker thread that has called `wire()` on the same shared memory.
14+
*
15+
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
16+
* @param {Object} [opts={}] - Options object
17+
* @param {number} [opts.timeout=1000] - Timeout in milliseconds for synchronous operations
18+
* @returns {Object} - An object with methods that match the ones exposed by the worker
19+
*/
20+
function makeSync(data, opts = {}) {
21+
const timeout = opts.timeout || 1000;
22+
const metaView = new Int32Array(data);
23+
24+
const res = Atomics.wait(metaView, TO_WORKER, 0, timeout);
25+
Atomics.store(metaView, TO_WORKER, 0);
26+
27+
if (res === 'ok') {
28+
const obj = read(data, OFFSET);
29+
30+
const api = {};
31+
for (const key of obj) {
32+
api[key] = (...args) => {
33+
write(data, { key, args }, OFFSET);
34+
Atomics.store(metaView, TO_MAIN, 1);
35+
Atomics.notify(metaView, TO_MAIN, 1);
36+
const res = Atomics.wait(metaView, TO_WORKER, 0, timeout);
37+
Atomics.store(metaView, TO_WORKER, 0);
38+
if (res === 'ok') {
39+
const obj = read(data, OFFSET);
40+
return obj;
41+
} else {
42+
throw new Error(`The response timed out after ${timeout}ms`);
43+
}
44+
};
45+
}
46+
47+
return api;
48+
} else {
49+
throw new Error(`The initialization timed out after ${timeout}ms`);
50+
}
51+
}
52+
53+
/**
54+
* Wires up a shared memory buffer to invoke methods on an object.
55+
* This function is meant to be used in a worker thread to expose methods
56+
* to the main thread that has called `makeSync()` on the same shared memory.
57+
*
58+
* @param {SharedArrayBuffer} data - The shared memory buffer for communication
59+
* @param {Object} obj - Object with methods to expose to the main thread
60+
* @returns {Promise<void>} - A promise that never resolves unless there's an error
61+
*/
62+
async function wire(data, obj) {
63+
write(data, Object.keys(obj), OFFSET);
64+
65+
const metaView = new Int32Array(data);
66+
67+
Atomics.store(metaView, TO_WORKER, 1);
68+
Atomics.notify(metaView, TO_WORKER);
69+
70+
while (true) {
71+
const waitAsync = Atomics.waitAsync(metaView, TO_MAIN, 0);
72+
const res = await waitAsync.value;
73+
Atomics.store(metaView, TO_MAIN, 0);
74+
75+
if (res === 'ok') {
76+
const { key, args } = read(data, OFFSET);
77+
// This is where the magic happens - invoke the requested method
78+
const result = await obj[key](...args);
79+
write(data, result, OFFSET);
80+
Atomics.store(metaView, TO_WORKER, 1);
81+
Atomics.notify(metaView, TO_WORKER, 1);
82+
}
83+
}
84+
}
85+
86+
module.exports = {
87+
makeSync,
88+
wire
89+
};
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
3+
/**
4+
* Byte offset where the actual data begins in the shared memory
5+
* @type {number}
6+
*/
7+
const OFFSET = 64;
8+
9+
/**
10+
* Index in the Int32Array for signaling from worker to main thread
11+
* 0: writing from worker, reading from main
12+
* @type {number}
13+
*/
14+
const TO_WORKER = 0;
15+
16+
/**
17+
* Index in the Int32Array for signaling from main to worker thread
18+
* 1: writing from main, reading from worker
19+
* @type {number}
20+
*/
21+
const TO_MAIN = 1;
22+
23+
module.exports = {
24+
OFFSET,
25+
TO_WORKER,
26+
TO_MAIN
27+
};
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
'use strict';
2+
3+
const { serialize, deserialize } = require('v8');
4+
5+
/**
6+
* Reads an object from a shared memory buffer
7+
*
8+
* @param {SharedArrayBuffer} buffer - The shared memory buffer containing serialized data
9+
* @param {number} [byteOffset=0] - Byte offset where the data begins
10+
* @returns {any} - The deserialized object
11+
*/
12+
function read(buffer, byteOffset = 0) {
13+
const view = new DataView(buffer, byteOffset);
14+
const length = view.getUint32(0, true);
15+
const object = deserialize(new Uint8Array(buffer, byteOffset + 4, length));
16+
return object;
17+
}
18+
19+
/**
20+
* Writes an object to a shared memory buffer
21+
*
22+
* @param {SharedArrayBuffer} buffer - The shared memory buffer to write to
23+
* @param {any} object - The object to serialize and write
24+
* @param {number} [byteOffset=0] - Byte offset where to write the data
25+
* @throws {Error} If the buffer is too small and not growable
26+
*/
27+
function write(buffer, object, byteOffset = 0) {
28+
const data = serialize(object);
29+
30+
if (buffer.byteLength < data.byteLength + 4 + byteOffset) {
31+
// Check if buffer is growable (has grow method from ShareArrayBuffer.prototype)
32+
if (typeof buffer.grow !== 'function') {
33+
throw new Error('Buffer is too small and not growable');
34+
}
35+
36+
buffer.grow(data.byteLength + 4 + byteOffset);
37+
}
38+
39+
const view = new DataView(buffer, byteOffset);
40+
view.setUint32(0, data.byteLength, true);
41+
new Uint8Array(buffer, byteOffset + 4).set(data);
42+
}
43+
44+
module.exports = {
45+
read,
46+
write
47+
};

lib/worker_threads.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ const {
2929
isMarkedAsUntransferable,
3030
} = require('internal/buffer');
3131

32+
const {
33+
makeSync,
34+
wire
35+
} = require('internal/worker/everysync/index');
36+
3237
module.exports = {
3338
isInternalThread,
3439
isMainThread,
@@ -49,4 +54,6 @@ module.exports = {
4954
BroadcastChannel,
5055
setEnvironmentData,
5156
getEnvironmentData,
57+
makeSync,
58+
wire
5259
};

test/fixtures/everysync/echo.mjs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { workerData, wire } from 'node:worker_threads';
2+
3+
wire(workerData.data, {
4+
async echo(arg) {
5+
return arg;
6+
},
7+
});
8+
9+
// Keep the event loop alive
10+
setInterval(() => {}, 100000);
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { workerData, wire } from 'node:worker_threads';
2+
3+
wire(workerData.data, {
4+
fail(arg) {
5+
return new Promise((resolve, reject) => {
6+
// nothing to do here, we will fail
7+
});
8+
},
9+
});
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { join } = require('path');
6+
const { Worker, makeSync } = require('worker_threads');
7+
8+
// Test makeSync and wire functionality
9+
{
10+
const buffer = new SharedArrayBuffer(1024, {
11+
maxByteLength: 64 * 1024 * 1024,
12+
});
13+
const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'echo.mjs'), {
14+
workerData: {
15+
data: buffer,
16+
},
17+
});
18+
19+
const api = makeSync(buffer);
20+
21+
assert.strictEqual(api.echo(42), 42);
22+
assert.strictEqual(api.echo('test'), 'test');
23+
assert.deepStrictEqual(api.echo({ foo: 'bar' }), { foo: 'bar' });
24+
25+
worker.terminate();
26+
}
27+
28+
// Test timeout failure
29+
{
30+
const buffer = new SharedArrayBuffer(1024, {
31+
maxByteLength: 64 * 1024 * 1024,
32+
});
33+
const worker = new Worker(join(__dirname, '..', 'fixtures', 'everysync', 'failure.mjs'), {
34+
workerData: {
35+
data: buffer,
36+
},
37+
});
38+
39+
const api = makeSync(buffer, { timeout: 100 });
40+
41+
assert.throws(() => api.fail(), {
42+
message: 'The response timed out after 100ms'
43+
});
44+
45+
worker.terminate();
46+
}
47+
48+
// Test initialization timeout
49+
{
50+
const buffer = new SharedArrayBuffer(1024, {
51+
maxByteLength: 64 * 1024 * 1024,
52+
});
53+
54+
assert.throws(() => makeSync(buffer, { timeout: 100 }), {
55+
message: 'The initialization timed out after 100ms'
56+
});
57+
}

0 commit comments

Comments
 (0)