Skip to content

Commit 2b7efc8

Browse files
committed
feat: Add timeout handling for remote message sends and URL redemptions
1 parent 2e1bf7e commit 2b7efc8

File tree

7 files changed

+224
-9
lines changed

7 files changed

+224
-9
lines changed

packages/ocap-kernel/src/remotes/RemoteHandle.test.ts

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { VatOneResolution } from '@agoric/swingset-liveslots';
22
import type { Logger } from '@metamask/logger';
3-
import { describe, it, expect, vi, beforeEach } from 'vitest';
3+
import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils';
4+
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
45

56
import type { KernelQueue } from '../KernelQueue.ts';
67
import { RemoteHandle } from './RemoteHandle.ts';
@@ -626,4 +627,119 @@ describe('RemoteHandle', () => {
626627
// Verify they resolved independently (different values)
627628
expect(kref1).not.toBe(kref2);
628629
});
630+
631+
describe('redeemOcapURL timeout', () => {
632+
afterEach(() => {
633+
vi.restoreAllMocks();
634+
});
635+
636+
it('sets up 30-second timeout using AbortSignal.timeout', async () => {
637+
const remote = makeRemote();
638+
const mockOcapURL = 'ocap:test@peer';
639+
640+
let mockSignal: ReturnType<typeof makeAbortSignalMock> | undefined;
641+
vi.spyOn(AbortSignal, 'timeout').mockImplementation((ms: number) => {
642+
mockSignal = makeAbortSignalMock(ms);
643+
return mockSignal;
644+
});
645+
646+
const urlPromise = remote.redeemOcapURL(mockOcapURL);
647+
648+
// Verify AbortSignal.timeout was called with 30 seconds
649+
expect(AbortSignal.timeout).toHaveBeenCalledWith(30_000);
650+
expect(mockSignal?.timeoutMs).toBe(30_000);
651+
652+
// Resolve the redemption to avoid hanging
653+
const sendCall = vi.mocked(mockRemoteComms.sendRemoteMessage).mock
654+
.calls[0];
655+
const sentMessage = JSON.parse(sendCall?.[1] as string);
656+
const replyKey = sentMessage.params[1] as string;
657+
658+
await remote.handleRemoteMessage(
659+
JSON.stringify({
660+
method: 'redeemURLReply',
661+
params: [true, replyKey, 'ro+1'],
662+
}),
663+
);
664+
665+
await urlPromise;
666+
});
667+
668+
it('cleans up pending redemption when redemption succeeds before timeout', async () => {
669+
const remote = makeRemote();
670+
const mockOcapURL = 'ocap:test@peer';
671+
const mockURLResolutionRRef = 'ro+6';
672+
const mockURLResolutionKRef = 'ko1';
673+
const expectedReplyKey = '1';
674+
675+
let mockSignal: ReturnType<typeof makeAbortSignalMock> | undefined;
676+
vi.spyOn(AbortSignal, 'timeout').mockImplementation((ms: number) => {
677+
mockSignal = makeAbortSignalMock(ms);
678+
return mockSignal;
679+
});
680+
681+
const urlPromise = remote.redeemOcapURL(mockOcapURL);
682+
683+
// Send reply immediately (before timeout)
684+
const redeemURLReply = {
685+
method: 'redeemURLReply',
686+
params: [true, expectedReplyKey, mockURLResolutionRRef],
687+
};
688+
await remote.handleRemoteMessage(JSON.stringify(redeemURLReply));
689+
690+
const kref = await urlPromise;
691+
expect(kref).toBe(mockURLResolutionKRef);
692+
693+
// Verify timeout signal was not aborted
694+
expect(mockSignal?.aborted).toBe(false);
695+
696+
// Verify cleanup happened - trying to handle another reply with the same key should fail
697+
await expect(
698+
remote.handleRemoteMessage(JSON.stringify(redeemURLReply)),
699+
).rejects.toThrow(`unknown URL redemption reply key ${expectedReplyKey}`);
700+
});
701+
702+
it('cleans up pending redemption map entry on timeout', async () => {
703+
const remote = makeRemote();
704+
const mockOcapURL = 'ocap:test@peer';
705+
706+
let mockSignal: ReturnType<typeof makeAbortSignalMock> | undefined;
707+
vi.spyOn(AbortSignal, 'timeout').mockImplementation((ms: number) => {
708+
mockSignal = makeAbortSignalMock(ms);
709+
return mockSignal;
710+
});
711+
712+
// Start a redemption
713+
const urlPromise = remote.redeemOcapURL(mockOcapURL);
714+
715+
// Get the reply key that was used
716+
const sendCall = vi.mocked(mockRemoteComms.sendRemoteMessage).mock
717+
.calls[0];
718+
const sentMessage = JSON.parse(sendCall?.[1] as string);
719+
const replyKey = sentMessage.params[1] as string;
720+
721+
// Wait for the promise to be set up and event listener registered
722+
await new Promise<void>((resolve) => queueMicrotask(() => resolve()));
723+
724+
// Manually trigger the abort to simulate timeout
725+
mockSignal?.abort();
726+
727+
// Wait for the abort handler to execute
728+
await new Promise<void>((resolve) => queueMicrotask(() => resolve()));
729+
730+
// Verify the promise rejects
731+
await expect(urlPromise).rejects.toThrow(
732+
'URL redemption timed out after 30 seconds',
733+
);
734+
735+
// Verify cleanup happened - trying to handle a reply with the same key should fail
736+
const redeemURLReply = {
737+
method: 'redeemURLReply',
738+
params: [true, replyKey, 'ro+1'],
739+
};
740+
await expect(
741+
remote.handleRemoteMessage(JSON.stringify(redeemURLReply)),
742+
).rejects.toThrow(`unknown URL redemption reply key ${replyKey}`);
743+
});
744+
});
629745
});

packages/ocap-kernel/src/remotes/RemoteHandle.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -460,13 +460,34 @@ export class RemoteHandle implements EndpointHandle {
460460
const replyKey = `${this.#redemptionCounter}`;
461461
this.#redemptionCounter += 1;
462462
const { promise, resolve, reject } = makePromiseKit<string>();
463-
// XXX TODO: Probably these should have timeouts
464463
this.#pendingRedemptions.set(replyKey, [resolve, reject]);
465-
await this.#sendRemoteCommand({
466-
method: 'redeemURL',
467-
params: [url, replyKey],
464+
465+
// Set up timeout handling with AbortSignal
466+
const timeoutSignal = AbortSignal.timeout(30_000);
467+
const timeoutPromise = new Promise<never>((_resolve, _reject) => {
468+
timeoutSignal.addEventListener('abort', () => {
469+
// Clean up from pending redemptions map
470+
if (this.#pendingRedemptions.has(replyKey)) {
471+
this.#pendingRedemptions.delete(replyKey);
472+
}
473+
_reject(new Error('URL redemption timed out after 30 seconds'));
474+
});
468475
});
469-
return promise;
476+
477+
try {
478+
await this.#sendRemoteCommand({
479+
method: 'redeemURL',
480+
params: [url, replyKey],
481+
});
482+
// Wait for reply with timeout protection
483+
return await Promise.race([promise, timeoutPromise]);
484+
} catch (error) {
485+
// Clean up and remove from map if still pending
486+
if (this.#pendingRedemptions.has(replyKey)) {
487+
this.#pendingRedemptions.delete(replyKey);
488+
}
489+
throw error;
490+
}
470491
}
471492

472493
/**

packages/ocap-kernel/src/remotes/network.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { AbortError } from '@metamask/kernel-errors';
2+
import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils';
23
import {
34
describe,
45
expect,

packages/ocap-kernel/src/remotes/network.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,31 @@ export async function initNetwork(
100100
return queue;
101101
}
102102

103+
/**
104+
* Write a message to a channel stream with a timeout.
105+
*
106+
* @param channel - The channel to write to.
107+
* @param message - The message bytes to write.
108+
* @param timeoutMs - Timeout in milliseconds (default: 10 seconds).
109+
* @returns Promise that resolves when the write completes or rejects on timeout.
110+
* @throws Error if the write times out or fails.
111+
*/
112+
async function writeWithTimeout(
113+
channel: Channel,
114+
message: Uint8Array,
115+
timeoutMs = 10_000,
116+
): Promise<void> {
117+
const timeoutSignal = AbortSignal.timeout(timeoutMs);
118+
return Promise.race([
119+
channel.msgStream.write(message),
120+
new Promise<never>((_resolve, reject) => {
121+
timeoutSignal.addEventListener('abort', () => {
122+
reject(new Error(`Message send timed out after ${timeoutMs}ms`));
123+
});
124+
}),
125+
]);
126+
}
127+
103128
/**
104129
* Receive a message from a peer.
105130
*
@@ -315,7 +340,7 @@ export async function initNetwork(
315340
while ((queuedMsg = queue.dequeue()) !== undefined) {
316341
try {
317342
logger.log(`${peerId}:: send (queued) ${queuedMsg.message}`);
318-
await channel.msgStream.write(fromString(queuedMsg.message));
343+
await writeWithTimeout(channel, fromString(queuedMsg.message), 10_000);
319344
} catch (problem) {
320345
outputError(peerId, `sending queued message`, problem);
321346
// Preserve the failed message and all remaining messages
@@ -397,7 +422,7 @@ export async function initNetwork(
397422

398423
try {
399424
logger.log(`${targetPeerId}:: send ${message}`);
400-
await channel.msgStream.write(fromString(message));
425+
await writeWithTimeout(channel, fromString(message), 10_000);
401426
reconnectionManager.resetBackoff(targetPeerId);
402427
} catch (problem) {
403428
outputError(targetPeerId, `sending message`, problem);
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { vi } from 'vitest';
2+
3+
/**
4+
* Create a mock AbortSignal that can be manually aborted.
5+
*
6+
* @param timeoutMs - The timeout value (stored for verification).
7+
* @returns A mock AbortSignal.
8+
*/
9+
export function makeAbortSignalMock(timeoutMs: number): AbortSignal & {
10+
abort: () => void;
11+
timeoutMs: number;
12+
} {
13+
const handlers: (() => void)[] = [];
14+
let aborted = false;
15+
16+
const signal = {
17+
get aborted() {
18+
return aborted;
19+
},
20+
timeoutMs,
21+
addEventListener: vi.fn((event: string, handler: () => void) => {
22+
if (event === 'abort') {
23+
handlers.push(handler);
24+
}
25+
}),
26+
removeEventListener: vi.fn((event: string, handler: () => void) => {
27+
if (event === 'abort') {
28+
const index = handlers.indexOf(handler);
29+
if (index > -1) {
30+
handlers.splice(index, 1);
31+
}
32+
}
33+
}),
34+
dispatchEvent: vi.fn(),
35+
onabort: null,
36+
reason: undefined,
37+
throwIfAborted: vi.fn(),
38+
abort() {
39+
aborted = true;
40+
// Call all handlers synchronously
41+
for (const handler of handlers) {
42+
handler();
43+
}
44+
},
45+
} as AbortSignal & {
46+
abort: () => void;
47+
timeoutMs: number;
48+
};
49+
50+
return signal;
51+
}

packages/repo-tools/src/test-utils/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export { makePromiseKitMock } from './promise-kit.ts';
44
export { fetchMock } from './env/fetch-mock.ts';
55
export * from './env/mock-kernel.ts';
66
export { makeMockMessageTarget } from './postMessage.ts';
7+
export { makeAbortSignalMock } from './abort-signal.ts';
78

89
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
910
// @ts-ignore Intermittent browser/Node incompatibility

vitest.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export default defineConfig({
106106
'packages/kernel-platforms/**': {
107107
statements: 99.38,
108108
functions: 100,
109-
branches: 96.2,
109+
branches: 96.25,
110110
lines: 99.38,
111111
},
112112
'packages/kernel-rpc-methods/**': {

0 commit comments

Comments
 (0)