Skip to content

Commit a5042e7

Browse files
authored
feat: reject promises on connection loss (#706)
Closes #686 This PR does the following: - When a remote connection is lost and retries are exhausted, all promises for which that remote is the decider are now rejected, including promises from queued messages that were never delivered. - Exposes `maxRetryAttempts` and `maxQueue` as configurable options for remote communications initialization, converts the `relays` parameter to an `options` object for future extensibility. - Fixes a bug where `maxRetryAttempts` was not being respected due to `resetBackoff` being called too early in the reconnection flow. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Reject promises when remote connections are abandoned; switch remote comms init to options (relays/maxRetryAttempts/maxQueue) and tighten reconnection/backoff behavior across stack. > > - **Remote Comms (ocap-kernel)**: > - Add `remoteGiveUp` RPC (`kernel-remote`) and propagate from server to kernel; `RemoteManager` rejects pending URL redemptions and all promises decided by the remote on give-up. > - Change `initRemoteComms` to accept `RemoteCommsOptions` (`relays`, `maxRetryAttempts`, `maxQueue`); persist relays; update `PlatformServices`/types accordingly. > - Fix reconnection flow: respect `maxRetryAttempts`, reset backoff only after successful flush, queue flushing merges hints, intentional close handling, wake detector backoff reset; `ConnectionFactory` accepts `maxRetryAttempts`. > - Minor: `getPromisesByDecider` now uses `EndpointId`. > - **Platform Services**: > - Browser/Node servers and clients updated to new options API; add handling for `remoteGiveUp` (client callback, server forwards via RPC). > - Kernel calls `remoteManager.cleanup()` on stop; worker init uses `kernel.initRemoteComms({ relays })`. > - **Tests/Coverage**: > - Extensive unit/e2e tests added/updated for give-up, options, reconnection, queues, hints; coverage thresholds adjusted. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 232de02. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent ca6749a commit a5042e7

36 files changed

+2634
-447
lines changed

packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts

Lines changed: 235 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,72 @@ describe('PlatformServicesClient', () => {
108108
await expect(resultP).rejects.toThrow('foo');
109109
});
110110

111-
it('calls logger.debug when receiving an unexpected reply', async () => {
112-
const debugSpy = vi.spyOn(clientLogger, 'debug');
113-
const unexpectedReply = makeNullReply('m9');
111+
it('calls logger.error when receiving response with non-string id', async () => {
112+
const errorSpy = vi.spyOn(clientLogger, 'error');
113+
const unexpectedReply = new MessageEvent('message', {
114+
data: {
115+
id: 123,
116+
result: null,
117+
jsonrpc: '2.0',
118+
},
119+
});
114120

115121
await stream.receiveInput(unexpectedReply);
116122
await delay(10);
117123

118-
expect(debugSpy).toHaveBeenCalledOnce();
119-
expect(debugSpy).toHaveBeenLastCalledWith(
120-
'Received response with unexpected id "m9".',
124+
expect(errorSpy).toHaveBeenCalledOnce();
125+
expect(errorSpy).toHaveBeenLastCalledWith(
126+
'Received response with unexpected id:',
127+
expect.any(String),
128+
);
129+
});
130+
131+
it('calls logger.error when receiving message with unexpected port', async () => {
132+
const errorSpy = vi.spyOn(clientLogger, 'error');
133+
const unexpectedPortReply = makeMessageEvent(
134+
'm9',
135+
{ result: null },
136+
new MessageChannel().port1,
137+
);
138+
139+
await stream.receiveInput(unexpectedPortReply);
140+
await delay(10);
141+
142+
expect(errorSpy).toHaveBeenCalledOnce();
143+
expect(errorSpy).toHaveBeenLastCalledWith(
144+
'Received message with unexpected port:',
145+
expect.any(String),
121146
);
122147
});
123148

149+
it('handles request with unknown method by sending error response', async () => {
150+
const outputs: MessageEventWithPayload[] = [];
151+
const newStream = await TestDuplexStream.make((message) => {
152+
outputs.push(message as unknown as MessageEventWithPayload);
153+
});
154+
// eslint-disable-next-line no-new -- test setup
155+
new PlatformServicesClient(
156+
newStream as unknown as PlatformServicesClientStream,
157+
);
158+
159+
await newStream.receiveInput(
160+
new MessageEvent('message', {
161+
data: {
162+
id: 'm1',
163+
jsonrpc: '2.0',
164+
method: 'unknownMethod',
165+
params: {},
166+
},
167+
}),
168+
);
169+
await delay(10);
170+
171+
const errorResponse = outputs.find(
172+
(message) => message.payload?.id === 'm1' && 'error' in message.payload,
173+
);
174+
expect(errorResponse).toBeDefined();
175+
});
176+
124177
describe('launch', () => {
125178
it('resolves with a duplex stream when receiving a launch reply', async () => {
126179
const vatId: VatId = 'v0';
@@ -191,7 +244,50 @@ describe('PlatformServicesClient', () => {
191244
const remoteHandler = vi.fn(async () => 'response');
192245
const result = client.initializeRemoteComms(
193246
'0xabcd',
194-
['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'],
247+
{ relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'] },
248+
remoteHandler,
249+
);
250+
await stream.receiveInput(makeNullReply('m1'));
251+
expect(await result).toBeUndefined();
252+
});
253+
254+
it('sends initializeRemoteComms with all options and resolves', async () => {
255+
const remoteHandler = vi.fn(async () => 'response');
256+
const result = client.initializeRemoteComms(
257+
'0xabcd',
258+
{
259+
relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'],
260+
maxRetryAttempts: 5,
261+
maxQueue: 100,
262+
},
263+
remoteHandler,
264+
);
265+
await stream.receiveInput(makeNullReply('m1'));
266+
expect(await result).toBeUndefined();
267+
});
268+
269+
it('sends initializeRemoteComms with onRemoteGiveUp callback', async () => {
270+
const remoteHandler = vi.fn(async () => 'response');
271+
const giveUpHandler = vi.fn();
272+
const result = client.initializeRemoteComms(
273+
'0xabcd',
274+
{ relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'] },
275+
remoteHandler,
276+
giveUpHandler,
277+
);
278+
await stream.receiveInput(makeNullReply('m1'));
279+
expect(await result).toBeUndefined();
280+
});
281+
282+
it('filters undefined values from options', async () => {
283+
const remoteHandler = vi.fn(async () => 'response');
284+
const result = client.initializeRemoteComms(
285+
'0xabcd',
286+
{
287+
relays: ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer'],
288+
maxRetryAttempts: undefined,
289+
maxQueue: undefined,
290+
},
195291
remoteHandler,
196292
);
197293
await stream.receiveInput(makeNullReply('m1'));
@@ -283,6 +379,138 @@ describe('PlatformServicesClient', () => {
283379
);
284380
expect(errorResponse).toBeDefined();
285381
});
382+
383+
it('calls handler and returns response when handler is set', async () => {
384+
const outputs: MessageEventWithPayload[] = [];
385+
const testStream = await TestDuplexStream.make((message) => {
386+
outputs.push(message as unknown as MessageEventWithPayload);
387+
});
388+
const testClient = new PlatformServicesClient(
389+
testStream as unknown as PlatformServicesClientStream,
390+
clientLogger,
391+
);
392+
// Wait for client to be ready
393+
await delay(10);
394+
395+
const remoteHandler = vi.fn(async () => 'response-message');
396+
const initP = testClient.initializeRemoteComms(
397+
'0xabcd',
398+
{},
399+
remoteHandler,
400+
);
401+
await testStream.receiveInput(makeNullReply('m1'));
402+
await initP;
403+
404+
await testStream.receiveInput(
405+
new MessageEvent('message', {
406+
data: {
407+
id: 'm2',
408+
jsonrpc: '2.0',
409+
method: 'remoteDeliver',
410+
params: {
411+
from: 'peer-123',
412+
message: 'test-message',
413+
},
414+
},
415+
}),
416+
);
417+
await delay(50);
418+
419+
expect(remoteHandler).toHaveBeenCalledOnce();
420+
expect(remoteHandler).toHaveBeenCalledWith(
421+
'peer-123',
422+
'test-message',
423+
);
424+
425+
const successResponse = outputs.find(
426+
(message) =>
427+
message.payload?.id === 'm2' &&
428+
'result' in message.payload &&
429+
message.payload.result === 'response-message',
430+
);
431+
expect(successResponse).toBeDefined();
432+
});
433+
});
434+
435+
describe('remoteGiveUp', () => {
436+
it('calls handler when handler is set', async () => {
437+
const outputs: MessageEventWithPayload[] = [];
438+
const testStream = await TestDuplexStream.make((message) => {
439+
outputs.push(message as unknown as MessageEventWithPayload);
440+
});
441+
const testClient = new PlatformServicesClient(
442+
testStream as unknown as PlatformServicesClientStream,
443+
clientLogger,
444+
);
445+
// Wait for client to be ready
446+
await delay(10);
447+
448+
const remoteHandler = vi.fn(async () => 'response');
449+
const giveUpHandler = vi.fn();
450+
const initP = testClient.initializeRemoteComms(
451+
'0xabcd',
452+
{},
453+
remoteHandler,
454+
giveUpHandler,
455+
);
456+
await testStream.receiveInput(makeNullReply('m1'));
457+
await initP;
458+
459+
await testStream.receiveInput(
460+
new MessageEvent('message', {
461+
data: {
462+
id: 'm2',
463+
jsonrpc: '2.0',
464+
method: 'remoteGiveUp',
465+
params: { peerId: 'peer-456' },
466+
},
467+
}),
468+
);
469+
await delay(50);
470+
471+
expect(giveUpHandler).toHaveBeenCalledOnce();
472+
expect(giveUpHandler).toHaveBeenCalledWith('peer-456');
473+
474+
const successResponse = outputs.find(
475+
(message) =>
476+
message.payload?.id === 'm2' &&
477+
'result' in message.payload &&
478+
message.payload.result === null,
479+
);
480+
expect(successResponse).toBeDefined();
481+
});
482+
483+
it('does not throw when handler is not set', async () => {
484+
const outputs: MessageEventWithPayload[] = [];
485+
const newStream = await TestDuplexStream.make((message) => {
486+
outputs.push(message as unknown as MessageEventWithPayload);
487+
});
488+
// eslint-disable-next-line no-new -- test setup
489+
new PlatformServicesClient(
490+
newStream as unknown as PlatformServicesClientStream,
491+
);
492+
493+
await newStream.receiveInput(
494+
new MessageEvent('message', {
495+
data: {
496+
id: 'm1',
497+
jsonrpc: '2.0',
498+
method: 'remoteGiveUp',
499+
params: { peerId: 'peer-789' },
500+
},
501+
}),
502+
);
503+
await delay(10);
504+
505+
// Should have sent success response with null result
506+
const successResponse = outputs.find(
507+
(message) =>
508+
message.payload?.id === 'm1' &&
509+
'result' in message.payload &&
510+
message.payload.result === null,
511+
);
512+
expect(successResponse).toBeDefined();
513+
});
286514
});
287515
});
288516
});

packages/kernel-browser-runtime/src/PlatformServicesClient.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
RemoteMessageHandler,
88
VatId,
99
VatConfig,
10+
RemoteCommsOptions,
1011
} from '@metamask/ocap-kernel';
1112
import {
1213
platformServicesMethodSpecs,
@@ -47,6 +48,8 @@ export class PlatformServicesClient implements PlatformServices {
4748

4849
#remoteMessageHandler: RemoteMessageHandler | undefined = undefined;
4950

51+
#remoteGiveUpHandler: ((peerId: string) => void) | undefined = undefined;
52+
5053
/**
5154
* **ATTN:** Prefer {@link PlatformServicesClient.make} over constructing
5255
* this class directly.
@@ -80,6 +83,7 @@ export class PlatformServicesClient implements PlatformServices {
8083
);
8184
this.#rpcServer = new RpcService(kernelRemoteHandlers, {
8285
remoteDeliver: this.#remoteDeliver.bind(this),
86+
remoteGiveUp: this.#remoteGiveUp.bind(this),
8387
});
8488

8589
// Start draining messages immediately after construction
@@ -171,21 +175,29 @@ export class PlatformServicesClient implements PlatformServices {
171175
* Initialize network communications.
172176
*
173177
* @param keySeed - The seed for generating this kernel's secret key.
174-
* @param knownRelays - Array of the peerIDs of relay nodes that can be used to listen for incoming
178+
* @param options - Options for remote communications initialization.
179+
* @param options.relays - Array of the peerIDs of relay nodes that can be used to listen for incoming
175180
* connections from other kernels.
181+
* @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default).
182+
* @param options.maxQueue - Maximum number of messages to queue per peer while reconnecting (default: 200).
176183
* @param remoteMessageHandler - A handler function to receive remote messages.
184+
* @param onRemoteGiveUp - Optional callback to be called when we give up on a remote.
177185
* @returns A promise that resolves once network access has been established
178186
* or rejects if there is some problem doing so.
179187
*/
180188
async initializeRemoteComms(
181189
keySeed: string,
182-
knownRelays: string[],
190+
options: RemoteCommsOptions,
183191
remoteMessageHandler: (from: string, message: string) => Promise<string>,
192+
onRemoteGiveUp?: (peerId: string) => void,
184193
): Promise<void> {
185194
this.#remoteMessageHandler = remoteMessageHandler;
195+
this.#remoteGiveUpHandler = onRemoteGiveUp;
186196
await this.#rpcClient.call('initializeRemoteComms', {
187197
keySeed,
188-
knownRelays,
198+
...Object.fromEntries(
199+
Object.entries(options).filter(([, value]) => value !== undefined),
200+
),
189201
});
190202
}
191203

@@ -252,6 +264,19 @@ export class PlatformServicesClient implements PlatformServices {
252264
throw Error(`remote message handler not set`);
253265
}
254266

267+
/**
268+
* Handle a remote give up notification from the server.
269+
*
270+
* @param peerId - The peer ID of the remote we're giving up on.
271+
* @returns A promise that resolves when handling is complete.
272+
*/
273+
async #remoteGiveUp(peerId: string): Promise<null> {
274+
if (this.#remoteGiveUpHandler) {
275+
this.#remoteGiveUpHandler(peerId);
276+
}
277+
return null;
278+
}
279+
255280
/**
256281
* Send a message to the server.
257282
*

0 commit comments

Comments
 (0)