Skip to content

Commit d122ca4

Browse files
committed
Add a timeout option
1 parent f3181ed commit d122ca4

File tree

3 files changed

+116
-12
lines changed

3 files changed

+116
-12
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
* Add `SyncMessagePort.receiveMessageIfAvailable()`.
44

5+
* Add `timeout` and `timeoutValue` options to
6+
`SyncMessagePort.receiveMessage()`.
7+
58
## 1.0.0
69

710
* Initial release

lib/index.test.ts

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import * as fs from 'fs';
66
import * as p from 'path';
77
import {MessagePort, Worker} from 'worker_threads';
88

9-
import {SyncMessagePort} from './index';
9+
import {SyncMessagePort, TimeoutException} from './index';
1010

1111
describe('SyncMessagePort', () => {
1212
describe('sends a message', () => {
@@ -127,6 +127,70 @@ describe('SyncMessagePort', () => {
127127
});
128128
});
129129

130+
describe('timeout', () => {
131+
it("returns a value if it's already available", () => {
132+
const channel = SyncMessagePort.createChannel();
133+
const port1 = new SyncMessagePort(channel.port1);
134+
const port2 = new SyncMessagePort(channel.port2);
135+
port1.postMessage('message');
136+
expect(port2.receiveMessage({timeout: 0})).toBe('message');
137+
});
138+
139+
it('returns a value if it becomes available before the timeout', () => {
140+
const channel = SyncMessagePort.createChannel();
141+
const port = new SyncMessagePort(channel.port1);
142+
143+
spawnWorker(
144+
`
145+
port.postMessage('ready');
146+
setTimeout(() => {
147+
port.postMessage('message');
148+
port.close();
149+
}, 100);
150+
`,
151+
channel.port2,
152+
);
153+
154+
expect(port.receiveMessage()).toEqual('ready');
155+
expect(port.receiveMessage({timeout: 200})).toEqual('message');
156+
});
157+
158+
it('throws an error if it times out before a value is available', () => {
159+
const channel = SyncMessagePort.createChannel();
160+
const port = new SyncMessagePort(channel.port1);
161+
expect(() => port.receiveMessage({timeout: 0})).toThrow(TimeoutException);
162+
});
163+
164+
it('returns timeoutValue if it times out before a value is available', () => {
165+
const channel = SyncMessagePort.createChannel();
166+
const port = new SyncMessagePort(channel.port1);
167+
expect(port.receiveMessage({timeout: 0, timeoutValue: 'timed out'})).toBe(
168+
'timed out',
169+
);
170+
});
171+
172+
it('throws an error if the channel closes before the request times out', () => {
173+
const channel = SyncMessagePort.createChannel();
174+
const port = new SyncMessagePort(channel.port1);
175+
176+
spawnWorker(
177+
`
178+
port.postMessage('ready');
179+
setTimeout(() => {
180+
port.close();
181+
}, 100);
182+
`,
183+
channel.port2,
184+
);
185+
186+
expect(port.receiveMessage()).toEqual('ready');
187+
// timeoutValue shouldn't take precedence over this error
188+
expect(() =>
189+
port.receiveMessage({timeout: 10000, timeoutValue: 'timed out'}),
190+
).toThrow();
191+
});
192+
});
193+
130194
describe('with an asynchronous listener', () => {
131195
it('receives a message sent before listening', async () => {
132196
const channel = SyncMessagePort.createChannel();

lib/index.ts

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
MessageChannel,
99
MessagePort,
1010
TransferListItem,
11+
Worker,
1112
receiveMessageOnPort,
1213
} from 'worker_threads';
1314

@@ -36,6 +37,33 @@ enum BufferState {
3637
Closed = 0b10,
3738
}
3839

40+
/**
41+
* Options that can be passed to {@link SyncMessagePort.receiveMessage}.
42+
*/
43+
export interface ReceiveMessageOptions {
44+
/**
45+
* The time (in milliseconds) to wait for a message before returning {@link
46+
* timeoutValue} (if set) or throwing a [TimeoutException] otherwise.
47+
*/
48+
timeout?: number;
49+
50+
/**
51+
* If a message isn't received within {@link timeout} milliseconds, this value
52+
* is returned. Ignored if {@link timeout} is not set.
53+
*/
54+
timeoutValue?: unknown;
55+
}
56+
57+
/**
58+
* An exception thrown by {@link SyncMessagePort.receiveMessage} if a message
59+
* isn't received within {@link ReceivedMessageOptions.timeout} milliseconds.
60+
*/
61+
export class TimeoutException extends Error {
62+
constructor(message: string) {
63+
super(message);
64+
}
65+
}
66+
3967
/**
4068
* A communication port that can receive messages synchronously from another
4169
* `SyncMessagePort`.
@@ -132,7 +160,6 @@ export class SyncMessagePort extends EventEmitter {
132160
}
133161

134162
// TODO(nex3):
135-
// * Add a timeout option to `receiveMessage()`
136163
// * Add an option to `receiveMessage()` to return a special value if the
137164
// channel is closed.
138165

@@ -143,7 +170,7 @@ export class SyncMessagePort extends EventEmitter {
143170
* Throws an error if the channel is closed, including if it closes while this
144171
* is waiting for a message.
145172
*/
146-
receiveMessage(): unknown {
173+
receiveMessage(options?: ReceiveMessageOptions): unknown {
147174
if (this.listenerCount('message')) {
148175
throw new Error(
149176
'SyncMessageChannel.receiveMessage() may not be called while there ' +
@@ -156,14 +183,13 @@ export class SyncMessagePort extends EventEmitter {
156183
// `receiveMessageOnPort` and the call to `Atomics.wait()`, we won't
157184
// overwrite it. Use `Atomics.compareExchange` so that we don't overwrite
158185
// the "closed" state.
159-
if (
160-
Atomics.compareExchange(
161-
this.buffer,
162-
0,
163-
BufferState.MessageSent,
164-
BufferState.AwaitingMessage,
165-
) === BufferState.Closed
166-
) {
186+
const previousState = Atomics.compareExchange(
187+
this.buffer,
188+
0,
189+
BufferState.MessageSent,
190+
BufferState.AwaitingMessage,
191+
);
192+
if (previousState === BufferState.Closed) {
167193
throw new Error("The SyncMessagePort's channel is closed.");
168194
}
169195

@@ -173,10 +199,20 @@ export class SyncMessagePort extends EventEmitter {
173199
// If there's no new message, wait for the other port to flip the "new
174200
// message" indicator to 1. If it's been set to 1 since we stored 0, this
175201
// will terminate immediately.
176-
Atomics.wait(this.buffer, 0, BufferState.AwaitingMessage);
202+
const result = Atomics.wait(
203+
this.buffer,
204+
0,
205+
BufferState.AwaitingMessage,
206+
options?.timeout,
207+
);
177208
message = receiveMessageOnPort(this.port);
178209
if (message) return message.message;
179210

211+
if (result === 'timed-out') {
212+
if ('timeoutValue' in options!) return options.timeoutValue;
213+
throw new TimeoutException('SyncMessagePort.receiveMessage() timed out.');
214+
}
215+
180216
// Update the state to 0b10 after the last message is consumed.
181217
const oldState = Atomics.and(this.buffer, 0, BufferState.Closed);
182218
// Assert the old state was either 0b10 or 0b11.
@@ -187,6 +223,7 @@ export class SyncMessagePort extends EventEmitter {
187223
/** See `MessagePort.close()`. */
188224
close(): void {
189225
Atomics.or(this.buffer, 0, BufferState.Closed);
226+
Atomics.notify(this.buffer, 0);
190227
this.port.close();
191228
}
192229
}

0 commit comments

Comments
 (0)