Skip to content

Commit 659a47d

Browse files
authored
feat(streams): Add stream splitting (#503)
We removed stream multiplexing in #349 because we no longer had a use for it. With some of the plans we have for our logger, we once again find ourselves in a position where we'd like to differentiate traffic over a single stream. Multiplexing was pretty complex, so we'd prefer not to go back to it if possible. With these requirements in mind, this PR introduces "stream splitting" via the function `split()` exported from `@ocap/streams`. The idea is simple: pass `split()` a duplex stream and a number of type predicates, and receive an array of streams in return. Each stream represents the subset of the parent stream's traffic satisfying the corresponding predicate. Not very efficient for large numbers of channels or complex predicates, but our usage should be simple enough.
1 parent 295c98c commit 659a47d

File tree

8 files changed

+408
-20
lines changed

8 files changed

+408
-20
lines changed

packages/extension/src/background.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { isJsonRpcResponse } from '@metamask/utils';
2-
import type { Json } from '@metamask/utils';
2+
import type { JsonRpcResponse } from '@metamask/utils';
33
import { kernelMethodSpecs } from '@ocap/kernel/rpc';
44
import { Logger } from '@ocap/logger';
55
import { RpcClient } from '@ocap/rpc-methods';
66
import { ChromeRuntimeDuplexStream } from '@ocap/streams/browser';
77
import { delay } from '@ocap/utils';
8+
import type { JsonRpcCall } from '@ocap/utils';
89

910
const OFFSCREEN_DOCUMENT_PATH = '/offscreen.html';
1011

@@ -25,11 +26,10 @@ async function main(): Promise<void> {
2526
// Without this delay, sending messages via the chrome.runtime API can fail.
2627
await delay(50);
2728

28-
const offscreenStream = await ChromeRuntimeDuplexStream.make(
29-
chrome.runtime,
30-
'background',
31-
'offscreen',
32-
);
29+
const offscreenStream = await ChromeRuntimeDuplexStream.make<
30+
JsonRpcResponse,
31+
JsonRpcCall
32+
>(chrome.runtime, 'background', 'offscreen', isJsonRpcResponse);
3333

3434
const rpcClient = new RpcClient(
3535
kernelMethodSpecs,
@@ -50,7 +50,8 @@ async function main(): Promise<void> {
5050
value: ping,
5151
},
5252
sendMessage: {
53-
value: async (message: Json) => await offscreenStream.write(message),
53+
value: async (message: JsonRpcCall) =>
54+
await offscreenStream.write(message),
5455
},
5556
});
5657
harden(globalThis.kernel);
@@ -60,14 +61,8 @@ async function main(): Promise<void> {
6061
ping().catch(logger.error);
6162
});
6263

63-
// Handle replies from the offscreen document
64-
for await (const message of offscreenStream) {
65-
if (!isJsonRpcResponse(message)) {
66-
logger.error('Background received unexpected message', message);
67-
continue;
68-
}
69-
rpcClient.handleResponse(message.id as string, message);
70-
}
71-
64+
await offscreenStream.drain(async (message) =>
65+
rpcClient.handleResponse(message.id as string, message),
66+
);
7267
throw new Error('Offscreen connection closed unexpectedly');
7368
}

packages/streams/src/browser/index.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ describe('index', () => {
1616
'PostMessageWriter',
1717
'initializeMessageChannel',
1818
'receiveMessagePort',
19+
'split',
1920
]);
2021
});
2122
});

packages/streams/src/browser/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ export type {
2323
PostMessageEnvelope,
2424
PostMessageTarget,
2525
} from './PostMessageStream.ts';
26+
export { split } from '../split.ts';

packages/streams/src/index.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ describe('index', () => {
88
'NodeWorkerDuplexStream',
99
'NodeWorkerReader',
1010
'NodeWorkerWriter',
11+
'split',
1112
]);
1213
});
1314
});

packages/streams/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ export {
55
NodeWorkerWriter,
66
NodeWorkerDuplexStream,
77
} from './node/NodeWorkerStream.ts';
8+
export { split } from './split.ts';

packages/streams/src/split.test.ts

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import { describe, expect, it, vi } from 'vitest';
2+
3+
import { split } from './split.ts';
4+
import { TestDuplexStream } from '../test/stream-mocks.ts';
5+
6+
describe('split', () => {
7+
it('should forward values to the correct sub-stream', async () => {
8+
const stream = await TestDuplexStream.make<string>(() => undefined);
9+
const [streamA, streamB] = split(
10+
stream,
11+
(value) => value === 'a',
12+
(value) => value === 'b',
13+
);
14+
await stream.receiveInput('a');
15+
await stream.receiveInput('b');
16+
await stream.end();
17+
18+
expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
19+
expect(await streamA.next()).toStrictEqual({ done: false, value: 'a' });
20+
expect(await streamB.next()).toStrictEqual({ done: false, value: 'b' });
21+
expect(await streamA.next()).toStrictEqual({
22+
done: true,
23+
value: undefined,
24+
});
25+
expect(await streamB.next()).toStrictEqual({
26+
done: true,
27+
value: undefined,
28+
});
29+
});
30+
31+
it('should end the sub-streams when the parent stream ends', async () => {
32+
const stream = await TestDuplexStream.make<string>(() => undefined);
33+
const [streamA, streamB] = split(
34+
stream,
35+
(value) => value === 'a',
36+
(value) => value === 'b',
37+
);
38+
await stream.end();
39+
40+
expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
41+
expect(await streamA.next()).toStrictEqual({
42+
done: true,
43+
value: undefined,
44+
});
45+
expect(await streamB.next()).toStrictEqual({
46+
done: true,
47+
value: undefined,
48+
});
49+
});
50+
51+
it('should end all streams when a sub-stream ends', async () => {
52+
const stream = await TestDuplexStream.make<string>(() => undefined);
53+
const [streamA, streamB] = split(
54+
stream,
55+
(value) => value === 'a',
56+
(value) => value === 'b',
57+
);
58+
await streamA.return();
59+
60+
expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
61+
expect(await streamA.next()).toStrictEqual({
62+
done: true,
63+
value: undefined,
64+
});
65+
expect(await streamB.next()).toStrictEqual({
66+
done: true,
67+
value: undefined,
68+
});
69+
});
70+
71+
it('should forward errors', async () => {
72+
const stream = await TestDuplexStream.make<string>(() => undefined);
73+
const [streamA, streamB] = split(
74+
stream,
75+
(value) => value === 'a',
76+
(value) => value === 'b',
77+
);
78+
const nextA = streamA.next();
79+
const nextB = streamB.next();
80+
await stream.throw(new Error('test'));
81+
82+
expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
83+
await expect(nextA).rejects.toThrow('test');
84+
await expect(nextB).rejects.toThrow('test');
85+
});
86+
87+
it('should end all streams when a sub-stream errors', async () => {
88+
const stream = await TestDuplexStream.make<string>(() => undefined);
89+
const [streamA, streamB] = split(
90+
stream,
91+
(value) => value === 'a',
92+
(value) => value === 'b',
93+
);
94+
const nextA = streamA.next();
95+
const nextB = streamB.next();
96+
await streamA.throw(new Error('test'));
97+
98+
// We can't observe the error from the parent stream because it's being read
99+
// in the body of split().
100+
expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
101+
await expect(nextA).rejects.toThrow('test');
102+
await expect(nextB).rejects.toThrow('test');
103+
});
104+
105+
it('should error if no predicates match', async () => {
106+
const stream = await TestDuplexStream.make<string>(() => undefined);
107+
const [streamA, streamB] = split(
108+
stream,
109+
(value) => value === 'a',
110+
(value) => value === 'b',
111+
);
112+
const nextA = streamA.next();
113+
const nextB = streamB.next();
114+
await stream.receiveInput('c');
115+
116+
expect(await stream.next()).toStrictEqual({ done: true, value: undefined });
117+
await expect(nextA).rejects.toThrow(
118+
'Failed to match any predicate for value: "c"',
119+
);
120+
await expect(nextB).rejects.toThrow(
121+
'Failed to match any predicate for value: "c"',
122+
);
123+
});
124+
125+
it('should allow writing to the sub-streams', async () => {
126+
const dispatch = vi.fn();
127+
const stream = await TestDuplexStream.make<string>(dispatch);
128+
const [streamA, streamB] = split(
129+
stream,
130+
(value) => value === 'a',
131+
(value) => value === 'b',
132+
);
133+
await streamA.write('a');
134+
await streamB.write('b');
135+
await stream.end();
136+
137+
// The important thing is that the calls are sequential
138+
expect(dispatch).toHaveBeenNthCalledWith(3, 'a');
139+
expect(dispatch).toHaveBeenNthCalledWith(4, 'b');
140+
});
141+
142+
it('should allow draining sub-streams', async () => {
143+
const stream = await TestDuplexStream.make<string>(() => undefined);
144+
const [streamA, streamB] = split(
145+
stream,
146+
(value) => value === 'a',
147+
(value) => value === 'b',
148+
);
149+
await streamA.write('a');
150+
await streamB.write('b');
151+
await stream.end();
152+
153+
await streamA.drain((value) => {
154+
expect(value).toBe('a');
155+
});
156+
await streamB.drain((value) => {
157+
expect(value).toBe('b');
158+
});
159+
});
160+
161+
it('should allow iterating over the sub-streams', async () => {
162+
const stream = await TestDuplexStream.make<string>(() => undefined);
163+
const [streamA, streamB] = split(
164+
stream,
165+
(value) => value === 'a',
166+
(value) => value === 'b',
167+
);
168+
await streamA.write('a');
169+
await streamB.write('b');
170+
await stream.end();
171+
172+
for await (const value of streamA) {
173+
expect(value).toBe('a');
174+
}
175+
for await (const value of streamB) {
176+
expect(value).toBe('b');
177+
}
178+
});
179+
180+
it('should allow piping the sub-streams to a sink', async () => {
181+
const stream = await TestDuplexStream.make<string>(() => undefined);
182+
const sinkDispatch = vi.fn();
183+
const sink = await TestDuplexStream.make<string>(sinkDispatch);
184+
const [streamA, streamB] = split(
185+
stream,
186+
(value) => value === 'a',
187+
(value) => value === 'b',
188+
);
189+
await stream.receiveInput('a');
190+
await stream.receiveInput('b');
191+
await stream.end();
192+
await streamA.pipe(sink);
193+
await streamB.pipe(sink);
194+
await sink.end();
195+
196+
expect(sinkDispatch).toHaveBeenNthCalledWith(3, 'a');
197+
expect(sinkDispatch).toHaveBeenNthCalledWith(4, 'b');
198+
});
199+
});

0 commit comments

Comments
 (0)