Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 31792ba

Browse files
committed
Return byte streams when tee()ing byte streams, closes #317
Closes #375
1 parent 72e5edc commit 31792ba

File tree

5 files changed

+141
-76
lines changed

5 files changed

+141
-76
lines changed

packages/core/src/standards/http.ts

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { Blob } from "buffer";
55
import type EventEmitter from "events";
66
import http from "http";
77
import {
8-
ReadableByteStreamController,
98
ReadableStream,
109
ReadableStreamDefaultReader,
1110
UnderlyingByteSource,
@@ -55,7 +54,7 @@ import {
5554
buildNotBufferSourceError,
5655
isBufferSource,
5756
} from "./helpers";
58-
import { kContentLength } from "./streams";
57+
import { _isByteStream, kContentLength } from "./streams";
5958

6059
// We need these for making Request's Headers immutable
6160
const fetchSymbols: {
@@ -162,24 +161,6 @@ const kInputGated = Symbol("kInputGated");
162161
const kFormDataFiles = Symbol("kFormDataFiles");
163162
const kCloned = Symbol("kCloned");
164163

165-
/** @internal */
166-
export function _isByteStream(
167-
stream: ReadableStream
168-
): stream is ReadableStream<Uint8Array> {
169-
// Try to determine if stream is a byte stream by inspecting its state.
170-
// It doesn't matter too much if the internal representation changes in the
171-
// future: this code shouldn't throw. Currently we only use this as an
172-
// optimisation to avoid creating a byte stream if it's already one.
173-
for (const symbol of Object.getOwnPropertySymbols(stream)) {
174-
if (symbol.description === "kState") {
175-
// @ts-expect-error symbol properties are not included in type definitions
176-
const controller = stream[symbol].controller;
177-
return controller instanceof ReadableByteStreamController;
178-
}
179-
}
180-
return false;
181-
}
182-
183164
const enumerableBodyKeys: (keyof Body<any>)[] = ["body", "bodyUsed", "headers"];
184165
export class Body<Inner extends BaseRequest | BaseResponse> {
185166
/** @internal */

packages/core/src/standards/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ export * from "./event";
77
export {
88
_headersFromIncomingRequest,
99
_kInner,
10-
_isByteStream,
1110
Body,
1211
withInputGating,
1312
withStringFormDataFiles,
@@ -48,6 +47,7 @@ export {
4847
FixedLengthStream,
4948
CompressionStream,
5049
DecompressionStream,
50+
_isByteStream,
5151
} from "./streams";
5252
export type { ArrayBufferViewConstructor } from "./streams";
5353
export * from "./navigator";

packages/core/src/standards/streams.ts

Lines changed: 75 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Transform } from "stream";
22
import {
3+
ReadableByteStreamController,
34
ReadableStream,
45
ReadableStreamBYOBReadResult,
56
ReadableStreamBYOBReader,
@@ -15,6 +16,57 @@ import {
1516
isBufferSource,
1617
} from "./helpers";
1718

19+
/** @internal */
20+
export function _isByteStream(
21+
stream: ReadableStream
22+
): stream is ReadableStream<Uint8Array> {
23+
// Try to determine if stream is a byte stream by inspecting its state.
24+
// It doesn't matter too much if the internal representation changes in the
25+
// future: this code shouldn't throw. Currently we only use this as an
26+
// optimisation to avoid creating a byte stream if it's already one.
27+
for (const symbol of Object.getOwnPropertySymbols(stream)) {
28+
if (symbol.description === "kState") {
29+
// @ts-expect-error symbol properties are not included in type definitions
30+
const controller = stream[symbol].controller;
31+
return controller instanceof ReadableByteStreamController;
32+
}
33+
}
34+
return false;
35+
}
36+
37+
function convertToByteStream(
38+
stream: ReadableStream<Uint8Array>,
39+
clone = false
40+
) {
41+
let reader: ReadableStreamDefaultReader<Uint8Array>;
42+
return new ReadableStream({
43+
type: "bytes",
44+
start() {
45+
reader = stream.getReader();
46+
},
47+
async pull(controller) {
48+
let result = await reader.read();
49+
while (!result.done && result.value.byteLength === 0) {
50+
result = await reader.read();
51+
}
52+
if (result.done) {
53+
queueMicrotask(() => {
54+
controller.close();
55+
// Not documented in MDN but if there's an ongoing request that's
56+
// waiting, we need to tell it that there were 0 bytes delivered so
57+
// that it unblocks and notices the end of stream.
58+
// @ts-expect-error `byobRequest` has type `undefined` in `@types/node`
59+
controller.byobRequest?.respond(0);
60+
});
61+
} else if (result.value.byteLength > 0) {
62+
if (clone) result.value = result.value.slice();
63+
controller.enqueue(result.value);
64+
}
65+
},
66+
cancel: (reason) => reader.cancel(reason),
67+
});
68+
}
69+
1870
export type ArrayBufferViewConstructor =
1971
| typeof Int8Array
2072
| typeof Uint8Array
@@ -83,6 +135,28 @@ ReadableStreamBYOBReader.prototype.readAtLeast = async function <
83135
return { value: value as any, done };
84136
};
85137

138+
// See comment above about manipulating the prototype.
139+
// Rewrite tee() to return byte streams when tee()ing byte streams:
140+
// https://github.com/cloudflare/miniflare/issues/317
141+
const originalTee = ReadableStream.prototype.tee;
142+
ReadableStream.prototype.tee = function () {
143+
if (!(this instanceof ReadableStream)) {
144+
throw new TypeError("Illegal invocation");
145+
}
146+
if (_isByteStream(this)) {
147+
const [stream1, stream2] = originalTee.call(this);
148+
// We need to clone chunks here, as either of the tee()ed streams might be
149+
// passed to `new Response()`, which will detach array buffers when reading:
150+
// https://github.com/cloudflare/miniflare/issues/375
151+
return [
152+
convertToByteStream(stream1, true /* clone */),
153+
convertToByteStream(stream2, true /* clone */),
154+
];
155+
} else {
156+
return originalTee.call(this);
157+
}
158+
};
159+
86160
const kTransformHook = Symbol("kTransformHook");
87161
const kFlushHook = Symbol("kFlushHook");
88162

@@ -119,41 +193,7 @@ export class IdentityTransformStream extends TransformStream<
119193

120194
get readable() {
121195
if (this.#readableByteStream !== undefined) return this.#readableByteStream;
122-
const readable = super.readable;
123-
let reader: ReadableStreamDefaultReader;
124-
return (this.#readableByteStream = new ReadableStream({
125-
type: "bytes",
126-
start() {
127-
reader = readable.getReader();
128-
},
129-
async pull(controller) {
130-
let { done, value } = await reader.read();
131-
// Make sure we eventually call a `controller` method, either because
132-
// we're done, or there's data to enqueue
133-
while (!done && value.byteLength === 0) {
134-
const result = await reader.read();
135-
done = result.done;
136-
value = result.value;
137-
}
138-
if (done) {
139-
queueMicrotask(() => {
140-
controller.close();
141-
// Not documented in MDN but if there's an ongoing request that's waiting,
142-
// we need to tell it that there were 0 bytes delivered so that it unblocks
143-
// and notices the end of stream.
144-
// @ts-expect-error `byobRequest` has type `undefined` in `@types/node`
145-
controller.byobRequest?.respond(0);
146-
});
147-
} else if (value.byteLength > 0) {
148-
// Ensure chunk is non-empty before enqueuing:
149-
// https://github.com/cloudflare/miniflare/issues/374
150-
controller.enqueue(value);
151-
}
152-
},
153-
cancel() {
154-
return reader.cancel();
155-
},
156-
}));
196+
return (this.#readableByteStream = convertToByteStream(super.readable));
157197
}
158198
}
159199

packages/core/test/standards/http.spec.ts

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import {
1010
Response,
1111
_getBodyLength,
1212
_getURLList,
13-
_isByteStream,
1413
createCompatFetch,
1514
fetch,
1615
logResponse,
@@ -100,24 +99,6 @@ test("Headers: getAll: returns separated Set-Cookie values", (t) => {
10099
t.deepEqual(headers.getAll("set-CoOkiE"), [cookie1, cookie2, cookie3]);
101100
});
102101

103-
test("_isByteStream: determines if a ReadableStream is a byte stream", (t) => {
104-
const regularStream = new ReadableStream({
105-
pull(controller) {
106-
controller.enqueue(new Uint8Array([1, 2, 3]));
107-
controller.close();
108-
},
109-
});
110-
const byteStream = new ReadableStream({
111-
type: "bytes",
112-
pull(controller) {
113-
controller.enqueue(new Uint8Array([1, 2, 3]));
114-
controller.close();
115-
},
116-
});
117-
t.false(_isByteStream(regularStream));
118-
t.true(_isByteStream(byteStream));
119-
});
120-
121102
// These tests also implicitly test withInputGating
122103
test("Body: body isn't input gated by default", async (t) => {
123104
const inputGate = new InputGate();

packages/core/test/standards/streams.spec.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import assert from "assert";
2-
import { arrayBuffer } from "stream/consumers";
2+
import { arrayBuffer, text } from "stream/consumers";
33
import {
44
ReadableStream,
55
ReadableStreamBYOBReadResult,
@@ -13,6 +13,7 @@ import {
1313
IdentityTransformStream,
1414
Request,
1515
Response,
16+
_isByteStream,
1617
} from "@miniflare/core";
1718
import { utf8Decode, utf8Encode } from "@miniflare/shared-test";
1819
import test, { Macro, ThrowsExpectation } from "ava";
@@ -56,6 +57,24 @@ async function* byobReadAtLeast<Ctor extends ArrayBufferViewConstructor>(
5657
}
5758
}
5859

60+
test("_isByteStream: determines if a ReadableStream is a byte stream", (t) => {
61+
const regularStream = new ReadableStream({
62+
pull(controller) {
63+
controller.enqueue(new Uint8Array([1, 2, 3]));
64+
controller.close();
65+
},
66+
});
67+
const byteStream = new ReadableStream({
68+
type: "bytes",
69+
pull(controller) {
70+
controller.enqueue(new Uint8Array([1, 2, 3]));
71+
controller.close();
72+
},
73+
});
74+
t.false(_isByteStream(regularStream));
75+
t.true(_isByteStream(byteStream));
76+
});
77+
5978
test("ReadableStreamBYOBReader: readAtLeast: reads at least n bytes", async (t) => {
6079
const stream = chunkedStream([[1, 2, 3], [4], [5, 6]]);
6180
const reads = byobReadAtLeast(stream, 4, 8, Uint8Array);
@@ -189,6 +208,50 @@ test("ReadableStreamBYOBReader: readAtLeast: throws if minimum number of bytes e
189208
});
190209
});
191210

211+
test("ReadableStream: tee: returns byte streams when teeing byte stream", async (t) => {
212+
// https://github.com/cloudflare/miniflare/issues/317
213+
const stream = chunkedStream([[1, 2, 3]]);
214+
t.true(_isByteStream(stream));
215+
const [stream1, stream2] = stream.tee();
216+
t.true(_isByteStream(stream1));
217+
t.true(_isByteStream(stream2));
218+
219+
// Check detaching array buffers on one stream doesn't affect the other
220+
// (note Response#body will detach chunks as reading)
221+
// https://github.com/cloudflare/miniflare/issues/375
222+
const body1 = await arrayBuffer(new Response(stream1).body as any);
223+
const body2 = await arrayBuffer(new Response(stream2).body as any);
224+
t.deepEqual(new Uint8Array(body1), new Uint8Array([1, 2, 3]));
225+
t.deepEqual(new Uint8Array(body2), new Uint8Array([1, 2, 3]));
226+
});
227+
test("ReadableStream: tee: returns regular stream when teeing regular stream", async (t) => {
228+
const stream = new ReadableStream<string>({
229+
start(controller) {
230+
controller.enqueue("value");
231+
controller.close();
232+
},
233+
});
234+
t.false(_isByteStream(stream));
235+
const [stream1, stream2] = stream.tee();
236+
t.false(_isByteStream(stream1));
237+
t.false(_isByteStream(stream2));
238+
t.is(await text(stream1 as any), "value");
239+
t.is(await text(stream2 as any), "value");
240+
});
241+
test("ReadableStream: tee: throws on illegal invocation", (t) => {
242+
const stream = new ReadableStream<string>({
243+
start(controller) {
244+
controller.close();
245+
},
246+
});
247+
// @ts-expect-error using comma expression to unbind this
248+
// noinspection CommaExpressionJS
249+
t.throws(() => (0, stream.tee)(), {
250+
instanceOf: TypeError,
251+
message: "Illegal invocation",
252+
});
253+
});
254+
192255
const identityMacro: Macro<[TransformStream]> = async (t, stream) => {
193256
const { readable, writable } = stream;
194257
const writer = writable.getWriter();

0 commit comments

Comments
 (0)