Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 9c13559

Browse files
authored
Copy known-length when tee() ReadableStreams, closes #506 (#512)
Previously calling `ReadableStream#tee()` would discard the fixed-length from `FixedLengthStream`s and the body-stream marker from `Request`/`Response` streams. This meant attempting to upload `tee()`ed streams as R2 multipart parts would fail. This change updates the `tee()` implementation to copy these across.
1 parent c9acef1 commit 9c13559

File tree

4 files changed

+47
-10
lines changed

4 files changed

+47
-10
lines changed

packages/core/src/standards/http.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import {
5757
import {
5858
_isByteStream,
5959
convertToRegularStream,
60+
kBodyStreamBrand,
6061
kContentLength,
6162
} from "./streams";
6263

@@ -162,7 +163,6 @@ export function _headersFromIncomingRequest(
162163
export const _kInner = Symbol("kInner");
163164

164165
const kBodyStream = Symbol("kBodyStream");
165-
const kBodyStreamBrand = Symbol("kBodyStreamBrand");
166166
const kInputGated = Symbol("kInputGated");
167167
const kFormDataFiles = Symbol("kFormDataFiles");
168168
const kCloned = Symbol("kCloned");

packages/core/src/standards/streams.ts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import {
1616
isBufferSource,
1717
} from "./helpers";
1818

19+
export const kBodyStreamBrand = Symbol("kBodyStreamBrand");
20+
1921
/** @internal */
2022
export function _isByteStream(
2123
stream: ReadableStream
@@ -177,18 +179,32 @@ ReadableStream.prototype.tee = function () {
177179
if (!(this instanceof ReadableStream)) {
178180
throw new TypeError("Illegal invocation");
179181
}
182+
let [stream1, stream2] = originalTee.call(this);
180183
if (_isByteStream(this)) {
181-
const [stream1, stream2] = originalTee.call(this);
182184
// We need to clone chunks here, as either of the tee()ed streams might be
183185
// passed to `new Response()`, which will detach array buffers when reading:
184186
// https://github.com/cloudflare/miniflare/issues/375
185-
return [
186-
convertToByteStream(stream1, true /* clone */),
187-
convertToByteStream(stream2, true /* clone */),
188-
];
189-
} else {
190-
return originalTee.call(this);
187+
stream1 = convertToByteStream(stream1, true /* clone */);
188+
stream2 = convertToByteStream(stream2, true /* clone */);
189+
}
190+
191+
// Copy known-length stream markers
192+
const branded = this as {
193+
[kBodyStreamBrand]?: unknown;
194+
[kContentLength]?: unknown;
195+
};
196+
const bodyBrand = branded[kBodyStreamBrand];
197+
if (bodyBrand !== undefined) {
198+
Object.defineProperty(stream1, kBodyStreamBrand, { value: bodyBrand });
199+
Object.defineProperty(stream2, kBodyStreamBrand, { value: bodyBrand });
200+
}
201+
const contentLength = branded[kContentLength];
202+
if (contentLength !== undefined) {
203+
Object.defineProperty(stream1, kContentLength, { value: contentLength });
204+
Object.defineProperty(stream2, kContentLength, { value: contentLength });
191205
}
206+
207+
return [stream1, stream2];
192208
};
193209

194210
const kTransformHook = Symbol("kTransformHook");

packages/core/test/standards/streams.spec.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import {
1313
IdentityTransformStream,
1414
Request,
1515
Response,
16+
_isBodyStream,
1617
_isByteStream,
1718
_isDisturbedStream,
19+
_isFixedLengthStream,
1820
} from "@miniflare/core";
1921
import { utf8Decode, utf8Encode } from "@miniflare/shared-test";
2022
import test, { Macro, ThrowsExpectation } from "ava";
@@ -268,6 +270,24 @@ test("ReadableStream: tee: returns regular stream when teeing regular stream", a
268270
t.is(await text(stream1 as any), "value");
269271
t.is(await text(stream2 as any), "value");
270272
});
273+
test("ReadableStream: tee: returns known length streams when teeing known length streams", async (t) => {
274+
const fixedLengthStream = new FixedLengthStream(3);
275+
let [stream1, stream2] = fixedLengthStream.readable.tee();
276+
t.true(_isFixedLengthStream(stream1));
277+
t.true(_isFixedLengthStream(stream2));
278+
279+
const req = new Request("http://localhost", { method: "POST", body: "body" });
280+
assert(req.body !== null);
281+
[stream1, stream2] = req.body.tee();
282+
t.true(_isBodyStream(stream1));
283+
t.true(_isBodyStream(stream2));
284+
285+
const res = new Response("body");
286+
assert(res.body !== null);
287+
[stream1, stream2] = res.body.tee();
288+
t.true(_isBodyStream(stream1));
289+
t.true(_isBodyStream(stream2));
290+
});
271291
test("ReadableStream: tee: throws on illegal invocation", (t) => {
272292
const stream = new ReadableStream<string>({
273293
start(controller) {

packages/r2/test/multipart.spec.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,9 @@ test("R2MultipartUpload: uploadPart", async (t) => {
177177
const response = new Response("value7");
178178
assert(request.body !== null && response.body !== null);
179179
await upload.uploadPart(5, readable);
180-
await upload.uploadPart(6, request.body);
181-
await upload.uploadPart(7, response.body);
180+
// Check `tee()`ing body inherits known length
181+
await upload.uploadPart(6, request.body.tee()[0]);
182+
await upload.uploadPart(7, response.body.tee()[1]);
182183
const unknownLengthReadable = new ReadableStream({
183184
type: "bytes",
184185
pull(controller) {

0 commit comments

Comments
 (0)