diff --git a/packages/extension/package.json b/packages/extension/package.json index b73c30132..31b4c219e 100644 --- a/packages/extension/package.json +++ b/packages/extension/package.json @@ -43,8 +43,8 @@ "dependencies": { "@endo/eventual-send": "^1.3.1", "@endo/marshal": "^1.6.4", - "@endo/promise-kit": "^1.1.10", "@metamask/json-rpc-engine": "^10.0.3", + "@metamask/rpc-errors": "^7.0.2", "@metamask/snaps-utils": "^9.1.0", "@metamask/superstruct": "^3.2.0", "@metamask/utils": "^11.4.0", diff --git a/packages/extension/src/kernel-integration/VatWorkerClient.test.ts b/packages/extension/src/kernel-integration/VatWorkerClient.test.ts index f56a24ac3..5fec4bd2b 100644 --- a/packages/extension/src/kernel-integration/VatWorkerClient.test.ts +++ b/packages/extension/src/kernel-integration/VatWorkerClient.test.ts @@ -1,9 +1,10 @@ -import type { VatId, VatWorkerServiceReply, VatConfig } from '@ocap/kernel'; -import { VatWorkerServiceCommandMethod } from '@ocap/kernel'; +import { rpcErrors } from '@metamask/rpc-errors'; +import type { JsonRpcResponse } from '@metamask/utils'; +import type { VatId, VatConfig } from '@ocap/kernel'; import type { PostMessageTarget } from '@ocap/streams/browser'; import { TestDuplexStream } from '@ocap/test-utils/streams'; import type { Logger } from '@ocap/utils'; -import { delay, makeLogger } from '@ocap/utils'; +import { delay, makeLogger, stringify } from '@ocap/utils'; import { describe, it, expect, beforeEach, vi } from 'vitest'; import type { VatWorkerClientStream } from './VatWorkerClient.ts'; @@ -38,39 +39,28 @@ const makeVatConfig = (sourceSpec: string = 'bogus.js'): VatConfig => ({ sourceSpec, }); -const makeMessageEvent = ( +const makeMessageEvent = >( messageId: `m${number}`, - payload: VatWorkerServiceReply['payload'], + payload: Response, port?: MessagePort, -): MessageEvent => +): MessageEvent => new MessageEvent('message', { - data: { id: messageId, payload }, + data: { ...payload, id: messageId, jsonrpc: '2.0' }, ports: port ? [port] : [], }); -const makeLaunchReply = (messageId: `m${number}`, vatId: VatId): MessageEvent => +const makeLaunchReply = (messageId: `m${number}`): MessageEvent => makeMessageEvent( messageId, { - method: VatWorkerServiceCommandMethod.launch, - params: { vatId }, + result: null, }, new MessageChannel().port1, ); -const makeTerminateReply = ( - messageId: `m${number}`, - vatId: VatId, -): MessageEvent => - makeMessageEvent(messageId, { - method: VatWorkerServiceCommandMethod.terminate, - params: { vatId }, - }); - -const makeTerminateAllReply = (messageId: `m${number}`): MessageEvent => +const makeNullReply = (messageId: `m${number}`): MessageEvent => makeMessageEvent(messageId, { - method: VatWorkerServiceCommandMethod.terminateAll, - params: null, + result: null, }); describe('ExtensionVatWorkerClient', () => { @@ -107,58 +97,30 @@ describe('ExtensionVatWorkerClient', () => { }); }); - it('logs error for unexpected methods', async () => { - const errorSpy = vi.spyOn(clientLogger, 'error'); - client.launch('v0', makeVatConfig()).catch((error) => { - throw error; - }); - // @ts-expect-error Destructive testing. - await stream.receiveInput(makeMessageEvent('m1', { method: 'foo' })); - await delay(10); - - expect(errorSpy).toHaveBeenCalled(); - expect(errorSpy).toHaveBeenCalledWith( - 'Received message with unexpected method', - 'foo', - ); - }); - it('rejects pending promises for error replies', async () => { const resultP = client.launch('v0', makeVatConfig()); await stream.receiveInput( makeMessageEvent('m1', { - method: VatWorkerServiceCommandMethod.launch, - params: { vatId: 'v0', error: new Error('foo') }, + error: rpcErrors.internal('foo'), }), ); await expect(resultP).rejects.toThrow('foo'); }); - it.each` - method - ${VatWorkerServiceCommandMethod.launch} - ${VatWorkerServiceCommandMethod.terminate} - `( - "calls logger.error when receiving a $method reply it wasn't waiting for", - async ({ method }) => { - const errorSpy = vi.spyOn(clientLogger, 'error'); - const unexpectedReply = makeMessageEvent('m9', { - method, - params: { vatId: 'v0' }, - }); - - await stream.receiveInput(unexpectedReply); - await delay(10); + it('calls logger.error when receiving an unexpected reply', async () => { + const errorSpy = vi.spyOn(clientLogger, 'error'); + const unexpectedReply = makeNullReply('m9'); - expect(errorSpy).toHaveBeenCalledOnce(); - expect(errorSpy).toHaveBeenLastCalledWith( - 'Received unexpected reply', - unexpectedReply.data, - ); - }, - ); + await stream.receiveInput(unexpectedReply); + await delay(10); + + expect(errorSpy).toHaveBeenCalledOnce(); + expect(errorSpy).toHaveBeenLastCalledWith( + 'Received response with unexpected id "m9".', + ); + }); describe('launch', () => { it('resolves with a duplex stream when receiving a launch reply', async () => { @@ -167,39 +129,29 @@ describe('ExtensionVatWorkerClient', () => { const result = client.launch(vatId, vatConfig); await delay(10); - await stream.receiveInput(makeLaunchReply('m1', vatId)); + await stream.receiveInput(makeLaunchReply('m1')); // @ocap/streams is mocked expect(await result).toBeInstanceOf(TestDuplexStream); }); - it('logs error when receiving reply without a port', async () => { - const errorSpy = vi.spyOn(clientLogger, 'error'); + it('throws an error when receiving reply without a port', async () => { const vatId: VatId = 'v0'; const vatConfig = makeVatConfig(); - client.launch(vatId, vatConfig).catch((error) => { - throw error; - }); - const reply = makeMessageEvent('m1', { - method: VatWorkerServiceCommandMethod.launch, - params: { vatId }, - }); + const launchP = client.launch(vatId, vatConfig); + const reply = makeNullReply('m1'); await stream.receiveInput(reply); - await delay(10); - - expect(errorSpy).toHaveBeenCalledOnce(); - expect(errorSpy.mock.lastCall?.[0]).toBe( - 'Expected a port with message reply', + await expect(launchP).rejects.toThrow( + `No port found for launch of: ${stringify({ vatId, vatConfig })}`, ); - expect(errorSpy.mock.lastCall?.[1]).toBe(reply); }); }); describe('terminate', () => { it('resolves when receiving a terminate reply', async () => { const result = client.terminate('v0'); - await stream.receiveInput(makeTerminateReply('m1', 'v0')); + await stream.receiveInput(makeNullReply('m1')); await delay(10); expect(await result).toBeUndefined(); @@ -209,7 +161,7 @@ describe('ExtensionVatWorkerClient', () => { describe('terminateAll', () => { it('resolves when receiving a terminateAll reply', async () => { const result = client.terminateAll(); - await stream.receiveInput(makeTerminateAllReply('m1')); + await stream.receiveInput(makeNullReply('m1')); await delay(10); expect(await result).toBeUndefined(); diff --git a/packages/extension/src/kernel-integration/VatWorkerClient.ts b/packages/extension/src/kernel-integration/VatWorkerClient.ts index 92db270ab..c7d063f26 100644 --- a/packages/extension/src/kernel-integration/VatWorkerClient.ts +++ b/packages/extension/src/kernel-integration/VatWorkerClient.ts @@ -1,19 +1,19 @@ -import { makePromiseKit } from '@endo/promise-kit'; -import { isObject } from '@metamask/utils'; -import { - isVatCommandReply, - isVatWorkerServiceReply, - VatWorkerServiceCommandMethod, -} from '@ocap/kernel'; +import { isJsonRpcResponse } from '@metamask/utils'; import type { - VatWorkerService, + JsonRpcId, + JsonRpcRequest, + JsonRpcResponse, +} from '@metamask/utils'; +import { isVatCommandReply } from '@ocap/kernel'; +import type { + VatWorkerManager, VatId, - VatWorkerServiceCommand, VatConfig, - VatWorkerServiceReply, VatCommand, VatCommandReply, } from '@ocap/kernel'; +import { vatWorkerService } from '@ocap/kernel/rpc'; +import { RpcClient } from '@ocap/rpc-methods'; import type { DuplexStream } from '@ocap/streams'; import { MessagePortDuplexStream, @@ -23,29 +23,26 @@ import type { PostMessageEnvelope, PostMessageTarget, } from '@ocap/streams/browser'; -import type { Logger, PromiseCallbacks } from '@ocap/utils'; -import { makeCounter, makeLogger } from '@ocap/utils'; +import type { Logger } from '@ocap/utils'; +import { makeLogger, stringify } from '@ocap/utils'; // Appears in the docs. // eslint-disable-next-line @typescript-eslint/no-unused-vars -import type { ExtensionVatWorkerServer } from './VatWorkerServer.ts'; +import type { ExtensionVatWorkerService } from './VatWorkerServer.ts'; export type VatWorkerClientStream = PostMessageDuplexStream< - MessageEvent, - PostMessageEnvelope + MessageEvent, + PostMessageEnvelope >; -export class ExtensionVatWorkerClient implements VatWorkerService { +export class ExtensionVatWorkerClient implements VatWorkerManager { readonly #logger: Logger; readonly #stream: VatWorkerClientStream; - readonly #unresolvedMessages: Map< - VatWorkerServiceCommand['id'], - PromiseCallbacks - > = new Map(); + readonly #rpcClient: RpcClient; - readonly #messageCounter = makeCounter(); + readonly #portMap: Map; /** * **ATTN:** Prefer {@link ExtensionVatWorkerClient.make} over constructing @@ -59,14 +56,26 @@ export class ExtensionVatWorkerClient implements VatWorkerService { * Note that {@link ExtensionVatWorkerClient.start} must be called to start * the client. * - * @see {@link ExtensionVatWorkerServer} for the other end of the service. + * @see {@link ExtensionVatWorkerService} for the other end of the service. * * @param stream - The stream to use for communication with the server. * @param logger - An optional {@link Logger}. Defaults to a new logger labeled '[vat worker client]'. */ constructor(stream: VatWorkerClientStream, logger?: Logger) { this.#stream = stream; + this.#portMap = new Map(); this.#logger = logger ?? makeLogger('[vat worker client]'); + this.#rpcClient = new RpcClient( + vatWorkerService.methodSpecs, + async (request) => { + if (request.method === 'launch') { + this.#portMap.set(request.id, undefined); + } + await this.#stream.write({ payload: request, transfer: [] }); + }, + 'm', + this.#logger, + ); } /** @@ -83,11 +92,8 @@ export class ExtensionVatWorkerClient implements VatWorkerService { const stream: VatWorkerClientStream = new PostMessageDuplexStream({ messageTarget, messageEventMode: 'event', - validateInput: ( - message, - ): message is MessageEvent => - message instanceof MessageEvent && - isVatWorkerServiceReply(message.data), + validateInput: (message): message is MessageEvent => + message instanceof MessageEvent && isJsonRpcResponse(message.data), }); return new ExtensionVatWorkerClient(stream, logger); } @@ -103,91 +109,56 @@ export class ExtensionVatWorkerClient implements VatWorkerService { .then(async () => this.#stream.drain(this.#handleMessage.bind(this))); } - async #sendMessage( - payload: VatWorkerServiceCommand['payload'], - ): Promise { - const message: VatWorkerServiceCommand = { - id: `m${this.#messageCounter()}`, - payload, - }; - const { promise, resolve, reject } = makePromiseKit(); - this.#unresolvedMessages.set(message.id, { - resolve: resolve as (value: unknown) => void, - reject, - }); - await this.#stream.write({ payload: message, transfer: [] }); - return promise; - } - async launch( vatId: VatId, vatConfig: VatConfig, ): Promise> { - return this.#sendMessage({ - method: VatWorkerServiceCommandMethod.launch, - params: { vatId, vatConfig }, + const [id] = await this.#rpcClient.callAndGetId('launch', { + vatId, + vatConfig, }); + const port = this.#portMap.get(id); + if (!port) { + throw new Error( + `No port found for launch of: ${stringify({ vatId, vatConfig })}`, + ); + } + this.#portMap.delete(id); + return await MessagePortDuplexStream.make( + port, + isVatCommandReply, + ); } async terminate(vatId: VatId): Promise { - return this.#sendMessage({ - method: VatWorkerServiceCommandMethod.terminate, - params: { vatId }, - }); + await this.#rpcClient.call('terminate', { vatId }); } async terminateAll(): Promise { - return this.#sendMessage({ - method: VatWorkerServiceCommandMethod.terminateAll, - params: null, - }); + await this.#rpcClient.call('terminateAll', []); } - async #handleMessage( - event: MessageEvent, - ): Promise { - const { id, payload } = event.data; - const { method } = payload; + async #handleMessage(event: MessageEvent): Promise { + const { id } = event.data; const port = event.ports.at(0); - - const promise = this.#unresolvedMessages.get(id); - - if (!promise) { - this.#logger.error('Received unexpected reply', event.data); + if (typeof id !== 'string') { + this.#logger.error( + 'Received response with unexpected id:', + stringify(event.data), + ); return; } - if (isObject(payload.params) && payload.params.error) { - promise.reject(payload.params.error); - return; + if (this.#portMap.has(id)) { + this.#portMap.set(id, port); + } else if (port !== undefined) { + this.#logger.error( + 'Received message with unexpected port:', + stringify(event.data), + ); } - switch (method) { - case VatWorkerServiceCommandMethod.launch: - if (!port) { - this.#logger.error('Expected a port with message reply', event); - return; - } - promise.resolve( - MessagePortDuplexStream.make( - port, - isVatCommandReply, - ), - ); - break; - case VatWorkerServiceCommandMethod.terminate: - case VatWorkerServiceCommandMethod.terminateAll: - // If we were caching streams on the client this would be a good place - // to remove them. - promise.resolve(undefined); - break; - default: - this.#logger.error( - 'Received message with unexpected method', - // @ts-expect-error Compile-time exhaustiveness check - method.valueOf(), - ); - } + this.#rpcClient.handleResponse(id, event.data); } } harden(ExtensionVatWorkerClient); diff --git a/packages/extension/src/kernel-integration/VatWorkerServer.test.ts b/packages/extension/src/kernel-integration/VatWorkerServer.test.ts index 0767fee07..f2e163b28 100644 --- a/packages/extension/src/kernel-integration/VatWorkerServer.test.ts +++ b/packages/extension/src/kernel-integration/VatWorkerServer.test.ts @@ -1,6 +1,7 @@ +import { rpcErrors } from '@metamask/rpc-errors'; +import type { JsonRpcRequest } from '@metamask/utils'; import { VatAlreadyExistsError, VatNotFoundError } from '@ocap/errors'; -import { VatWorkerServiceCommandMethod } from '@ocap/kernel'; -import type { VatConfig, VatId, VatWorkerServiceCommand } from '@ocap/kernel'; +import type { VatConfig, VatId } from '@ocap/kernel'; import type { PostMessageTarget } from '@ocap/streams/browser'; import { TestDuplexStream } from '@ocap/test-utils/streams'; import type { Logger } from '@ocap/utils'; @@ -8,8 +9,8 @@ import { delay, makeLogger } from '@ocap/utils'; import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; import type { Mock } from 'vitest'; -import { ExtensionVatWorkerServer } from './VatWorkerServer.ts'; -import type { VatWorker, VatWorkerServerStream } from './VatWorkerServer.ts'; +import { ExtensionVatWorkerService } from './VatWorkerServer.ts'; +import type { VatWorker, VatWorkerServiceStream } from './VatWorkerServer.ts'; vi.mock('@ocap/kernel', () => ({ VatWorkerServiceCommandMethod: { @@ -25,9 +26,11 @@ const makeVatConfig = (sourceSpec = 'bogus.js'): VatConfig => ({ const makeMessageEvent = ( messageId: `m${number}`, - payload: VatWorkerServiceCommand['payload'], -): MessageEvent => - new MessageEvent('message', { data: { id: messageId, payload } }); + payload: Pick, +): MessageEvent => + new MessageEvent('message', { + data: { ...payload, id: messageId, jsonrpc: '2.0' }, + }); const makeLaunchMessageEvent = ( messageId: `m${number}`, @@ -35,7 +38,7 @@ const makeLaunchMessageEvent = ( sourceSpec = 'bogus.js', ): MessageEvent => makeMessageEvent(messageId, { - method: VatWorkerServiceCommandMethod.launch, + method: 'launch', params: { vatId, vatConfig: makeVatConfig(sourceSpec) }, }); @@ -44,17 +47,17 @@ const makeTerminateMessageEvent = ( vatId: VatId, ): MessageEvent => makeMessageEvent(messageId, { - method: VatWorkerServiceCommandMethod.terminate, + method: 'terminate', params: { vatId }, }); const makeTerminateAllMessageEvent = (messageId: `m${number}`): MessageEvent => makeMessageEvent(messageId, { - method: VatWorkerServiceCommandMethod.terminateAll, - params: null, + method: 'terminateAll', + params: [], }); -describe('ExtensionVatWorkerServer', () => { +describe('ExtensionVatWorkerService', () => { let cleanup: (() => Promise)[] = []; beforeEach(() => { @@ -75,15 +78,15 @@ describe('ExtensionVatWorkerServer', () => { it('constructs with default logger', async () => { const stream = await TestDuplexStream.make(() => undefined); expect( - new ExtensionVatWorkerServer( - stream as unknown as VatWorkerServerStream, + new ExtensionVatWorkerService( + stream as unknown as VatWorkerServiceStream, () => ({}) as unknown as VatWorker, ), ).toBeDefined(); }); it('constructs using static factory method', () => { - const server = ExtensionVatWorkerServer.make( + const server = ExtensionVatWorkerService.make( { postMessage: vi.fn(), addEventListener: vi.fn(), @@ -98,7 +101,7 @@ describe('ExtensionVatWorkerServer', () => { let workers: ReturnType[] = []; let stream: TestDuplexStream; let logger: Logger; - let server: ExtensionVatWorkerServer; + let server: ExtensionVatWorkerService; const makeMockVatWorker = ( _id: string, @@ -123,8 +126,8 @@ describe('ExtensionVatWorkerServer', () => { workers = []; logger = makeLogger('[test server]'); stream = await TestDuplexStream.make(() => undefined); - server = new ExtensionVatWorkerServer( - stream as unknown as VatWorkerServerStream, + server = new ExtensionVatWorkerService( + stream as unknown as VatWorkerServiceStream, makeMockVatWorker, logger, ); @@ -132,7 +135,6 @@ describe('ExtensionVatWorkerServer', () => { throw error; }); - // Add cleanup addCleanup(async () => { await stream.return?.(); }); @@ -140,14 +142,13 @@ describe('ExtensionVatWorkerServer', () => { it('logs an error for unexpected methods', async () => { const errorSpy = vi.spyOn(logger, 'error'); - // @ts-expect-error Destructive testing. await stream.receiveInput(makeMessageEvent('m0', { method: 'foo' })); await delay(10); expect(errorSpy).toHaveBeenCalledOnce(); expect(errorSpy).toHaveBeenCalledWith( - 'Received message with unexpected method', - 'foo', + 'Error handling "foo" request:', + rpcErrors.methodNotFound(), ); }); @@ -170,7 +171,7 @@ describe('ExtensionVatWorkerServer', () => { expect(errorSpy).toHaveBeenCalledOnce(); expect(errorSpy).toHaveBeenCalledWith( - `Error handling ${VatWorkerServiceCommandMethod.launch} for vatId v0`, + 'Error handling "launch" request:', new VatAlreadyExistsError('v0'), ); }); @@ -200,7 +201,7 @@ describe('ExtensionVatWorkerServer', () => { expect(errorSpy).toHaveBeenCalledOnce(); expect(errorSpy).toHaveBeenCalledWith( - `Error handling ${VatWorkerServiceCommandMethod.terminate} for vatId v0`, + 'Error handling "terminate" request:', new VatNotFoundError('v0'), ); }); @@ -221,7 +222,7 @@ describe('ExtensionVatWorkerServer', () => { expect(errorSpy).toHaveBeenCalledOnce(); expect(errorSpy).toHaveBeenCalledWith( - `Error handling ${VatWorkerServiceCommandMethod.terminate} for vatId ${vatId}`, + 'Error handling "terminate" request:', vatNotFoundError, ); }); @@ -265,7 +266,7 @@ describe('ExtensionVatWorkerServer', () => { expect(errorSpy).toHaveBeenCalledOnce(); expect(errorSpy).toHaveBeenCalledWith( - `Error handling ${VatWorkerServiceCommandMethod.terminateAll} for vatId ${vatId}`, + 'Error handling "terminateAll" request:', vatNotFoundError, ); }); diff --git a/packages/extension/src/kernel-integration/VatWorkerServer.ts b/packages/extension/src/kernel-integration/VatWorkerServer.ts index e81d9cbb8..332036129 100644 --- a/packages/extension/src/kernel-integration/VatWorkerServer.ts +++ b/packages/extension/src/kernel-integration/VatWorkerServer.ts @@ -1,14 +1,16 @@ -import { VatAlreadyExistsError, VatNotFoundError } from '@ocap/errors'; -import { - isVatWorkerServiceCommand, - VatWorkerServiceCommandMethod, -} from '@ocap/kernel'; +import { rpcErrors, serializeError } from '@metamask/rpc-errors'; +import { hasProperty, isJsonRpcRequest } from '@metamask/utils'; import type { - VatWorkerServiceReply, - VatId, - VatConfig, - VatWorkerServiceCommand, -} from '@ocap/kernel'; + JsonRpcId, + JsonRpcParams, + JsonRpcRequest, + JsonRpcResponse, +} from '@metamask/utils'; +import { VatAlreadyExistsError, VatNotFoundError } from '@ocap/errors'; +import type { VatId, VatConfig } from '@ocap/kernel'; +import type { VatWorkerServiceMethod } from '@ocap/kernel/rpc'; +import { vatWorkerService } from '@ocap/kernel/rpc'; +import type { ExtractParams } from '@ocap/rpc-methods'; import { PostMessageDuplexStream } from '@ocap/streams/browser'; import type { PostMessageEnvelope, @@ -26,22 +28,22 @@ export type VatWorker = { terminate: () => Promise; }; -export type VatWorkerServerStream = PostMessageDuplexStream< - MessageEvent, - PostMessageEnvelope +export type VatWorkerServiceStream = PostMessageDuplexStream< + MessageEvent, + PostMessageEnvelope >; -export class ExtensionVatWorkerServer { +export class ExtensionVatWorkerService { readonly #logger; - readonly #stream: VatWorkerServerStream; + readonly #stream: VatWorkerServiceStream; readonly #vatWorkers: Map = new Map(); readonly #makeWorker: (vatId: VatId) => VatWorker; /** - * **ATTN:** Prefer {@link ExtensionVatWorkerServer.make} over constructing + * **ATTN:** Prefer {@link ExtensionVatWorkerService.make} over constructing * this class directly. * * The server end of the vat worker service, intended to be constructed in @@ -49,7 +51,7 @@ export class ExtensionVatWorkerServer { * from the client and uses the {@link VatWorker} methods to effect those * requests. * - * Note that {@link ExtensionVatWorkerServer.start} must be called to start + * Note that {@link ExtensionVatWorkerService.start} must be called to start * the server. * * @see {@link ExtensionVatWorkerClient} for the other end of the service. @@ -59,7 +61,7 @@ export class ExtensionVatWorkerServer { * @param logger - An optional {@link Logger}. Defaults to a new logger labeled '[vat worker server]'. */ constructor( - stream: VatWorkerServerStream, + stream: VatWorkerServiceStream, makeWorker: (vatId: VatId) => VatWorker, logger?: Logger, ) { @@ -69,29 +71,26 @@ export class ExtensionVatWorkerServer { } /** - * Create a new {@link ExtensionVatWorkerServer}. Does not start the server. + * Create a new {@link ExtensionVatWorkerService}. Does not start the server. * * @param messageTarget - The target to use for posting and receiving messages. * @param makeWorker - A method for making a {@link VatWorker}. * @param logger - An optional {@link Logger}. - * @returns A new {@link ExtensionVatWorkerServer}. + * @returns A new {@link ExtensionVatWorkerService}. */ static make( messageTarget: PostMessageTarget, makeWorker: (vatId: VatId) => VatWorker, logger?: Logger, - ): ExtensionVatWorkerServer { - const stream: VatWorkerServerStream = new PostMessageDuplexStream({ + ): ExtensionVatWorkerService { + const stream: VatWorkerServiceStream = new PostMessageDuplexStream({ messageTarget, messageEventMode: 'event', - validateInput: ( - message, - ): message is MessageEvent => - message instanceof MessageEvent && - isVatWorkerServiceCommand(message.data), + validateInput: (message): message is MessageEvent => + message instanceof MessageEvent && isJsonRpcRequest(message.data), }); - return new ExtensionVatWorkerServer(stream, makeWorker, logger); + return new ExtensionVatWorkerService(stream, makeWorker, logger); } /** @@ -105,45 +104,63 @@ export class ExtensionVatWorkerServer { .then(async () => this.#stream.drain(this.#handleMessage.bind(this))); } - async #handleMessage( - event: MessageEvent, - ): Promise { - const { id, payload } = event.data; - const { method, params } = payload; + #assertHasMethod(method: string): asserts method is VatWorkerServiceMethod { + if (!hasProperty(vatWorkerService.methodSpecs, method)) { + throw rpcErrors.methodNotFound(); + } + } + + #assertParams( + method: Method, + params: unknown, + ): asserts params is ExtractParams< + Method, + typeof vatWorkerService.methodSpecs + > { + vatWorkerService.methodSpecs[method].params.assert(params); + } - const handleError = (error: Error, vatId: VatId): void => { - this.#logger.error(`Error handling ${method} for vatId ${vatId}`, error); - // eslint-disable-next-line promise/no-promise-in-callback + async #handleMessage(event: MessageEvent): Promise { + const { id, method, params } = event.data; + try { + await this.#executeMethod(id, method, params); + } catch (error) { + this.#logger.error(`Error handling "${method}" request:`, error); this.#sendMessage({ id, - payload: { method, params: { vatId, error } }, + error: serializeError(error), + jsonrpc: '2.0', }).catch(() => undefined); - }; + } + } + + async #executeMethod( + messageId: JsonRpcId, + method: string, + params: JsonRpcParams | undefined, + ): Promise { + this.#assertHasMethod(method); + + let port: MessagePort | undefined; switch (method) { - case VatWorkerServiceCommandMethod.launch: { + case 'launch': { + this.#assertParams(method, params); const { vatId, vatConfig } = params; - const replyParams = { vatId }; - const replyPayload = { method, params: replyParams }; - await this.#launch(vatId, vatConfig) - .then(async (port) => - this.#sendMessage({ id, payload: replyPayload }, port), - ) - .catch(async (error) => handleError(error, vatId)); + port = await this.#launch(vatId, vatConfig); break; } - case VatWorkerServiceCommandMethod.terminate: - await this.#terminate(params.vatId) - .then(async () => this.#sendMessage({ id, payload })) - .catch(async (error) => handleError(error, params.vatId)); + case 'terminate': + this.#assertParams(method, params); + await this.#terminate(params.vatId); break; - case VatWorkerServiceCommandMethod.terminateAll: + case 'terminateAll': + this.#assertParams(method, params); await Promise.all( Array.from(this.#vatWorkers.keys()).map(async (vatId) => - this.#terminate(vatId).catch((error) => handleError(error, vatId)), + this.#terminate(vatId), ), ); - await this.#sendMessage({ id, payload }); break; default: this.#logger.error( @@ -151,11 +168,16 @@ export class ExtensionVatWorkerServer { // @ts-expect-error Compile-time exhaustiveness check method.valueOf(), ); + throw rpcErrors.methodNotFound(); } + await this.#sendMessage( + { id: messageId, result: null, jsonrpc: '2.0' }, + port, + ); } async #sendMessage( - message: VatWorkerServiceReply, + message: JsonRpcResponse, port?: MessagePort, ): Promise { await this.#stream.write({ @@ -183,4 +205,4 @@ export class ExtensionVatWorkerServer { this.#vatWorkers.delete(vatId); } } -harden(ExtensionVatWorkerServer); +harden(ExtensionVatWorkerService); diff --git a/packages/extension/src/offscreen.ts b/packages/extension/src/offscreen.ts index ec86355f2..2808367d4 100644 --- a/packages/extension/src/offscreen.ts +++ b/packages/extension/src/offscreen.ts @@ -10,7 +10,7 @@ import type { PostMessageTarget } from '@ocap/streams/browser'; import { delay, makeLogger } from '@ocap/utils'; import { makeIframeVatWorker } from './kernel-integration/iframe-vat-worker.ts'; -import { ExtensionVatWorkerServer } from './kernel-integration/VatWorkerServer.ts'; +import { ExtensionVatWorkerService } from './kernel-integration/VatWorkerServer.ts'; const logger = makeLogger('[offscreen]'); @@ -29,11 +29,11 @@ async function main(): Promise { KernelCommandReply >(chrome.runtime, 'offscreen', 'background'); - const { kernelStream, vatWorkerServer } = await makeKernelWorker(); + const { kernelStream, vatWorkerService } = await makeKernelWorker(); // Handle messages from the background script / kernel await Promise.all([ - vatWorkerServer.start(), + vatWorkerService.start(), kernelStream.pipe(backgroundStream), backgroundStream.pipe(kernelStream), ]); @@ -46,7 +46,7 @@ async function main(): Promise { */ async function makeKernelWorker(): Promise<{ kernelStream: DuplexStream; - vatWorkerServer: ExtensionVatWorkerServer; + vatWorkerService: ExtensionVatWorkerService; }> { const worker = new Worker('kernel-worker.js', { type: 'module' }); @@ -59,13 +59,13 @@ async function makeKernelWorker(): Promise<{ KernelCommand >(port, isKernelCommandReply); - const vatWorkerServer = ExtensionVatWorkerServer.make( + const vatWorkerService = ExtensionVatWorkerService.make( worker as PostMessageTarget, (vatId) => makeIframeVatWorker(vatId, initializeMessageChannel), ); return { kernelStream, - vatWorkerServer, + vatWorkerService, }; } diff --git a/packages/kernel-test/src/utils.ts b/packages/kernel-test/src/utils.ts index 96bb399b1..97d85f809 100644 --- a/packages/kernel-test/src/utils.ts +++ b/packages/kernel-test/src/utils.ts @@ -7,7 +7,7 @@ import type { KernelCommand, KernelCommandReply, } from '@ocap/kernel'; -import { NodejsVatWorkerService } from '@ocap/nodejs'; +import { NodejsVatWorkerManager } from '@ocap/nodejs'; import type { KernelDatabase } from '@ocap/store'; import { NodeWorkerDuplexStream } from '@ocap/streams'; import { waitUntilQuiescent } from '@ocap/utils'; @@ -84,7 +84,7 @@ export async function makeKernel( KernelCommand, KernelCommandReply >(kernelPort); - const vatWorkerClient = new NodejsVatWorkerService({}); + const vatWorkerClient = new NodejsVatWorkerManager({}); const kernel = await Kernel.make( nodeStream, vatWorkerClient, diff --git a/packages/kernel/package.json b/packages/kernel/package.json index bc1396324..326f8a7a4 100644 --- a/packages/kernel/package.json +++ b/packages/kernel/package.json @@ -19,6 +19,16 @@ "default": "./dist/index.cjs" } }, + "./rpc": { + "import": { + "types": "./dist/rpc/index.d.mts", + "default": "./dist/rpc/index.mjs" + }, + "require": { + "types": "./dist/rpc/index.d.cts", + "default": "./dist/rpc/index.cjs" + } + }, "./package.json": "./package.json" }, "files": [ @@ -52,6 +62,7 @@ "@metamask/superstruct": "^3.2.0", "@metamask/utils": "^11.4.0", "@ocap/errors": "workspace:^", + "@ocap/rpc-methods": "workspace:^", "@ocap/store": "workspace:^", "@ocap/streams": "workspace:^", "@ocap/utils": "workspace:^", diff --git a/packages/kernel/src/Kernel.test.ts b/packages/kernel/src/Kernel.test.ts index 899f9ec77..2cdfb7e49 100644 --- a/packages/kernel/src/Kernel.test.ts +++ b/packages/kernel/src/Kernel.test.ts @@ -15,7 +15,7 @@ import type { import type { VatId, VatConfig, - VatWorkerService, + VatWorkerManager, ClusterConfig, } from './types.ts'; import { VatHandle } from './VatHandle.ts'; @@ -38,7 +38,7 @@ const makeMockClusterConfig = (): ClusterConfig => ({ describe('Kernel', () => { let mockStream: DuplexStream; - let mockWorkerService: VatWorkerService; + let mockWorkerService: VatWorkerManager; let launchWorkerMock: MockInstance; let terminateWorkerMock: MockInstance; let makeVatHandleMock: MockInstance; @@ -56,7 +56,7 @@ describe('Kernel', () => { ({}) as unknown as DuplexStream, terminate: async () => undefined, terminateAll: async () => undefined, - } as unknown as VatWorkerService; + } as unknown as VatWorkerManager; launchWorkerMock = vi .spyOn(mockWorkerService, 'launch') diff --git a/packages/kernel/src/Kernel.ts b/packages/kernel/src/Kernel.ts index 53ea9e4da..4bbae90e0 100644 --- a/packages/kernel/src/Kernel.ts +++ b/packages/kernel/src/Kernel.ts @@ -30,7 +30,7 @@ import type { VatId, VRef, KRef, - VatWorkerService, + VatWorkerManager, ClusterConfig, VatConfig, RunQueueItem, @@ -74,7 +74,7 @@ export class Kernel { readonly #vats: Map; /** Service to spawn workers (in iframes) for vats to run in */ - readonly #vatWorkerService: VatWorkerService; + readonly #vatWorkerService: VatWorkerManager; /** Storage holding the kernel's own persistent state */ readonly #kernelStore: KernelStore; @@ -105,7 +105,7 @@ export class Kernel { // eslint-disable-next-line no-restricted-syntax private constructor( commandStream: DuplexStream, - vatWorkerService: VatWorkerService, + vatWorkerService: VatWorkerManager, kernelDatabase: KernelDatabase, options: { resetStorage?: boolean; @@ -138,7 +138,7 @@ export class Kernel { */ static async make( commandStream: DuplexStream, - vatWorkerService: VatWorkerService, + vatWorkerService: VatWorkerManager, kernelDatabase: KernelDatabase, options: { resetStorage?: boolean; diff --git a/packages/kernel/src/index.test.ts b/packages/kernel/src/index.test.ts index bc02a6c18..5468cac7e 100644 --- a/packages/kernel/src/index.test.ts +++ b/packages/kernel/src/index.test.ts @@ -14,7 +14,6 @@ describe('index', () => { 'VatHandle', 'VatIdStruct', 'VatSupervisor', - 'VatWorkerServiceCommandMethod', 'isKernelCommand', 'isKernelCommandReply', 'isVatCommand', @@ -22,8 +21,6 @@ describe('index', () => { 'isVatCommandReply', 'isVatConfig', 'isVatId', - 'isVatWorkerServiceCommand', - 'isVatWorkerServiceReply', 'kser', 'kunser', 'makeKernelStore', diff --git a/packages/kernel/src/index.ts b/packages/kernel/src/index.ts index 364c69bc2..0568815b2 100644 --- a/packages/kernel/src/index.ts +++ b/packages/kernel/src/index.ts @@ -5,7 +5,7 @@ export { VatSupervisor } from './VatSupervisor.ts'; export type { Message } from '@agoric/swingset-liveslots'; export type { VatId, - VatWorkerService, + VatWorkerManager, ClusterConfig, VatConfig, VatCheckpoint, diff --git a/packages/kernel/src/messages/index.ts b/packages/kernel/src/messages/index.ts index fb2313973..7e9e8d419 100644 --- a/packages/kernel/src/messages/index.ts +++ b/packages/kernel/src/messages/index.ts @@ -21,15 +21,3 @@ export type { VatCommandReply, VatCommandReturnType, } from './vat.ts'; - -// Vat worker service commands. - -export { - VatWorkerServiceCommandMethod, - isVatWorkerServiceCommand, - isVatWorkerServiceReply, -} from './vat-worker-service.ts'; -export type { - VatWorkerServiceCommand, - VatWorkerServiceReply, -} from './vat-worker-service.ts'; diff --git a/packages/kernel/src/messages/vat-worker-service.test.ts b/packages/kernel/src/messages/vat-worker-service.test.ts deleted file mode 100644 index eabbbda25..000000000 --- a/packages/kernel/src/messages/vat-worker-service.test.ts +++ /dev/null @@ -1,159 +0,0 @@ -import { describe, expect, it } from 'vitest'; - -import type { - VatWorkerServiceCommand, - VatWorkerServiceReply, -} from './vat-worker-service.ts'; -import { - isVatWorkerServiceCommand, - isVatWorkerServiceReply, - VatWorkerServiceCommandMethod, -} from './vat-worker-service.ts'; - -const launchPayload: VatWorkerServiceCommand['payload'] = harden({ - method: VatWorkerServiceCommandMethod.launch, - params: { vatId: 'v0', vatConfig: { sourceSpec: 'bogus.js' } }, -}); -const launchReplyPayload: VatWorkerServiceReply['payload'] = harden({ - method: VatWorkerServiceCommandMethod.launch, - params: { vatId: 'v0' }, -}); - -const terminatePayload: VatWorkerServiceCommand['payload'] = harden({ - method: VatWorkerServiceCommandMethod.terminate, - params: { vatId: 'v0' }, -}); -const terminateReplyPayload: VatWorkerServiceReply['payload'] = harden({ - method: VatWorkerServiceCommandMethod.terminate, - params: { vatId: 'v0' }, -}); - -const terminateAllPayload: VatWorkerServiceCommand['payload'] = harden({ - method: VatWorkerServiceCommandMethod.terminateAll, - params: null, -}); -const terminateAllReplyPayload: VatWorkerServiceReply['payload'] = harden({ - method: VatWorkerServiceCommandMethod.terminateAll, - params: null, -}); - -describe('isVatWorkerServiceCommand', () => { - describe.each` - payload - ${launchPayload} - ${terminatePayload} - ${terminateAllPayload} - `('$payload.method', ({ payload }) => { - it.each([ - [true, 'valid message id with valid payload', { id: 'm0', payload }], - [false, 'invalid id', { id: 'vat-message-id', payload }], - [false, 'numerical id', { id: 1, payload }], - [false, 'missing payload', { id: 'm0' }], - ])('returns %j for %j', (expectedResult, _, value) => { - expect(isVatWorkerServiceCommand(value)).toBe(expectedResult); - }); - }); -}); - -describe('isVatWorkerServiceReply', () => { - const withError = ( - payload: VatWorkerServiceReply['payload'], - problem: unknown, - ): unknown => ({ - method: payload.method, - params: { ...payload.params, error: problem }, - }); - - describe('launch', () => { - it.each([ - [ - true, - 'valid message id with valid payload', - { id: 'm0', payload: launchReplyPayload }, - ], - [ - false, - 'invalid id', - { id: 'vat-message-id', payload: launchReplyPayload }, - ], - [false, 'numerical id', { id: 1, payload: launchReplyPayload }], - [false, 'missing payload', { id: 'm0' }], - [ - true, - 'valid message id with error', - { id: 'm0', payload: withError(launchReplyPayload, new Error('foo')) }, - ], - [ - false, - 'valid message id with invalid error', - { id: 'm0', payload: withError(launchReplyPayload, 404) }, - ], - ])('returns %j for %j', (expectedResult, _, value) => { - expect(isVatWorkerServiceReply(value)).toBe(expectedResult); - }); - }); - - describe('terminate', () => { - it.each([ - [ - true, - 'valid message id with valid payload', - { id: 'm0', payload: terminateReplyPayload }, - ], - [ - false, - 'invalid id', - { id: 'vat-message-id', payload: terminateReplyPayload }, - ], - [false, 'numerical id', { id: 1, payload: terminateReplyPayload }], - [false, 'missing payload', { id: 'm0' }], - [ - true, - 'valid message id with error', - { - id: 'm0', - payload: withError(terminateReplyPayload, new Error('foo')), - }, - ], - [ - false, - 'valid message id with invalid error', - { id: 'm0', payload: withError(terminateReplyPayload, 404) }, - ], - ])('returns %j for %j', (expectedResult, _, value) => { - expect(isVatWorkerServiceReply(value)).toBe(expectedResult); - }); - }); - - describe('terminateAll', () => { - it.each([ - [ - true, - 'valid message id with valid payload', - { id: 'm0', payload: terminateAllReplyPayload }, - ], - [ - false, - 'invalid id', - { id: 'vat-message-id', payload: terminateAllReplyPayload }, - ], - [false, 'numerical id', { id: 1, payload: terminateAllReplyPayload }], - [false, 'missing payload', { id: 'm0' }], - [ - true, - 'valid message id with error', - { - id: 'm0', - payload: withError(terminateAllReplyPayload, new Error('foo')), - }, - ], - [ - false, - 'valid message id with invalid error', - { id: 'm0', payload: withError(terminateAllReplyPayload, 404) }, - ], - ])('returns %j for %j', (expectedResult, _, value) => { - expect(isVatWorkerServiceReply(value)).toBe(expectedResult); - }); - }); -}); diff --git a/packages/kernel/src/messages/vat-worker-service.ts b/packages/kernel/src/messages/vat-worker-service.ts deleted file mode 100644 index f5dcb56b9..000000000 --- a/packages/kernel/src/messages/vat-worker-service.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { object, union, optional, is, literal } from '@metamask/superstruct'; -import type { Infer } from '@metamask/superstruct'; -import { ErrorStruct } from '@ocap/errors'; -import type { TypeGuard } from '@ocap/utils'; - -import { VatIdStruct, VatMessageIdStruct, VatConfigStruct } from '../types.ts'; - -export const VatWorkerServiceCommandMethod = { - launch: 'launch', - terminate: 'terminate', - terminateAll: 'terminateAll', -} as const; - -const VatWorkerServiceCommandStruct = object({ - id: VatMessageIdStruct, - payload: union([ - object({ - method: literal(VatWorkerServiceCommandMethod.launch), - params: object({ vatId: VatIdStruct, vatConfig: VatConfigStruct }), - }), - object({ - method: literal(VatWorkerServiceCommandMethod.terminate), - params: object({ vatId: VatIdStruct }), - }), - object({ - method: literal(VatWorkerServiceCommandMethod.terminateAll), - params: literal(null), - }), - ]), -}); - -const VatWorkerServiceCommandReplyStruct = object({ - id: VatMessageIdStruct, - payload: union([ - object({ - method: union([ - literal(VatWorkerServiceCommandMethod.launch), - literal(VatWorkerServiceCommandMethod.terminate), - ]), - params: object({ - vatId: VatIdStruct, - error: optional(ErrorStruct), - }), - }), - object({ - method: literal(VatWorkerServiceCommandMethod.terminateAll), - params: union([ - literal(null), - object({ - vatId: optional(VatIdStruct), - error: ErrorStruct, - }), - ]), - }), - ]), -}); - -export type VatWorkerServiceCommand = Infer< - typeof VatWorkerServiceCommandStruct ->; -export type VatWorkerServiceReply = Infer< - typeof VatWorkerServiceCommandReplyStruct ->; - -export const isVatWorkerServiceCommand: TypeGuard = ( - value: unknown, -): value is VatWorkerServiceCommand => is(value, VatWorkerServiceCommandStruct); - -export const isVatWorkerServiceReply: TypeGuard = ( - value: unknown, -): value is VatWorkerServiceReply => - is(value, VatWorkerServiceCommandReplyStruct); diff --git a/packages/kernel/src/rpc/index.test.ts b/packages/kernel/src/rpc/index.test.ts new file mode 100644 index 000000000..6cbb9870a --- /dev/null +++ b/packages/kernel/src/rpc/index.test.ts @@ -0,0 +1,9 @@ +import { describe, it, expect } from 'vitest'; + +import * as indexModule from './index.ts'; + +describe('index', () => { + it('has the expected exports', () => { + expect(Object.keys(indexModule).sort()).toStrictEqual(['vatWorkerService']); + }); +}); diff --git a/packages/kernel/src/rpc/index.ts b/packages/kernel/src/rpc/index.ts new file mode 100644 index 000000000..8bd8be319 --- /dev/null +++ b/packages/kernel/src/rpc/index.ts @@ -0,0 +1,3 @@ +export * as vatWorkerService from './vat-worker-service/index.ts'; + +export type * from './vat-worker-service/index.ts'; diff --git a/packages/kernel/src/rpc/vat-worker-service/index.ts b/packages/kernel/src/rpc/vat-worker-service/index.ts new file mode 100644 index 000000000..4a13a1a4f --- /dev/null +++ b/packages/kernel/src/rpc/vat-worker-service/index.ts @@ -0,0 +1,22 @@ +import type { MethodSpecRecord } from '@ocap/rpc-methods'; + +import { launchSpec } from './launch.ts'; +import { terminateSpec } from './terminate.ts'; +import { terminateAllSpec } from './terminateAll.ts'; + +// This module only has method specifications and no handlers, because the method +// implementations are highly platform-specific and do not warrant standalone +// implementations. + +export type VatWorkerServiceMethodSpecs = + | typeof launchSpec + | typeof terminateSpec + | typeof terminateAllSpec; + +export const methodSpecs: MethodSpecRecord = { + launch: launchSpec, + terminate: terminateSpec, + terminateAll: terminateAllSpec, +} as const; + +export type VatWorkerServiceMethod = VatWorkerServiceMethodSpecs['method']; diff --git a/packages/kernel/src/rpc/vat-worker-service/launch.ts b/packages/kernel/src/rpc/vat-worker-service/launch.ts new file mode 100644 index 000000000..5c5f65e13 --- /dev/null +++ b/packages/kernel/src/rpc/vat-worker-service/launch.ts @@ -0,0 +1,16 @@ +import { literal, object } from '@metamask/superstruct'; +import type { MethodSpec } from '@ocap/rpc-methods'; + +import { VatIdStruct, VatConfigStruct } from '../../types.ts'; +import type { VatId, VatConfig } from '../../types.ts'; + +type LaunchParams = { + vatId: VatId; + vatConfig: VatConfig; +}; + +export const launchSpec: MethodSpec<'launch', LaunchParams, null> = { + method: 'launch', + params: object({ vatId: VatIdStruct, vatConfig: VatConfigStruct }), + result: literal(null), +}; diff --git a/packages/kernel/src/rpc/vat-worker-service/terminate.ts b/packages/kernel/src/rpc/vat-worker-service/terminate.ts new file mode 100644 index 000000000..5bbc1dd2a --- /dev/null +++ b/packages/kernel/src/rpc/vat-worker-service/terminate.ts @@ -0,0 +1,11 @@ +import { object, literal } from '@metamask/superstruct'; +import type { MethodSpec } from '@ocap/rpc-methods'; + +import { VatIdStruct } from '../../types.ts'; +import type { VatId } from '../../types.ts'; + +export const terminateSpec: MethodSpec<'terminate', { vatId: VatId }, null> = { + method: 'terminate', + params: object({ vatId: VatIdStruct }), + result: literal(null), +}; diff --git a/packages/kernel/src/rpc/vat-worker-service/terminateAll.ts b/packages/kernel/src/rpc/vat-worker-service/terminateAll.ts new file mode 100644 index 000000000..982e9eb0e --- /dev/null +++ b/packages/kernel/src/rpc/vat-worker-service/terminateAll.ts @@ -0,0 +1,10 @@ +import { literal } from '@metamask/superstruct'; +import type { Json } from '@metamask/utils'; +import type { MethodSpec } from '@ocap/rpc-methods'; +import { EmptyJsonArray } from '@ocap/utils'; + +export const terminateAllSpec: MethodSpec<'terminateAll', Json[], null> = { + method: 'terminateAll', + params: EmptyJsonArray, + result: literal(null), +}; diff --git a/packages/kernel/src/types.ts b/packages/kernel/src/types.ts index e1a6135a6..56732edaa 100644 --- a/packages/kernel/src/types.ts +++ b/packages/kernel/src/types.ts @@ -199,7 +199,7 @@ export const VatMessageIdStruct = define( isVatMessageId, ); -export type VatWorkerService = { +export type VatWorkerManager = { /** * Launch a new worker with a specific vat id. * diff --git a/packages/kernel/tsconfig.build.json b/packages/kernel/tsconfig.build.json index 4944f89ce..dab88e388 100644 --- a/packages/kernel/tsconfig.build.json +++ b/packages/kernel/tsconfig.build.json @@ -9,6 +9,7 @@ }, "references": [ { "path": "../errors/tsconfig.build.json" }, + { "path": "../rpc-methods/tsconfig.build.json" }, { "path": "../streams/tsconfig.build.json" }, { "path": "../store/tsconfig.build.json" }, { "path": "../utils/tsconfig.build.json" } diff --git a/packages/kernel/tsconfig.json b/packages/kernel/tsconfig.json index 3a6f0cb5c..88083b5f3 100644 --- a/packages/kernel/tsconfig.json +++ b/packages/kernel/tsconfig.json @@ -5,6 +5,7 @@ }, "references": [ { "path": "../errors" }, + { "path": "../rpc-methods" }, { "path": "../store" }, { "path": "../streams" }, { "path": "../utils" }, diff --git a/packages/nodejs/src/index.ts b/packages/nodejs/src/index.ts index f20a4d8bc..8e86be70b 100644 --- a/packages/nodejs/src/index.ts +++ b/packages/nodejs/src/index.ts @@ -1,2 +1,2 @@ -export { NodejsVatWorkerService } from './kernel/VatWorkerService.ts'; +export { NodejsVatWorkerManager } from './kernel/VatWorkerManager.ts'; export { makeKernel } from './kernel/make-kernel.ts'; diff --git a/packages/nodejs/src/kernel/VatWorkerService.test.ts b/packages/nodejs/src/kernel/VatWorkerManager.test.ts similarity index 84% rename from packages/nodejs/src/kernel/VatWorkerService.test.ts rename to packages/nodejs/src/kernel/VatWorkerManager.test.ts index a2dc5f113..137fdad79 100644 --- a/packages/nodejs/src/kernel/VatWorkerService.test.ts +++ b/packages/nodejs/src/kernel/VatWorkerManager.test.ts @@ -4,7 +4,7 @@ import type { VatId } from '@ocap/kernel'; import { makeCounter } from '@ocap/utils'; import { describe, expect, it, vi } from 'vitest'; -import { NodejsVatWorkerService } from './VatWorkerService.ts'; +import { NodejsVatWorkerManager } from './VatWorkerManager.ts'; const mocks = vi.hoisted(() => ({ worker: { @@ -27,10 +27,10 @@ vi.mock('node:worker_threads', () => ({ Worker: vi.fn(() => mocks.worker), })); -describe('NodejsVatWorkerService', () => { +describe('NodejsVatWorkerManager', () => { it('constructs an instance without any arguments', () => { - const instance = new NodejsVatWorkerService({}); - expect(instance).toBeInstanceOf(NodejsVatWorkerService); + const instance = new NodejsVatWorkerManager({}); + expect(instance).toBeInstanceOf(NodejsVatWorkerManager); }); const workerFilePath = 'unused'; @@ -39,7 +39,7 @@ describe('NodejsVatWorkerService', () => { describe('launch', () => { it('creates a NodeWorker and returns a NodeWorkerDuplexStream', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath, }); const testVatId: VatId = getTestVatId(); @@ -51,7 +51,7 @@ describe('NodejsVatWorkerService', () => { it('rejects if synchronize fails', async () => { const rejected = 'test-reject-value'; mocks.stream.synchronize.mockRejectedValue(rejected); - const service = new NodejsVatWorkerService({ workerFilePath }); + const service = new NodejsVatWorkerManager({ workerFilePath }); const testVatId: VatId = getTestVatId(); await expect(async () => await service.launch(testVatId)).rejects.toThrow( rejected, @@ -61,7 +61,7 @@ describe('NodejsVatWorkerService', () => { describe('terminate', () => { it('terminates the target vat', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath, }); const testVatId: VatId = getTestVatId(); @@ -74,7 +74,7 @@ describe('NodejsVatWorkerService', () => { }); it('throws when terminating an unknown vat', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath, }); const testVatId: VatId = getTestVatId(); @@ -87,7 +87,7 @@ describe('NodejsVatWorkerService', () => { describe('terminateAll', () => { it('terminates all vats', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath, }); const vatIds: VatId[] = [getTestVatId(), getTestVatId(), getTestVatId()]; diff --git a/packages/nodejs/src/kernel/VatWorkerService.ts b/packages/nodejs/src/kernel/VatWorkerManager.ts similarity index 96% rename from packages/nodejs/src/kernel/VatWorkerService.ts rename to packages/nodejs/src/kernel/VatWorkerManager.ts index 6ddcecc7c..825a92708 100644 --- a/packages/nodejs/src/kernel/VatWorkerService.ts +++ b/packages/nodejs/src/kernel/VatWorkerManager.ts @@ -1,7 +1,7 @@ import { makePromiseKit } from '@endo/promise-kit'; import { isVatCommandReply } from '@ocap/kernel'; import type { - VatWorkerService, + VatWorkerManager, VatId, VatCommand, VatCommandReply, @@ -19,7 +19,7 @@ const DEFAULT_WORKER_FILE = new URL( import.meta.url, ).pathname; -export class NodejsVatWorkerService implements VatWorkerService { +export class NodejsVatWorkerManager implements VatWorkerManager { readonly #logger: Logger; readonly #workerFilePath: string; @@ -92,4 +92,4 @@ export class NodejsVatWorkerService implements VatWorkerService { } } } -harden(NodejsVatWorkerService); +harden(NodejsVatWorkerManager); diff --git a/packages/nodejs/src/kernel/make-kernel.ts b/packages/nodejs/src/kernel/make-kernel.ts index b2766cea2..646813a93 100644 --- a/packages/nodejs/src/kernel/make-kernel.ts +++ b/packages/nodejs/src/kernel/make-kernel.ts @@ -4,7 +4,7 @@ import { makeSQLKernelDatabase } from '@ocap/store/sqlite/nodejs'; import { NodeWorkerDuplexStream } from '@ocap/streams'; import { MessagePort as NodeMessagePort } from 'node:worker_threads'; -import { NodejsVatWorkerService } from './VatWorkerService.ts'; +import { NodejsVatWorkerManager } from './VatWorkerManager.ts'; /** * The main function for the kernel worker. @@ -31,7 +31,7 @@ export async function makeKernel({ KernelCommand, KernelCommandReply >(port); - const vatWorkerClient = new NodejsVatWorkerService({ workerFilePath }); + const vatWorkerClient = new NodejsVatWorkerManager({ workerFilePath }); // Initialize kernel store. const kernelDatabase = await makeSQLKernelDatabase({ dbFilename }); diff --git a/packages/nodejs/test/e2e/VatWorkerService.test.ts b/packages/nodejs/test/e2e/VatWorkerManager.test.ts similarity index 85% rename from packages/nodejs/test/e2e/VatWorkerService.test.ts rename to packages/nodejs/test/e2e/VatWorkerManager.test.ts index 832613f45..aac1d046d 100644 --- a/packages/nodejs/test/e2e/VatWorkerService.test.ts +++ b/packages/nodejs/test/e2e/VatWorkerManager.test.ts @@ -5,17 +5,17 @@ import { NodeWorkerDuplexStream } from '@ocap/streams'; import { makeCounter } from '@ocap/utils'; import { describe, expect, it, vi } from 'vitest'; -import { NodejsVatWorkerService } from '../../src/kernel/VatWorkerService.ts'; +import { NodejsVatWorkerManager } from '../../src/kernel/VatWorkerManager.ts'; import { getTestWorkerFile } from '../get-test-worker.ts'; -describe('NodejsVatWorkerService', () => { +describe('NodejsVatWorkerManager', () => { const testWorkerFile = getTestWorkerFile('stream-sync'); const vatIdCounter = makeCounter(); const getTestVatId = (): VatId => `v${vatIdCounter()}`; describe('launch', () => { it('creates a NodeWorker and returns a NodeWorkerDuplexStream', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath: testWorkerFile, }); const testVatId: VatId = getTestVatId(); @@ -33,8 +33,8 @@ describe('NodejsVatWorkerService', () => { })), })); vi.resetModules(); - const NVWS = (await import('../../src/kernel/VatWorkerService.ts')) - .NodejsVatWorkerService; + const NVWS = (await import('../../src/kernel/VatWorkerManager.ts')) + .NodejsVatWorkerManager; const service = new NVWS({ workerFilePath: testWorkerFile }); const testVatId: VatId = getTestVatId(); @@ -46,7 +46,7 @@ describe('NodejsVatWorkerService', () => { describe('terminate', () => { it('terminates the target vat', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath: testWorkerFile, }); const testVatId: VatId = getTestVatId(); @@ -59,7 +59,7 @@ describe('NodejsVatWorkerService', () => { }); it('throws when terminating an unknown vat', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath: testWorkerFile, }); const testVatId: VatId = getTestVatId(); @@ -72,7 +72,7 @@ describe('NodejsVatWorkerService', () => { describe('terminateAll', () => { it('terminates all vats', async () => { - const service = new NodejsVatWorkerService({ + const service = new NodejsVatWorkerManager({ workerFilePath: testWorkerFile, }); const vatIds: VatId[] = [getTestVatId(), getTestVatId(), getTestVatId()]; diff --git a/packages/nodejs/test/e2e/vat-worker.test.ts b/packages/nodejs/test/e2e/vat-worker.test.ts index 95f507fd9..88ed9ce61 100644 --- a/packages/nodejs/test/e2e/vat-worker.test.ts +++ b/packages/nodejs/test/e2e/vat-worker.test.ts @@ -10,7 +10,7 @@ import { getTestWorkerFile } from '../get-test-worker.ts'; const { makePromiseKit } = makePromiseKitMock(); -describe('NodejsVatWorkerService', () => { +describe('NodejsVatWorkerManager', () => { let testWorkerFile: string; const vatIdCounter = makeCounter(); const getTestVatId = (): VatId => `v${vatIdCounter()}`; diff --git a/packages/rpc-methods/src/RpcClient.test.ts b/packages/rpc-methods/src/RpcClient.test.ts index c07f4d6ea..f9ef9f96d 100644 --- a/packages/rpc-methods/src/RpcClient.test.ts +++ b/packages/rpc-methods/src/RpcClient.test.ts @@ -1,4 +1,5 @@ import { jsonrpc2 } from '@metamask/utils'; +import { makeLogger } from '@ocap/utils'; import { describe, it, vi, expect } from 'vitest'; import { RpcClient } from './RpcClient.ts'; @@ -17,15 +18,15 @@ describe('RpcClient', () => { const sendMessage = vi.fn(); const client = new RpcClient(getMethods(), sendMessage, 'test'); const resultP = client.call('method1', ['test']); - client.handleResponse('test:1', { + client.handleResponse('test1', { jsonrpc: jsonrpc2, - id: 'test:1', + id: 'test1', result: null, }); expect(sendMessage).toHaveBeenCalledWith({ jsonrpc: jsonrpc2, - id: 'test:1', + id: 'test1', method: 'method1', params: ['test'], }); @@ -35,9 +36,9 @@ describe('RpcClient', () => { it('should throw an error for error responses', async () => { const client = new RpcClient(getMethods(), vi.fn(), 'test'); const resultP = client.call('method1', ['test']); - client.handleResponse('test:1', { + client.handleResponse('test1', { jsonrpc: jsonrpc2, - id: 'test:1', + id: 'test1', error: { code: -32000, message: 'test error', @@ -50,9 +51,9 @@ describe('RpcClient', () => { it('should throw an error for invalid results', async () => { const client = new RpcClient(getMethods(), vi.fn(), 'test'); const resultP = client.call('method1', ['test']); - client.handleResponse('test:1', { + client.handleResponse('test1', { jsonrpc: jsonrpc2, - id: 'test:1', + id: 'test1', result: 42, }); await expect(resultP).rejects.toThrow( @@ -63,18 +64,34 @@ describe('RpcClient', () => { it('should throw an error for invalid responses', async () => { const client = new RpcClient(getMethods(), vi.fn(), 'test'); const resultP = client.call('method1', ['test']); - client.handleResponse('test:1', 'invalid'); + client.handleResponse('test1', 'invalid'); await expect(resultP).rejects.toThrow('Invalid JSON-RPC response:'); }); }); + describe('callAndGetId', () => { + it('should call a method and return the id', async () => { + const client = new RpcClient(getMethods(), vi.fn(), 'test'); + const callP = client.callAndGetId('method1', ['test']); + client.handleResponse('test1', { + jsonrpc: jsonrpc2, + id: 'test1', + result: null, + }); + const [id, result] = await callP; + expect(id).toBe('test1'); + expect(result).toBeNull(); + }); + }); + describe('handleResponse', () => { it('should log an error if the message id is not found', () => { - const client = new RpcClient(getMethods(), vi.fn(), 'test'); - const logError = vi.spyOn(console, 'error'); - client.handleResponse('test:1', 'test'); + const logger = makeLogger('[test]'); + const client = new RpcClient(getMethods(), vi.fn(), 'test', logger); + const logError = vi.spyOn(logger, 'error'); + client.handleResponse('test1', 'test'); expect(logError).toHaveBeenCalledWith( - 'No unresolved message with id "test:1".', + 'Received response with unexpected id "test1".', ); }); }); diff --git a/packages/rpc-methods/src/RpcClient.ts b/packages/rpc-methods/src/RpcClient.ts index 7f4aee3b1..0128611c4 100644 --- a/packages/rpc-methods/src/RpcClient.ts +++ b/packages/rpc-methods/src/RpcClient.ts @@ -2,8 +2,8 @@ import { makePromiseKit } from '@endo/promise-kit'; import { assert as assertStruct } from '@metamask/superstruct'; import { isJsonRpcFailure, isJsonRpcSuccess } from '@metamask/utils'; import type { JsonRpcRequest, JsonRpcSuccess } from '@metamask/utils'; -import { makeCounter, stringify } from '@ocap/utils'; -import type { PromiseCallbacks } from '@ocap/utils'; +import { makeCounter, makeLogger, stringify } from '@ocap/utils'; +import type { Logger, PromiseCallbacks } from '@ocap/utils'; import type { MethodSpec, @@ -31,17 +31,25 @@ export class RpcClient< readonly #sendMessage: SendMessage; - constructor(methods: Methods, sendMessage: SendMessage, prefix: string) { + readonly #logger: Logger; + + constructor( + methods: Methods, + sendMessage: SendMessage, + prefix: string, + logger: Logger = makeLogger('[rpc client]'), + ) { this.#methods = methods; this.#sendMessage = sendMessage; this.#prefix = prefix; + this.#logger = logger; } - async call>( + async #call>( method: Method, params: ExtractParams, + id: string, ): Promise> { - const id = this.#nextMessageId(); const response = await this.#createMessage(id, { id, jsonrpc: '2.0', @@ -53,12 +61,41 @@ export class RpcClient< return response.result; } + /** + * Calls a JSON-RPC method and returns the result. + * + * @param method - The method to call. + * @param params - The parameters to pass to the method. + * @returns A promise that resolves to the result. + */ + async call>( + method: Method, + params: ExtractParams, + ): Promise> { + return await this.#call(method, params, this.#nextMessageId()); + } + + /** + * Calls a JSON-RPC method and returns the message id and the result. + * + * @param method - The method to call. + * @param params - The parameters to pass to the method. + * @returns A promise that resolves to a tuple of the message id and the result. + */ + async callAndGetId>( + method: Method, + params: ExtractParams, + ): Promise<[string, ExtractResult]> { + const id = this.#nextMessageId(); + return [id, await this.#call(method, params, id)]; + } + #assertResult>( method: Method, result: unknown, ): asserts result is ExtractResult { try { - // @ts-expect-error - TODO: For unknown reasons, TypeScript fails to recognize that + // @ts-expect-error: For unknown reasons, TypeScript fails to recognize that // `Method` must be a key of `this.#methods`. assertStruct(result, this.#methods[method].result); } catch (error) { @@ -81,10 +118,18 @@ export class RpcClient< return promise; } + /** + * Handles a JSON-RPC response to a previously made method call. + * + * @param messageId - The id of the message to handle. + * @param response - The response to handle. + */ handleResponse(messageId: string, response: unknown): void { const requestCallbacks = this.#unresolvedMessages.get(messageId); if (requestCallbacks === undefined) { - console.error(`No unresolved message with id "${messageId}".`); + this.#logger.error( + `Received response with unexpected id "${messageId}".`, + ); } else { this.#unresolvedMessages.delete(messageId); if (isJsonRpcSuccess(response)) { @@ -99,6 +144,11 @@ export class RpcClient< } } + /** + * Rejects all unresolved messages with an error. + * + * @param error - The error to reject the messages with. + */ rejectAll(error: Error): void { for (const [messageId, promiseCallback] of this.#unresolvedMessages) { promiseCallback?.reject(error); @@ -107,6 +157,6 @@ export class RpcClient< } #nextMessageId(): string { - return `${this.#prefix}:${this.#messageCounter()}`; + return `${this.#prefix}${this.#messageCounter()}`; } } diff --git a/packages/rpc-methods/src/RpcService.ts b/packages/rpc-methods/src/RpcService.ts index a5cc86916..c5336ec14 100644 --- a/packages/rpc-methods/src/RpcService.ts +++ b/packages/rpc-methods/src/RpcService.ts @@ -1,6 +1,5 @@ import { rpcErrors } from '@metamask/rpc-errors'; import type { Struct } from '@metamask/superstruct'; -import { assert as assertStruct } from '@metamask/superstruct'; import { hasProperty } from '@metamask/utils'; import type { Json, JsonRpcParams } from '@metamask/utils'; @@ -127,7 +126,7 @@ function assertParams( struct: Struct, ): asserts params is Params { try { - assertStruct(params, struct); + struct.assert(params); } catch (error) { throw new Error(`Invalid params: ${(error as Error).message}`); } diff --git a/packages/rpc-methods/src/types.ts b/packages/rpc-methods/src/types.ts index 83418e24b..1d924627a 100644 --- a/packages/rpc-methods/src/types.ts +++ b/packages/rpc-methods/src/types.ts @@ -23,10 +23,9 @@ export type MethodSpec< // eslint-disable-next-line @typescript-eslint/no-explicit-any type SpecConstraint = MethodSpec; -export type MethodSpecRecord = Record< - Methods['method'], - Methods ->; +export type MethodSpecRecord = { + [Key in Methods['method']]: Extract; +}; type SpecRecordConstraint = MethodSpecRecord; @@ -72,3 +71,11 @@ export type Handler< hooks: { [Key in keyof Hooks]: true }; implementation: HandlerFunction; }; + +// `any` can safely be used in constraints. +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type HandlerConstraint = Handler; + +export type HandlerRecord = { + [Key in Handlers['method']]: Extract; +}; diff --git a/vitest.config.ts b/vitest.config.ts index 78cea2223..e7c8ff542 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -76,22 +76,22 @@ export default defineConfig({ lines: 100, }, 'packages/errors/**': { - statements: 100, - functions: 100, + statements: 98.63, + functions: 95.23, branches: 92, - lines: 100, + lines: 98.63, }, 'packages/extension/**': { - statements: 80.33, - functions: 82.44, - branches: 77.2, - lines: 80.33, + statements: 79.21, + functions: 81.96, + branches: 75, + lines: 79.24, }, 'packages/kernel/**': { - statements: 86.44, - functions: 92.79, - branches: 71.18, - lines: 86.41, + statements: 86.33, + functions: 92.27, + branches: 70.62, + lines: 86.3, }, 'packages/nodejs/**': { statements: 72.91, diff --git a/yarn.lock b/yarn.lock index c3e3d2d42..96d48b165 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1954,12 +1954,12 @@ __metadata: "@arethetypeswrong/cli": "npm:^0.17.4" "@endo/eventual-send": "npm:^1.3.1" "@endo/marshal": "npm:^1.6.4" - "@endo/promise-kit": "npm:^1.1.10" "@metamask/auto-changelog": "npm:^5.0.1" "@metamask/eslint-config": "npm:^14.0.0" "@metamask/eslint-config-nodejs": "npm:^14.0.0" "@metamask/eslint-config-typescript": "npm:^14.0.0" "@metamask/json-rpc-engine": "npm:^10.0.3" + "@metamask/rpc-errors": "npm:^7.0.2" "@metamask/snaps-utils": "npm:^9.1.0" "@metamask/superstruct": "npm:^3.2.0" "@metamask/utils": "npm:^11.4.0" @@ -2084,6 +2084,7 @@ __metadata: "@metamask/utils": "npm:^11.4.0" "@ocap/cli": "workspace:^" "@ocap/errors": "workspace:^" + "@ocap/rpc-methods": "workspace:^" "@ocap/store": "workspace:^" "@ocap/streams": "workspace:^" "@ocap/test-utils": "workspace:^"