Skip to content

Commit 9076adb

Browse files
ntkmenex3
andauthored
Fix race condition in SyncMessagePort (#327)
Co-authored-by: Natalie Weizenbaum <[email protected]>
1 parent 80a5038 commit 9076adb

File tree

2 files changed

+30
-7
lines changed

2 files changed

+30
-7
lines changed

lib/src/sync-process/sync-message-port.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ describe('SyncMessagePort', () => {
3636
);
3737

3838
expect(port.receiveMessage()).toEqual('done!');
39+
expect(port.receiveMessage).toThrow();
3940
});
4041

4142
it('multiple times before the other endpoint starts reading', () => {
@@ -52,6 +53,24 @@ describe('SyncMessagePort', () => {
5253
expect(port2.receiveMessage()).toEqual('message3');
5354
expect(port2.receiveMessage()).toEqual('message4');
5455
});
56+
57+
it('multiple times and close', () => {
58+
const channel = SyncMessagePort.createChannel();
59+
const port = new SyncMessagePort(channel.port1);
60+
61+
spawnWorker(
62+
`
63+
port.postMessage('message1');
64+
port.postMessage('done!');
65+
port.close();
66+
`,
67+
channel.port2
68+
);
69+
70+
expect(port.receiveMessage()).toEqual('message1');
71+
expect(port.receiveMessage()).toEqual('done!');
72+
expect(port.receiveMessage).toThrow();
73+
});
5574
});
5675

5776
describe('with an asynchronous listener', () => {

lib/src/sync-process/sync-message-port.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,19 @@ enum BufferState {
2323
* the buffer to this state so that it can use `Atomics.wait()` to be notified
2424
* when it switches to `MessageSent`.
2525
*/
26-
AwaitingMessage,
26+
AwaitingMessage = 0b00,
2727
/**
2828
* The state indicating that a message has been sent. Whenever an endpoint
2929
* sends a message, it'll set the buffer to this state so that the other
3030
* endpoint's `Atomics.wait()` call terminates.
3131
*/
32-
MessageSent,
32+
MessageSent = 0b01,
3333
/**
34-
* The state indicating that the channel has been closed. This never
35-
* transitions to any other states.
34+
* The bitmask indicating that the channel has been closed. This is masked on
35+
* top of AwaitingMessage and MessageSent state. It never transitions to any
36+
* other states once closed.
3637
*/
37-
Closed,
38+
Closed = 0b10,
3839
}
3940

4041
/**
@@ -158,13 +159,16 @@ export class SyncMessagePort extends EventEmitter {
158159
message = receiveMessageOnPort(this.port);
159160
if (message) return message.message;
160161

161-
assert.equal(Atomics.load(this.buffer, 0), BufferState.Closed);
162+
// Update the state to 0b10 after the last message is consumed.
163+
const oldState = Atomics.and(this.buffer, 0, BufferState.Closed);
164+
// Assert the old state was either 0b10 or 0b11.
165+
assert.equal(oldState & BufferState.Closed, BufferState.Closed);
162166
throw new Error("The SyncMessagePort's channel is closed.");
163167
}
164168

165169
/** See `MessagePort.close()`. */
166170
close(): void {
167-
Atomics.store(this.buffer, 0, BufferState.Closed);
171+
Atomics.or(this.buffer, 0, BufferState.Closed);
168172
this.port.close();
169173
}
170174
}

0 commit comments

Comments
 (0)