Skip to content

Commit 1c6506a

Browse files
authored
Add a subpackage for running child processes synchronously (#81)
0 parents  commit 1c6506a

File tree

2 files changed

+330
-0
lines changed

2 files changed

+330
-0
lines changed

lib/index.test.ts

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
// Copyright 2021 Google LLC. Use of this source code is governed by an
2+
// MIT-style license that can be found in the LICENSE file or at
3+
// https://opensource.org/licenses/MIT.
4+
5+
import * as fs from 'fs';
6+
import * as p from 'path';
7+
import {MessagePort, Worker} from 'worker_threads';
8+
9+
import {SyncMessagePort} from './sync-message-port';
10+
11+
describe('SyncMessagePort', () => {
12+
describe('sends a message', () => {
13+
it('before the other endpoint calls receiveMessage()', () => {
14+
const channel = SyncMessagePort.createChannel();
15+
const port1 = new SyncMessagePort(channel.port1);
16+
port1.postMessage('hi there!');
17+
18+
const port2 = new SyncMessagePort(channel.port2);
19+
expect(port2.receiveMessage()).toEqual('hi there!');
20+
});
21+
22+
it('after the other endpoint calls receiveMessage()', () => {
23+
const channel = SyncMessagePort.createChannel();
24+
const port = new SyncMessagePort(channel.port1);
25+
26+
spawnWorker(
27+
`
28+
// Wait a little bit just to make entirely sure that the parent thread
29+
// is awaiting a message.
30+
setTimeout(() => {
31+
port.postMessage('done!');
32+
port.close();
33+
}, 100);
34+
`,
35+
channel.port2
36+
);
37+
38+
expect(port.receiveMessage()).toEqual('done!');
39+
});
40+
41+
it('multiple times before the other endpoint starts reading', () => {
42+
const channel = SyncMessagePort.createChannel();
43+
const port1 = new SyncMessagePort(channel.port1);
44+
port1.postMessage('message1');
45+
port1.postMessage('message2');
46+
port1.postMessage('message3');
47+
port1.postMessage('message4');
48+
49+
const port2 = new SyncMessagePort(channel.port2);
50+
expect(port2.receiveMessage()).toEqual('message1');
51+
expect(port2.receiveMessage()).toEqual('message2');
52+
expect(port2.receiveMessage()).toEqual('message3');
53+
expect(port2.receiveMessage()).toEqual('message4');
54+
});
55+
});
56+
57+
describe('with an asynchronous listener', () => {
58+
it('receives a message sent before listening', async () => {
59+
const channel = SyncMessagePort.createChannel();
60+
const port1 = new SyncMessagePort(channel.port1);
61+
port1.postMessage('hi there!');
62+
63+
const port2 = new SyncMessagePort(channel.port2);
64+
65+
// Wait a macrotask to make sure the message is as queued up as it's going
66+
// to be.
67+
await new Promise(process.nextTick);
68+
69+
const promise = new Promise(resolve => port2.once('message', resolve));
70+
await expect(promise).resolves.toEqual('hi there!');
71+
port1.close();
72+
});
73+
74+
it('receives a message sent after listening', async () => {
75+
const channel = SyncMessagePort.createChannel();
76+
const port1 = new SyncMessagePort(channel.port1);
77+
const promise = new Promise(resolve => port1.once('message', resolve));
78+
79+
// Wait a macrotask to make sure the message is as queued up as it's going
80+
// to be.
81+
await new Promise(process.nextTick);
82+
const port2 = new SyncMessagePort(channel.port2);
83+
port2.postMessage('hi there!');
84+
85+
await expect(promise).resolves.toEqual('hi there!');
86+
port1.close();
87+
});
88+
89+
it('receiveMessage() throws an error after listening', async () => {
90+
const channel = SyncMessagePort.createChannel();
91+
const port1 = new SyncMessagePort(channel.port1);
92+
port1.on('message', () => {});
93+
94+
expect(port1.receiveMessage).toThrow();
95+
port1.close();
96+
});
97+
});
98+
99+
describe('close()', () => {
100+
it('closing one port closes the other', async () => {
101+
const channel = SyncMessagePort.createChannel();
102+
const port1 = new SyncMessagePort(channel.port1);
103+
const port2 = new SyncMessagePort(channel.port2);
104+
105+
port1.close();
106+
107+
// Should resolve.
108+
await new Promise(resolve => port2.once('close', resolve));
109+
});
110+
111+
it('receiveMessage() throws an error for a closed port', () => {
112+
const channel = SyncMessagePort.createChannel();
113+
const port1 = new SyncMessagePort(channel.port1);
114+
const port2 = new SyncMessagePort(channel.port2);
115+
116+
port1.close();
117+
expect(port1.receiveMessage).toThrow();
118+
expect(port2.receiveMessage).toThrow();
119+
});
120+
});
121+
});
122+
123+
/**
124+
* Spawns a worker that executes the given TypeScript `source`.
125+
*
126+
* Automatically initializes a `SyncMessageChannel` named `port` connected to
127+
* `port`.
128+
*/
129+
function spawnWorker(source: string, port: MessagePort): Worker {
130+
fs.mkdirSync('spec/sandbox', {recursive: true});
131+
const file = p.join('spec/sandbox', `${Math.random()}.ts`.slice(2));
132+
fs.writeFileSync(
133+
file,
134+
`
135+
const {SyncMessagePort} = require(${JSON.stringify(
136+
p.join(p.dirname(__filename), 'sync-message-port')
137+
)});
138+
const {workerData} = require('worker_threads');
139+
140+
const port = new SyncMessagePort(workerData);
141+
142+
${source}
143+
`
144+
);
145+
146+
const worker = new Worker(
147+
`
148+
require('ts-node').register();
149+
require(${JSON.stringify(p.resolve(file.substring(0, file.length - 3)))});
150+
`,
151+
{eval: true, workerData: port, transferList: [port]}
152+
);
153+
154+
worker.on('error', error => {
155+
throw error;
156+
});
157+
worker.on('exit', () => fs.unlinkSync(file));
158+
159+
return worker;
160+
}

lib/index.ts

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright 2021 Google LLC. Use of this source code is governed by an
2+
// MIT-style license that can be found in the LICENSE file or at
3+
// https://opensource.org/licenses/MIT.
4+
5+
import {strict as assert} from 'assert';
6+
import {EventEmitter} from 'events';
7+
import {
8+
receiveMessageOnPort,
9+
MessageChannel,
10+
MessagePort,
11+
TransferListItem,
12+
} from 'worker_threads';
13+
14+
// TODO(nex3): Make this its own package.
15+
16+
/**
17+
* An enum of possible states for the shared buffer that two `SyncMessagePort`s
18+
* use to communicate.
19+
*/
20+
enum BufferState {
21+
/**
22+
* The initial state. When an endpoint is ready to receive messages, it'll set
23+
* the buffer to this state so that it can use `Atomics.wait()` to be notified
24+
* when it switches to `MessageSent`.
25+
*/
26+
AwaitingMessage,
27+
/**
28+
* The state indicating that a message has been sent. Whenever an endpoint
29+
* sends a message, it'll set the buffer to this state so that the other
30+
* endpoint's `Atomics.wait()` call terminates.
31+
*/
32+
MessageSent,
33+
/**
34+
* The state indicating that the channel has been closed. This never
35+
* transitions to any other states.
36+
*/
37+
Closed,
38+
}
39+
40+
/**
41+
* A communication port that can receive messages synchronously from another
42+
* `SyncMessagePort`.
43+
*
44+
* This also emits the same asynchronous events as `MessagePort`.
45+
*/
46+
export class SyncMessagePort extends EventEmitter {
47+
/** Creates a channel whose ports can be passed to `new SyncMessagePort()`. */
48+
static createChannel(): MessageChannel {
49+
const channel = new MessageChannel();
50+
// Four bytes is the minimum necessary to use `Atomics.wait()`.
51+
const buffer = new SharedArrayBuffer(4);
52+
53+
// Queue up messages on each port so the caller doesn't have to explicitly
54+
// pass the buffer around along with them.
55+
channel.port1.postMessage(buffer);
56+
channel.port2.postMessage(buffer);
57+
return channel;
58+
}
59+
60+
/**
61+
* An Int32 view of the shared buffer.
62+
*
63+
* Each port sets this to `BufferState.AwaitingMessage` before checking for
64+
* new messages in `receiveMessage()`, and each port sets it to
65+
* `BufferState.MessageSent` after sending a new message. It's set to
66+
* `BufferState.Closed` when the channel is closed.
67+
*/
68+
private readonly buffer: Int32Array;
69+
70+
/**
71+
* Creates a new message port. The `port` must be created by
72+
* `SyncMessagePort.createChannel()` and must connect to a port passed to
73+
* another `SyncMessagePort` in another worker.
74+
*/
75+
constructor(private readonly port: MessagePort) {
76+
super();
77+
78+
const buffer = receiveMessageOnPort(this.port)?.message;
79+
if (!buffer) {
80+
throw new Error(
81+
'new SyncMessagePort() must be passed a port from ' +
82+
'SyncMessagePort.createChannel().'
83+
);
84+
}
85+
this.buffer = new Int32Array(buffer as SharedArrayBuffer);
86+
87+
this.on('newListener', (event, listener) => {
88+
this.port.on(event, listener);
89+
});
90+
this.on('removeListener', (event, listener) =>
91+
this.port.removeListener(event, listener)
92+
);
93+
}
94+
95+
/** See `MessagePort.postMesage()`. */
96+
postMessage(value: unknown, transferList?: TransferListItem[]): void {
97+
this.port.postMessage(value, transferList);
98+
99+
// If the other port is waiting for a new message, notify it that the
100+
// message is ready. Use `Atomics.compareExchange` so that we don't
101+
// overwrite the "closed" state.
102+
if (
103+
Atomics.compareExchange(
104+
this.buffer,
105+
0,
106+
BufferState.AwaitingMessage,
107+
BufferState.MessageSent
108+
) === BufferState.AwaitingMessage
109+
) {
110+
Atomics.notify(this.buffer, 0);
111+
}
112+
}
113+
114+
// TODO(nex3):
115+
// * Add a non-blocking `receiveMessage()`
116+
// * Add a timeout option to `receiveMessage()`
117+
// * Add an option to `receiveMessage()` to return a special value if the
118+
// channel is closed.
119+
120+
/**
121+
* Blocks and returns the next message sent by the other port.
122+
*
123+
* This may not be called while this has a listener for the `'message'` event.
124+
* Throws an error if the channel is closed, including if it closes while this
125+
* is waiting for a message.
126+
*/
127+
receiveMessage(): unknown {
128+
if (this.listenerCount('message')) {
129+
throw new Error(
130+
'SyncMessageChannel.receiveMessage() may not be called while there ' +
131+
'are message listeners.'
132+
);
133+
}
134+
135+
// Set the "new message" indicator to zero before we check for new messages.
136+
// That way if the other port sets it to 1 between the call to
137+
// `receiveMessageOnPort` and the call to `Atomics.wait()`, we won't
138+
// overwrite it. Use `Atomics.compareExchange` so that we don't overwrite
139+
// the "closed" state.
140+
if (
141+
Atomics.compareExchange(
142+
this.buffer,
143+
0,
144+
BufferState.MessageSent,
145+
BufferState.AwaitingMessage
146+
) === BufferState.Closed
147+
) {
148+
throw new Error("The SyncMessagePort's channel is closed.");
149+
}
150+
151+
let message = receiveMessageOnPort(this.port);
152+
if (message) return message.message;
153+
154+
// If there's no new message, wait for the other port to flip the "new
155+
// message" indicator to 1. If it's been set to 1 since we stored 0, this
156+
// will terminate immediately.
157+
Atomics.wait(this.buffer, 0, BufferState.AwaitingMessage);
158+
message = receiveMessageOnPort(this.port);
159+
if (message) return message.message;
160+
161+
assert.equal(Atomics.load(this.buffer, 0), BufferState.Closed);
162+
throw new Error("The SyncMessagePort's channel is closed.");
163+
}
164+
165+
/** See `MessagePort.close()`. */
166+
close(): void {
167+
Atomics.store(this.buffer, 0, BufferState.Closed);
168+
this.port.close();
169+
}
170+
}

0 commit comments

Comments
 (0)