Skip to content

Commit 2148c20

Browse files
authored
refactor: Support JSON-RPC notifications in addition to requests (#496)
Closes #368 Adds support for JSON-RPC notifications in addition to JSON-RPC requests. Adds a new method, `RpcClient.notify()`, for this purpose. Updates stream types to permit notifications in additions to request. We do not make use of this capability for any method right now, but may do so for #488.
1 parent 4c13b3c commit 2148c20

File tree

20 files changed

+393
-188
lines changed

20 files changed

+393
-188
lines changed

packages/extension/src/kernel-integration/VatWorkerClient.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
import { isJsonRpcResponse } from '@metamask/utils';
2-
import type {
3-
JsonRpcId,
4-
JsonRpcRequest,
5-
JsonRpcResponse,
6-
} from '@metamask/utils';
2+
import type { JsonRpcId, JsonRpcResponse } from '@metamask/utils';
73
import type { VatWorkerManager, VatId, VatConfig } from '@ocap/kernel';
84
import { vatWorkerServiceMethodSpecs } from '@ocap/kernel/rpc';
95
import { RpcClient } from '@ocap/rpc-methods';
@@ -16,7 +12,7 @@ import type {
1612
PostMessageEnvelope,
1713
PostMessageTarget,
1814
} from '@ocap/streams/browser';
19-
import type { JsonRpcMessage, Logger } from '@ocap/utils';
15+
import type { JsonRpcCall, JsonRpcMessage, Logger } from '@ocap/utils';
2016
import { isJsonRpcMessage, makeLogger, stringify } from '@ocap/utils';
2117

2218
// Appears in the docs.
@@ -25,7 +21,7 @@ import type { ExtensionVatWorkerService } from './VatWorkerServer.ts';
2521

2622
export type VatWorkerClientStream = PostMessageDuplexStream<
2723
MessageEvent<JsonRpcResponse>,
28-
PostMessageEnvelope<JsonRpcRequest>
24+
PostMessageEnvelope<JsonRpcCall>
2925
>;
3026

3127
export class ExtensionVatWorkerClient implements VatWorkerManager {
@@ -61,8 +57,10 @@ export class ExtensionVatWorkerClient implements VatWorkerManager {
6157
this.#rpcClient = new RpcClient(
6258
vatWorkerServiceMethodSpecs,
6359
async (request) => {
64-
if (request.method === 'launch') {
65-
this.#portMap.set(request.id, undefined);
60+
if ('id' in request) {
61+
if (request.method === 'launch') {
62+
this.#portMap.set(request.id, undefined);
63+
}
6664
}
6765
await this.#stream.write({ payload: request, transfer: [] });
6866
},

packages/extension/src/kernel-integration/kernel-worker.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { JsonRpcEngine } from '@metamask/json-rpc-engine';
22
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
3-
import { isJsonRpcRequest } from '@metamask/utils';
43
import type { ClusterConfig } from '@ocap/kernel';
54
import { ClusterConfigStruct, Kernel } from '@ocap/kernel';
65
import { makeSQLKernelDatabase } from '@ocap/store/sqlite/wasm';
@@ -9,7 +8,8 @@ import {
98
MessagePortDuplexStream,
109
receiveMessagePort,
1110
} from '@ocap/streams/browser';
12-
import { fetchValidatedJson, Logger } from '@ocap/utils';
11+
import { fetchValidatedJson, isJsonRpcCall, Logger } from '@ocap/utils';
12+
import type { JsonRpcCall } from '@ocap/utils';
1313

1414
import { makeLoggingMiddleware } from './middleware/logging.ts';
1515
import { createPanelMessageMiddleware } from './middleware/panel-message.ts';
@@ -39,9 +39,9 @@ async function main(): Promise<void> {
3939
);
4040

4141
const kernelStream = await MessagePortDuplexStream.make<
42-
JsonRpcRequest,
42+
JsonRpcCall,
4343
JsonRpcResponse
44-
>(port, isJsonRpcRequest);
44+
>(port, isJsonRpcCall);
4545

4646
// Initialize kernel dependencies
4747
const vatWorkerClient = ExtensionVatWorkerClient.make(
@@ -63,7 +63,11 @@ async function main(): Promise<void> {
6363
const kernelEngine = new JsonRpcEngine();
6464
kernelEngine.push(makeLoggingMiddleware(logger.subLogger('kernel-command')));
6565
kernelEngine.push(createPanelMessageMiddleware(kernel, kernelDatabase));
66-
receiveUiConnections(async (request) => kernelEngine.handle(request), logger);
66+
// JsonRpcEngine type error: does not handle JSON-RPC notifications
67+
receiveUiConnections(
68+
async (request) => kernelEngine.handle(request as JsonRpcRequest),
69+
logger,
70+
);
6771
const launchDefaultSubcluster = firstTime || ALWAYS_RESET_STORAGE;
6872

6973
const defaultSubcluster = await fetchValidatedJson<ClusterConfig>(

packages/extension/src/kernel-integration/ui-connections.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
1+
import type { JsonRpcResponse } from '@metamask/utils';
22
import type { PostMessageTarget } from '@ocap/streams/browser';
33
import { delay } from '@ocap/test-utils';
44
import { TestDuplexStream } from '@ocap/test-utils/streams';
5-
import type { Logger } from '@ocap/utils';
5+
import type { JsonRpcCall, Logger } from '@ocap/utils';
66
import { describe, it, expect, vi, beforeEach } from 'vitest';
77

88
import {
@@ -161,7 +161,7 @@ describe('ui-connections', () => {
161161
const logger = makeMockLogger();
162162

163163
const mockHandleMessage = vi.fn(
164-
async (_request: JsonRpcRequest): Promise<JsonRpcResponse> => ({
164+
async (_request: JsonRpcCall): Promise<JsonRpcResponse> => ({
165165
id: 'foo',
166166
jsonrpc: '2.0' as const,
167167
result: { vats: [], clusterConfig },

packages/extension/src/kernel-integration/ui-connections.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { isJsonRpcRequest, isJsonRpcResponse } from '@metamask/utils';
2-
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
2+
import type { JsonRpcResponse } from '@metamask/utils';
33
import { PostMessageDuplexStream } from '@ocap/streams/browser';
44
import { stringify } from '@ocap/utils';
5-
import type { Logger } from '@ocap/utils';
5+
import type { JsonRpcCall, Logger } from '@ocap/utils';
66
import { nanoid } from 'nanoid';
77

88
import { isUiControlCommand } from './ui-control-command.ts';
@@ -11,18 +11,16 @@ import type { UiControlCommand } from './ui-control-command.ts';
1111
export const UI_CONTROL_CHANNEL_NAME = 'ui-control';
1212

1313
export type KernelControlStream = PostMessageDuplexStream<
14-
JsonRpcRequest,
14+
JsonRpcCall,
1515
JsonRpcResponse
1616
>;
1717

1818
export type KernelControlReplyStream = PostMessageDuplexStream<
1919
JsonRpcResponse,
20-
JsonRpcRequest
20+
JsonRpcCall
2121
>;
2222

23-
type HandleInstanceMessage = (
24-
request: JsonRpcRequest,
25-
) => Promise<JsonRpcResponse>;
23+
type HandleInstanceMessage = (request: JsonRpcCall) => Promise<JsonRpcResponse>;
2624

2725
/**
2826
* Establishes a connection between a UI instance and the kernel. Should be called
@@ -45,7 +43,7 @@ export const establishKernelConnection = async (
4543

4644
const kernelStream = await PostMessageDuplexStream.make<
4745
JsonRpcResponse,
48-
JsonRpcRequest
46+
JsonRpcCall
4947
>({
5048
validateInput: isJsonRpcResponse,
5149
messageTarget: instanceChannel,

packages/extension/src/offscreen.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
import { isJsonRpcRequest, isJsonRpcResponse } from '@metamask/utils';
2-
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
1+
import { isJsonRpcResponse } from '@metamask/utils';
2+
import type { JsonRpcResponse } from '@metamask/utils';
33
import type { DuplexStream } from '@ocap/streams';
44
import {
55
initializeMessageChannel,
66
ChromeRuntimeDuplexStream,
77
MessagePortDuplexStream,
88
} from '@ocap/streams/browser';
99
import type { PostMessageTarget } from '@ocap/streams/browser';
10-
import { delay, Logger } from '@ocap/utils';
10+
import { delay, isJsonRpcCall, Logger } from '@ocap/utils';
11+
import type { JsonRpcCall } from '@ocap/utils';
1112

1213
import { makeIframeVatWorker } from './kernel-integration/iframe-vat-worker.ts';
1314
import { ExtensionVatWorkerService } from './kernel-integration/VatWorkerServer.ts';
@@ -25,9 +26,9 @@ async function main(): Promise<void> {
2526

2627
// Create stream for messages from the background script
2728
const backgroundStream = await ChromeRuntimeDuplexStream.make<
28-
JsonRpcRequest,
29+
JsonRpcCall,
2930
JsonRpcResponse
30-
>(chrome.runtime, 'offscreen', 'background', isJsonRpcRequest);
31+
>(chrome.runtime, 'offscreen', 'background', isJsonRpcCall);
3132

3233
const { kernelStream, vatWorkerService } = await makeKernelWorker();
3334

@@ -45,7 +46,7 @@ async function main(): Promise<void> {
4546
* @returns The message port stream for worker communication
4647
*/
4748
async function makeKernelWorker(): Promise<{
48-
kernelStream: DuplexStream<JsonRpcResponse, JsonRpcRequest>;
49+
kernelStream: DuplexStream<JsonRpcResponse, JsonRpcCall>;
4950
vatWorkerService: ExtensionVatWorkerService;
5051
}> {
5152
const worker = new Worker('kernel-worker.js', { type: 'module' });
@@ -56,7 +57,7 @@ async function makeKernelWorker(): Promise<{
5657

5758
const kernelStream = await MessagePortDuplexStream.make<
5859
JsonRpcResponse,
59-
JsonRpcRequest
60+
JsonRpcCall
6061
>(port, isJsonRpcResponse);
6162

6263
const vatWorkerService = ExtensionVatWorkerService.make(

packages/kernel/src/Kernel.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { CapData } from '@endo/marshal';
22
import { serializeError } from '@metamask/rpc-errors';
3-
import type { JsonRpcRequest, JsonRpcResponse } from '@metamask/utils';
3+
import { hasProperty } from '@metamask/utils';
4+
import type { JsonRpcResponse } from '@metamask/utils';
45
import {
56
StreamReadError,
67
VatAlreadyExistsError,
@@ -11,6 +12,7 @@ import type { ExtractParams, ExtractResult } from '@ocap/rpc-methods';
1112
import type { KernelDatabase } from '@ocap/store';
1213
import type { DuplexStream } from '@ocap/streams';
1314
import { Logger } from '@ocap/utils';
15+
import type { JsonRpcCall } from '@ocap/utils';
1416

1517
import { KernelQueue } from './KernelQueue.ts';
1618
import { KernelRouter } from './KernelRouter.ts';
@@ -33,7 +35,7 @@ import { VatHandle } from './VatHandle.ts';
3335

3436
export class Kernel {
3537
/** Command channel from the controlling console/browser extension/test driver */
36-
readonly #commandStream: DuplexStream<JsonRpcRequest, JsonRpcResponse>;
38+
readonly #commandStream: DuplexStream<JsonRpcCall, JsonRpcResponse>;
3739

3840
readonly #rpcService: RpcService<typeof kernelHandlers>;
3941

@@ -70,7 +72,7 @@ export class Kernel {
7072
*/
7173
// eslint-disable-next-line no-restricted-syntax
7274
private constructor(
73-
commandStream: DuplexStream<JsonRpcRequest, JsonRpcResponse>,
75+
commandStream: DuplexStream<JsonRpcCall, JsonRpcResponse>,
7476
vatWorkerService: VatWorkerManager,
7577
kernelDatabase: KernelDatabase,
7678
options: {
@@ -108,7 +110,7 @@ export class Kernel {
108110
* @returns A promise for the new kernel instance.
109111
*/
110112
static async make(
111-
commandStream: DuplexStream<JsonRpcRequest, JsonRpcResponse>,
113+
commandStream: DuplexStream<JsonRpcCall, JsonRpcResponse>,
112114
vatWorkerService: VatWorkerManager,
113115
kernelDatabase: KernelDatabase,
114116
options: {
@@ -155,25 +157,29 @@ export class Kernel {
155157
*
156158
* @param message - The message to handle.
157159
*/
158-
async #handleCommandMessage(message: JsonRpcRequest): Promise<void> {
160+
async #handleCommandMessage(message: JsonRpcCall): Promise<void> {
159161
try {
160162
this.#rpcService.assertHasMethod(message.method);
161163
const result = await this.#rpcService.execute(
162164
message.method,
163165
message.params,
164166
);
165-
await this.#commandStream.write({
166-
id: message.id,
167-
jsonrpc: '2.0',
168-
result,
169-
});
167+
if (hasProperty(message, 'id') && typeof message.id === 'string') {
168+
await this.#commandStream.write({
169+
id: message.id,
170+
jsonrpc: '2.0',
171+
result,
172+
});
173+
}
170174
} catch (error) {
171175
this.#logger.error('Error executing command', error);
172-
await this.#commandStream.write({
173-
id: message.id,
174-
jsonrpc: '2.0',
175-
error: serializeError(error),
176-
});
176+
if (hasProperty(message, 'id') && typeof message.id === 'string') {
177+
await this.#commandStream.write({
178+
id: message.id,
179+
jsonrpc: '2.0',
180+
error: serializeError(error),
181+
});
182+
}
177183
}
178184
}
179185

packages/kernel/src/rpc/kernel/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1-
import type { MethodRequest } from '@ocap/rpc-methods';
1+
import type {
2+
HandlerRecord,
3+
MethodRequest,
4+
MethodSpecRecord,
5+
} from '@ocap/rpc-methods';
26

37
import { pingHandler, pingSpec } from '../vat/ping.ts';
48

59
export const kernelHandlers = {
610
ping: pingHandler,
7-
} as const;
11+
} as HandlerRecord<typeof pingHandler>;
812

913
export const kernelMethodSpecs = {
1014
ping: pingSpec,
11-
} as const;
15+
} as MethodSpecRecord<typeof pingSpec>;
1216

1317
type Handlers = (typeof kernelHandlers)[keyof typeof kernelHandlers];
1418

packages/kernel/src/rpc/vat-syscall/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
import type { MethodSpecRecord, HandlerRecord } from '@ocap/rpc-methods';
2+
13
import { vatSyscallSpec, vatSyscallHandler } from './vat-syscall.ts';
24

35
export const vatSyscallHandlers = {
46
syscall: vatSyscallHandler,
5-
} as const;
7+
} as HandlerRecord<typeof vatSyscallHandler>;
68

79
export const vatSyscallMethodSpecs = {
810
syscall: vatSyscallSpec,
9-
} as const;
11+
} as MethodSpecRecord<typeof vatSyscallSpec>;
1012

1113
type Handlers = (typeof vatSyscallHandlers)[keyof typeof vatSyscallHandlers];
1214

packages/rpc-methods/src/RpcClient.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,33 @@ describe('RpcClient', () => {
6969
});
7070
});
7171

72+
describe('notify', () => {
73+
it('should call a notification method', async () => {
74+
const sendMessage = vi.fn(async () => Promise.resolve());
75+
const client = new RpcClient(getMethods(), sendMessage, 'test');
76+
await client.notify('method3', ['test']);
77+
expect(sendMessage).toHaveBeenCalledWith({
78+
jsonrpc: jsonrpc2,
79+
method: 'method3',
80+
params: ['test'],
81+
});
82+
});
83+
84+
it('should log an error if the message fails to send', async () => {
85+
const logger = makeLogger('[test]');
86+
const sendMessage = vi.fn(async () =>
87+
Promise.reject(new Error('test error')),
88+
);
89+
const client = new RpcClient(getMethods(), sendMessage, 'test', logger);
90+
const logError = vi.spyOn(logger, 'error');
91+
await client.notify('method3', ['test']);
92+
expect(logError).toHaveBeenCalledWith(
93+
'Failed to send notification',
94+
new Error('test error'),
95+
);
96+
});
97+
});
98+
7299
describe('callAndGetId', () => {
73100
it('should call a method and return the id', async () => {
74101
const client = new RpcClient(getMethods(), vi.fn(), 'test');

0 commit comments

Comments
 (0)