Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 857d942

Browse files
committed
Implement FixedLengthStream, closes #123
1 parent a1cff3c commit 857d942

File tree

6 files changed

+225
-5
lines changed

6 files changed

+225
-5
lines changed

packages/core/src/plugins/core.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import {
3939
AbortSignal,
4040
DOMException,
4141
FetchEvent,
42+
FixedLengthStream,
4243
Request,
4344
Response,
4445
ScheduledEvent,
@@ -365,6 +366,7 @@ export class CorePlugin extends Plugin<CoreOptions> implements CoreOptions {
365366
WritableStream,
366367
WritableStreamDefaultController,
367368
WritableStreamDefaultWriter,
369+
FixedLengthStream,
368370

369371
Event,
370372
EventTarget,

packages/core/src/standards/http.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import {
4848
buildNotBufferSourceError,
4949
isBufferSource,
5050
} from "./helpers";
51+
import { kContentLength } from "./streams";
5152

5253
const inspect = Symbol.for("nodejs.util.inspect.custom");
5354
const nonEnumerable = Object.create(null);
@@ -334,6 +335,14 @@ export class Request extends Body<BaseRequest> {
334335
}
335336
this.#cf = cf ? nonCircularClone(cf) : undefined;
336337

338+
// If body is a FixedLengthStream, set Content-Length to its expected length
339+
const contentLength: number | undefined = (init?.body as any)?.[
340+
kContentLength
341+
];
342+
if (contentLength !== undefined) {
343+
this.headers.set("content-length", contentLength.toString());
344+
}
345+
337346
makeEnumerable(Request.prototype, this, enumerableRequestKeys);
338347
}
339348

@@ -491,6 +500,12 @@ export class Response<
491500
this.#status = status;
492501
this.#webSocket = webSocket;
493502

503+
// If body is a FixedLengthStream, set Content-Length to its expected length
504+
const contentLength: number | undefined = (body as any)?.[kContentLength];
505+
if (contentLength !== undefined) {
506+
this.headers.set("content-length", contentLength.toString());
507+
}
508+
494509
makeEnumerable(Response.prototype, this, enumerableResponseKeys);
495510
Object.defineProperty(this, kWaitUntil, nonEnumerable);
496511
}

packages/core/src/standards/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ export {
2020
logResponse,
2121
} from "./http";
2222
export type { RequestInfo, RequestInit, ResponseInit, HRTime } from "./http";
23-
export * from "./streams";
23+
export { FixedLengthStream } from "./streams";
24+
export type { ArrayBufferViewConstructor } from "./streams";
2425
export * from "./timers";

packages/core/src/standards/streams.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import {
22
ReadableStreamBYOBReadResult,
33
ReadableStreamBYOBReader,
4+
TransformStream,
45
} from "stream/web";
6+
import {
7+
bufferSourceToArray,
8+
buildNotBufferSourceError,
9+
isBufferSource,
10+
} from "./helpers";
511

612
export type ArrayBufferViewConstructor =
713
| typeof Int8Array
@@ -70,3 +76,55 @@ ReadableStreamBYOBReader.prototype.readAtLeast = async function <
7076
const value = new ctor(buffer, byteOffset, read / bytesPerElement);
7177
return { value: value as any, done };
7278
};
79+
80+
export const kContentLength = Symbol("kContentLength");
81+
82+
export class FixedLengthStream extends TransformStream<Uint8Array, Uint8Array> {
83+
constructor(expectedLength: number) {
84+
// noinspection SuspiciousTypeOfGuard
85+
if (typeof expectedLength !== "number" || expectedLength < 0) {
86+
throw new TypeError(
87+
"FixedLengthStream requires a non-negative integer expected length."
88+
);
89+
}
90+
91+
// Keep track of the number of bytes written
92+
let written = 0;
93+
super({
94+
transform(chunk, controller) {
95+
// Make sure this chunk is an ArrayBuffer(View)
96+
if (isBufferSource(chunk)) {
97+
const array = bufferSourceToArray(chunk);
98+
99+
// Throw if written too many bytes
100+
written += array.byteLength;
101+
if (written > expectedLength) {
102+
return controller.error(
103+
new TypeError(
104+
"Attempt to write too many bytes through a FixedLengthStream."
105+
)
106+
);
107+
}
108+
109+
controller.enqueue(array);
110+
} else {
111+
controller.error(new TypeError(buildNotBufferSourceError(chunk)));
112+
}
113+
},
114+
flush(controller) {
115+
// Throw if not written enough bytes on close
116+
if (written < expectedLength) {
117+
controller.error(
118+
new TypeError(
119+
"FixedLengthStream did not see all expected bytes before close()."
120+
)
121+
);
122+
}
123+
},
124+
});
125+
126+
// When used as Request/Response body, override the Content-Length header
127+
// with the expectedLength
128+
(this.readable as any)[kContentLength] = expectedLength;
129+
}
130+
}

packages/core/test/plugins/core.spec.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ test("CorePlugin: setup: includes web standards", async (t) => {
295295
t.true(typeof globals.WritableStream === "function");
296296
t.true(typeof globals.WritableStreamDefaultController === "function");
297297
t.true(typeof globals.WritableStreamDefaultWriter === "function");
298+
t.true(typeof globals.FixedLengthStream === "function");
298299

299300
t.true(typeof globals.Event === "function");
300301
t.true(typeof globals.EventTarget === "function");

packages/core/test/standards/streams.spec.ts

Lines changed: 147 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
import assert from "assert";
22
import { ReadableStream, ReadableStreamBYOBReadResult } from "stream/web";
3-
import { ArrayBufferViewConstructor } from "@miniflare/core";
4-
import test from "ava";
5-
6-
import "@miniflare/core";
3+
import {
4+
ArrayBufferViewConstructor,
5+
FixedLengthStream,
6+
Request,
7+
Response,
8+
} from "@miniflare/core";
9+
import { utf8Encode } from "@miniflare/shared-test";
10+
import test, { ThrowsExpectation } from "ava";
711

812
function chunkedStream(chunks: number[][]): ReadableStream<Uint8Array> {
913
return new ReadableStream({
@@ -176,3 +180,142 @@ test("ReadableStreamBYOBReader: readAtLeast: throws if minimum number of bytes e
176180
message: "Minimum bytes to read (4) exceeds size of buffer (3).",
177181
});
178182
});
183+
184+
test("FixedLengthStream: requires non-negative integer expected length", (t) => {
185+
const expectations: ThrowsExpectation = {
186+
instanceOf: TypeError,
187+
message:
188+
"FixedLengthStream requires a non-negative integer expected length.",
189+
};
190+
// @ts-expect-error intentionally testing incorrect types
191+
t.throws(() => new FixedLengthStream(), expectations);
192+
t.throws(() => new FixedLengthStream(-42), expectations);
193+
new FixedLengthStream(0);
194+
});
195+
test("FixedLengthStream: throws if too many bytes written", async (t) => {
196+
const { readable, writable } = new FixedLengthStream(3);
197+
const writer = writable.getWriter();
198+
// noinspection ES6MissingAwait
199+
void writer.write(new Uint8Array([1, 2]));
200+
// noinspection ES6MissingAwait
201+
void writer.write(new Uint8Array([3, 4]));
202+
203+
const reader = readable.getReader();
204+
t.deepEqual((await reader.read()).value, new Uint8Array([1, 2]));
205+
await t.throwsAsync(reader.read(), {
206+
instanceOf: TypeError,
207+
message: "Attempt to write too many bytes through a FixedLengthStream.",
208+
});
209+
});
210+
test("FixedLengthStream: throws if too few bytes written", async (t) => {
211+
const { readable, writable } = new FixedLengthStream(3);
212+
const writer = writable.getWriter();
213+
// noinspection ES6MissingAwait
214+
void writer.write(new Uint8Array([1, 2]));
215+
// noinspection ES6MissingAwait
216+
const closePromise = writer.close();
217+
218+
const reader = readable.getReader();
219+
t.deepEqual((await reader.read()).value, new Uint8Array([1, 2]));
220+
await t.throwsAsync(closePromise, {
221+
instanceOf: TypeError,
222+
message: "FixedLengthStream did not see all expected bytes before close().",
223+
});
224+
});
225+
test("FixedLengthStream: behaves as identity transform if just right number of bytes written", async (t) => {
226+
const { readable, writable } = new FixedLengthStream(3);
227+
const writer = writable.getWriter();
228+
// noinspection ES6MissingAwait
229+
void writer.write(new Uint8Array([1, 2]));
230+
// noinspection ES6MissingAwait
231+
void writer.write(new Uint8Array([3]));
232+
// noinspection ES6MissingAwait
233+
void writer.close();
234+
235+
const reader = readable.getReader();
236+
t.deepEqual((await reader.read()).value, new Uint8Array([1, 2]));
237+
t.deepEqual((await reader.read()).value, new Uint8Array([3]));
238+
t.true((await reader.read()).done);
239+
});
240+
test("FixedLengthStream: throws on string chunks", async (t) => {
241+
const { readable, writable } = new FixedLengthStream(5);
242+
const writer = writable.getWriter();
243+
// noinspection ES6MissingAwait
244+
void writer.write(
245+
// @ts-expect-error intentionally testing incorrect types
246+
"how much chunk would a chunk-chuck chuck if a chunk-chuck could chuck chunk?"
247+
);
248+
249+
const reader = readable.getReader();
250+
await t.throwsAsync(reader.read(), {
251+
instanceOf: TypeError,
252+
message:
253+
"This TransformStream is being used as a byte stream, " +
254+
"but received a string on its writable side. " +
255+
"If you wish to write a string, you'll probably want to " +
256+
"explicitly UTF-8-encode it with TextEncoder.",
257+
});
258+
});
259+
test("FixedLengthStream: throws on non-ArrayBuffer/ArrayBufferView chunks", async (t) => {
260+
const { readable, writable } = new FixedLengthStream(5);
261+
const writer = writable.getWriter();
262+
// @ts-expect-error intentionally testing incorrect types
263+
// noinspection ES6MissingAwait
264+
void writer.write(42);
265+
266+
const reader = readable.getReader();
267+
await t.throwsAsync(reader.read(), {
268+
instanceOf: TypeError,
269+
message:
270+
"This TransformStream is being used as a byte stream, " +
271+
"but received an object of non-ArrayBuffer/ArrayBufferView " +
272+
"type on its writable side.",
273+
});
274+
});
275+
function buildFixedLengthReadableStream(length: number) {
276+
const { readable, writable } = new FixedLengthStream(length);
277+
const writer = writable.getWriter();
278+
if (length > 0) void writer.write(utf8Encode("".padStart(length, "x")));
279+
void writer.close();
280+
return readable;
281+
}
282+
test("FixedLengthStream: sets Content-Length header on Request", async (t) => {
283+
let body = buildFixedLengthReadableStream(3);
284+
let req = new Request("http://localhost", { method: "POST", body });
285+
t.is(req.headers.get("Content-Length"), "3");
286+
t.is(await req.text(), "xxx");
287+
288+
// Check overrides existing Content-Length header
289+
body = buildFixedLengthReadableStream(3);
290+
req = new Request("http://localhost", {
291+
method: "POST",
292+
body,
293+
headers: { "Content-Length": "2" },
294+
});
295+
t.is(req.headers.get("Content-Length"), "3");
296+
t.is(await req.text(), "xxx");
297+
298+
// Check still includes header with 0 expected length
299+
body = buildFixedLengthReadableStream(0);
300+
req = new Request("http://localhost", { method: "POST", body });
301+
t.is(req.headers.get("Content-Length"), "0");
302+
t.is(await req.text(), "");
303+
});
304+
test("FixedLengthStream: sets Content-Length header on Response", async (t) => {
305+
let body = buildFixedLengthReadableStream(3);
306+
let res = new Response(body);
307+
t.is(res.headers.get("Content-Length"), "3");
308+
t.is(await res.text(), "xxx");
309+
310+
// Check overrides existing Content-Length header
311+
body = buildFixedLengthReadableStream(3);
312+
res = new Response(body, { headers: { "Content-Length": "2" } });
313+
t.is(res.headers.get("Content-Length"), "3");
314+
t.is(await res.text(), "xxx");
315+
316+
// Check still includes header with 0 expected length
317+
body = buildFixedLengthReadableStream(0);
318+
res = new Response(body);
319+
t.is(res.headers.get("Content-Length"), "0");
320+
t.is(await res.text(), "");
321+
});

0 commit comments

Comments
 (0)