Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions packages/extension/src/background.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { isJsonRpcResponse } from '@metamask/utils';
import type { Json } from '@metamask/utils';
import type { JsonRpcResponse } from '@metamask/utils';
import { kernelMethodSpecs } from '@ocap/kernel/rpc';
import { Logger } from '@ocap/logger';
import { RpcClient } from '@ocap/rpc-methods';
import { ChromeRuntimeDuplexStream } from '@ocap/streams/browser';
import { delay } from '@ocap/utils';
import type { JsonRpcCall } from '@ocap/utils';

const OFFSCREEN_DOCUMENT_PATH = '/offscreen.html';

Expand All @@ -25,11 +26,10 @@ async function main(): Promise<void> {
// Without this delay, sending messages via the chrome.runtime API can fail.
await delay(50);

const offscreenStream = await ChromeRuntimeDuplexStream.make(
chrome.runtime,
'background',
'offscreen',
);
const offscreenStream = await ChromeRuntimeDuplexStream.make<
JsonRpcResponse,
JsonRpcCall
>(chrome.runtime, 'background', 'offscreen', isJsonRpcResponse);

const rpcClient = new RpcClient(
kernelMethodSpecs,
Expand All @@ -50,7 +50,8 @@ async function main(): Promise<void> {
value: ping,
},
sendMessage: {
value: async (message: Json) => await offscreenStream.write(message),
value: async (message: JsonRpcCall) =>
await offscreenStream.write(message),
},
});
harden(globalThis.kernel);
Expand All @@ -60,14 +61,8 @@ async function main(): Promise<void> {
ping().catch(logger.error);
});

// Handle replies from the offscreen document
for await (const message of offscreenStream) {
if (!isJsonRpcResponse(message)) {
logger.error('Background received unexpected message', message);
continue;
}
rpcClient.handleResponse(message.id as string, message);
}

await offscreenStream.drain(async (message) =>
rpcClient.handleResponse(message.id as string, message),
);
throw new Error('Offscreen connection closed unexpectedly');
}
1 change: 1 addition & 0 deletions packages/streams/src/browser/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ describe('index', () => {
'PostMessageWriter',
'initializeMessageChannel',
'receiveMessagePort',
'split',
]);
});
});
1 change: 1 addition & 0 deletions packages/streams/src/browser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ export type {
PostMessageEnvelope,
PostMessageTarget,
} from './PostMessageStream.ts';
export { split } from '../split.ts';
1 change: 1 addition & 0 deletions packages/streams/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ describe('index', () => {
'NodeWorkerDuplexStream',
'NodeWorkerReader',
'NodeWorkerWriter',
'split',
]);
});
});
1 change: 1 addition & 0 deletions packages/streams/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export {
NodeWorkerWriter,
NodeWorkerDuplexStream,
} from './node/NodeWorkerStream.ts';
export { split } from './split.ts';
199 changes: 199 additions & 0 deletions packages/streams/src/split.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import { describe, expect, it, vi } from 'vitest';

import { split } from './split.ts';
import { TestDuplexStream } from '../test/stream-mocks.ts';

describe('split', () => {
it('should forward values to the correct sub-stream', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
await stream.receiveInput('a');
await stream.receiveInput('b');
await stream.end();

expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
expect(await streamA.next()).toStrictEqual({ done: false, value: 'a' });
expect(await streamB.next()).toStrictEqual({ done: false, value: 'b' });
expect(await streamA.next()).toStrictEqual({
done: true,
value: undefined,
});
expect(await streamB.next()).toStrictEqual({
done: true,
value: undefined,
});
});

it('should end the sub-streams when the parent stream ends', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
await stream.end();

expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
expect(await streamA.next()).toStrictEqual({
done: true,
value: undefined,
});
expect(await streamB.next()).toStrictEqual({
done: true,
value: undefined,
});
});

it('should end all streams when a sub-stream ends', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
await streamA.return();

expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
expect(await streamA.next()).toStrictEqual({
done: true,
value: undefined,
});
expect(await streamB.next()).toStrictEqual({
done: true,
value: undefined,
});
});

it('should forward errors', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
const nextA = streamA.next();
const nextB = streamB.next();
await stream.throw(new Error('test'));

expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
await expect(nextA).rejects.toThrow('test');
await expect(nextB).rejects.toThrow('test');
});

it('should end all streams when a sub-stream errors', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
const nextA = streamA.next();
const nextB = streamB.next();
await streamA.throw(new Error('test'));

// We can't observe the error from the parent stream because it's being read
// in the body of split().
expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
await expect(nextA).rejects.toThrow('test');
await expect(nextB).rejects.toThrow('test');
});

it('should error if no predicates match', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
const nextA = streamA.next();
const nextB = streamB.next();
await stream.receiveInput('c');

expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
await expect(nextA).rejects.toThrow(
'Failed to match any predicate for value: "c"',
);
await expect(nextB).rejects.toThrow(
'Failed to match any predicate for value: "c"',
);
});

it('should allow writing to the sub-streams', async () => {
const dispatch = vi.fn();
const stream = await TestDuplexStream.make<string>(dispatch);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
await streamA.write('a');
await streamB.write('b');
await stream.end();

// The important thing is that the calls are sequential
expect(dispatch).toHaveBeenNthCalledWith(3, 'a');
expect(dispatch).toHaveBeenNthCalledWith(4, 'b');
});

it('should allow draining sub-streams', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
await streamA.write('a');
await streamB.write('b');
await stream.end();

await streamA.drain((value) => {
expect(value).toBe('a');
});
await streamB.drain((value) => {
expect(value).toBe('b');
});
});

it('should allow iterating over the sub-streams', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
await streamA.write('a');
await streamB.write('b');
await stream.end();

for await (const value of streamA) {
expect(value).toBe('a');
}
for await (const value of streamB) {
expect(value).toBe('b');
}
});

it('should allow piping the sub-streams to a sink', async () => {
const stream = await TestDuplexStream.make<string>(() => undefined);
const sinkDispatch = vi.fn();
const sink = await TestDuplexStream.make<string>(sinkDispatch);
const [streamA, streamB] = split(
stream,
(value) => value === 'a',
(value) => value === 'b',
);
await stream.receiveInput('a');
await stream.receiveInput('b');
await stream.end();
await streamA.pipe(sink);
await streamB.pipe(sink);
await sink.end();

expect(sinkDispatch).toHaveBeenNthCalledWith(3, 'a');
expect(sinkDispatch).toHaveBeenNthCalledWith(4, 'b');
});
});
Loading
Loading