Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 73f06ea

Browse files
committed
Add support for (De)CompressionStream, closes #206
1 parent a50b968 commit 73f06ea

File tree

6 files changed

+241
-80
lines changed

6 files changed

+241
-80
lines changed

packages/core/src/plugins/core.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ import {
1616
WritableStreamDefaultController,
1717
WritableStreamDefaultWriter,
1818
} from "stream/web";
19+
// Import all of `stream/web` so we don't get a syntax error when trying to
20+
// import `(De)CompressionStream` on Node < 17.0.0. We can't import dynamically
21+
// either as `CorePlugin` construction is synchronous.
22+
import webStreams from "stream/web";
1923
import { URL, URLSearchParams } from "url";
2024
import { TextDecoder, TextEncoder } from "util";
2125
import { deserialize, serialize } from "v8";
@@ -39,7 +43,9 @@ import { URLPattern } from "urlpattern-polyfill";
3943
import { MiniflareCoreError } from "../error";
4044
import {
4145
AbortSignal,
46+
CompressionStream,
4247
DOMException,
48+
DecompressionStream,
4349
FetchEvent,
4450
FixedLengthStream,
4551
Navigator,
@@ -405,6 +411,14 @@ export class CorePlugin extends Plugin<CoreOptions> implements CoreOptions {
405411
const blockGlobalTimers = !this.globalTimers;
406412
const crypto = createCrypto(!this.globalRandom);
407413

414+
// `(De)CompressionStream`s were added in Node.js 17.0.0, and added to the
415+
// global scope in Node.js 18.0.0. Our minimum supported version is 16.7.0,
416+
// so we implement basic versions ourselves, preferring Node's if available.
417+
const CompressionStreamImpl =
418+
webStreams.CompressionStream ?? CompressionStream;
419+
const DecompressionStreamImpl =
420+
webStreams.DecompressionStream ?? DecompressionStream;
421+
408422
// Build globals object
409423
// noinspection JSDeprecatedSymbols
410424
this.#globals = {
@@ -458,6 +472,8 @@ export class CorePlugin extends Plugin<CoreOptions> implements CoreOptions {
458472
WritableStreamDefaultController,
459473
WritableStreamDefaultWriter,
460474
FixedLengthStream,
475+
CompressionStream: CompressionStreamImpl,
476+
DecompressionStream: DecompressionStreamImpl,
461477

462478
Event,
463479
EventTarget,

packages/core/src/standards/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ export type {
4242
ResponseType,
4343
ResponseRedirectStatus,
4444
} from "./http";
45-
export { FixedLengthStream } from "./streams";
45+
export {
46+
FixedLengthStream,
47+
CompressionStream,
48+
DecompressionStream,
49+
} from "./streams";
4650
export type { ArrayBufferViewConstructor } from "./streams";
4751
export * from "./navigator";
4852
export * from "./timers";

packages/core/src/standards/streams.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import type { Transform } from "stream";
12
import {
23
ReadableStreamBYOBReadResult,
34
ReadableStreamBYOBReader,
45
TransformStream,
6+
Transformer,
57
} from "stream/web";
8+
import zlib from "zlib";
69
import {
710
bufferSourceToArray,
811
buildNotBufferSourceError,
@@ -128,3 +131,62 @@ export class FixedLengthStream extends TransformStream<Uint8Array, Uint8Array> {
128131
(this.readable as any)[kContentLength] = expectedLength;
129132
}
130133
}
134+
135+
function createTransformerFromTransform(transform: Transform): Transformer {
136+
// TODO: backpressure? see https://github.com/nodejs/node/blob/440d95a878a1a19bf72a2685fc8fc0f47100b510/lib/internal/webstreams/adapters.js#L538
137+
return {
138+
start(controller) {
139+
transform.on("data", (chunk) => {
140+
controller.enqueue(new Uint8Array(chunk));
141+
});
142+
transform.on("error", (error) => {
143+
controller.error(error);
144+
});
145+
},
146+
transform(chunk) {
147+
transform.write(chunk);
148+
},
149+
flush() {
150+
return new Promise((resolve) => {
151+
transform.once("close", () => {
152+
transform.removeAllListeners();
153+
resolve();
154+
});
155+
transform.end();
156+
});
157+
},
158+
};
159+
}
160+
161+
// `(De)CompressionStream`s were added in Node.js 17.0.0. Our minimum supported
162+
// version is 16.7.0, so we implement basic versions ourselves, preferring to
163+
// use Node's if available.
164+
165+
export class CompressionStream extends TransformStream<Uint8Array, Uint8Array> {
166+
constructor(format: "gzip" | "deflate") {
167+
if (format !== "gzip" && format !== "deflate") {
168+
throw new TypeError(
169+
"The compression format must be either 'deflate' or 'gzip'."
170+
);
171+
}
172+
const transform =
173+
format === "gzip" ? zlib.createGzip() : zlib.createDeflate();
174+
super(createTransformerFromTransform(transform));
175+
}
176+
}
177+
178+
export class DecompressionStream extends TransformStream<
179+
Uint8Array,
180+
Uint8Array
181+
> {
182+
constructor(format: "gzip" | "deflate") {
183+
if (format !== "gzip" && format !== "deflate") {
184+
throw new TypeError(
185+
"The compression format must be either 'deflate' or 'gzip'."
186+
);
187+
}
188+
const transform =
189+
format === "gzip" ? zlib.createGunzip() : zlib.createInflate();
190+
super(createTransformerFromTransform(transform));
191+
}
192+
}

packages/core/test/plugins/core.spec.ts

Lines changed: 106 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import assert from "assert";
22
import fs from "fs/promises";
3-
43
import path from "path";
4+
import { text } from "stream/consumers";
5+
import type { CompressionStream, DecompressionStream } from "stream/web";
56
import { CorePlugin, Request, Response, Scheduler } from "@miniflare/core";
67
import {
78
Compatibility,
@@ -16,6 +17,7 @@ import {
1617
parsePluginWranglerConfig,
1718
useServer,
1819
useTmp,
20+
utf8Encode,
1921
} from "@miniflare/shared-test";
2022
import test, { ThrowsExpectation } from "ava";
2123
import { File, FormData } from "undici";
@@ -303,82 +305,84 @@ test("CorePlugin: setup: includes web standards", async (t) => {
303305
const { globals } = await plugin.setup();
304306
assert(globals);
305307

306-
t.true(typeof globals.console === "object");
307-
308-
t.true(typeof globals.setTimeout === "function");
309-
t.true(typeof globals.setInterval === "function");
310-
t.true(typeof globals.clearTimeout === "function");
311-
t.true(typeof globals.clearInterval === "function");
312-
t.true(typeof globals.queueMicrotask === "function");
313-
t.true(typeof globals.scheduler.wait === "function");
314-
315-
t.true(typeof globals.atob === "function");
316-
t.true(typeof globals.btoa === "function");
317-
318-
t.true(typeof globals.crypto === "object");
319-
t.true(typeof globals.CryptoKey === "function");
320-
t.true(typeof globals.TextDecoder === "function");
321-
t.true(typeof globals.TextEncoder === "function");
322-
323-
t.true(typeof globals.fetch === "function");
324-
t.true(typeof globals.Headers === "function");
325-
t.true(typeof globals.Request === "function");
326-
t.true(typeof globals.Response === "function");
327-
t.true(typeof globals.FormData === "function");
328-
t.true(typeof globals.Blob === "function");
329-
t.true(typeof globals.File === "function");
330-
t.true(typeof globals.URL === "function");
331-
t.true(typeof globals.URLSearchParams === "function");
332-
t.true(typeof globals.URLPattern === "function");
333-
334-
t.true(typeof globals.ByteLengthQueuingStrategy === "function");
335-
t.true(typeof globals.CountQueuingStrategy === "function");
336-
t.true(typeof globals.ReadableByteStreamController === "function");
337-
t.true(typeof globals.ReadableStream === "function");
338-
t.true(typeof globals.ReadableStreamBYOBReader === "function");
339-
t.true(typeof globals.ReadableStreamBYOBRequest === "function");
340-
t.true(typeof globals.ReadableStreamDefaultController === "function");
341-
t.true(typeof globals.ReadableStreamDefaultReader === "function");
342-
t.true(typeof globals.TransformStream === "function");
343-
t.true(typeof globals.TransformStreamDefaultController === "function");
344-
t.true(typeof globals.WritableStream === "function");
345-
t.true(typeof globals.WritableStreamDefaultController === "function");
346-
t.true(typeof globals.WritableStreamDefaultWriter === "function");
347-
t.true(typeof globals.FixedLengthStream === "function");
348-
349-
t.true(typeof globals.Event === "function");
350-
t.true(typeof globals.EventTarget === "function");
351-
t.true(typeof globals.AbortController === "function");
352-
t.true(typeof globals.AbortSignal === "function");
353-
t.true(typeof globals.FetchEvent === "function");
354-
t.true(typeof globals.ScheduledEvent === "function");
355-
356-
t.true(typeof globals.DOMException === "function");
357-
t.true(typeof globals.WorkerGlobalScope === "function");
358-
359-
t.true(typeof globals.structuredClone === "function");
360-
361-
t.true(typeof globals.ArrayBuffer === "function");
362-
t.true(typeof globals.Atomics === "object");
363-
t.true(typeof globals.BigInt64Array === "function");
364-
t.true(typeof globals.BigUint64Array === "function");
365-
t.true(typeof globals.DataView === "function");
366-
t.true(typeof globals.Date === "function");
367-
t.true(typeof globals.Float32Array === "function");
368-
t.true(typeof globals.Float64Array === "function");
369-
t.true(typeof globals.Int8Array === "function");
370-
t.true(typeof globals.Int16Array === "function");
371-
t.true(typeof globals.Int32Array === "function");
372-
t.true(typeof globals.Map === "function");
373-
t.true(typeof globals.Set === "function");
374-
t.true(typeof globals.SharedArrayBuffer === "function");
375-
t.true(typeof globals.Uint8Array === "function");
376-
t.true(typeof globals.Uint8ClampedArray === "function");
377-
t.true(typeof globals.Uint16Array === "function");
378-
t.true(typeof globals.Uint32Array === "function");
379-
t.true(typeof globals.WeakMap === "function");
380-
t.true(typeof globals.WeakSet === "function");
381-
t.true(typeof globals.WebAssembly === "object");
308+
t.is(typeof globals.console, "object");
309+
310+
t.is(typeof globals.setTimeout, "function");
311+
t.is(typeof globals.setInterval, "function");
312+
t.is(typeof globals.clearTimeout, "function");
313+
t.is(typeof globals.clearInterval, "function");
314+
t.is(typeof globals.queueMicrotask, "function");
315+
t.is(typeof globals.scheduler.wait, "function");
316+
317+
t.is(typeof globals.atob, "function");
318+
t.is(typeof globals.btoa, "function");
319+
320+
t.is(typeof globals.crypto, "object");
321+
t.is(typeof globals.CryptoKey, "function");
322+
t.is(typeof globals.TextDecoder, "function");
323+
t.is(typeof globals.TextEncoder, "function");
324+
325+
t.is(typeof globals.fetch, "function");
326+
t.is(typeof globals.Headers, "function");
327+
t.is(typeof globals.Request, "function");
328+
t.is(typeof globals.Response, "function");
329+
t.is(typeof globals.FormData, "function");
330+
t.is(typeof globals.Blob, "function");
331+
t.is(typeof globals.File, "function");
332+
t.is(typeof globals.URL, "function");
333+
t.is(typeof globals.URLSearchParams, "function");
334+
t.is(typeof globals.URLPattern, "function");
335+
336+
t.is(typeof globals.ByteLengthQueuingStrategy, "function");
337+
t.is(typeof globals.CountQueuingStrategy, "function");
338+
t.is(typeof globals.ReadableByteStreamController, "function");
339+
t.is(typeof globals.ReadableStream, "function");
340+
t.is(typeof globals.ReadableStreamBYOBReader, "function");
341+
t.is(typeof globals.ReadableStreamBYOBRequest, "function");
342+
t.is(typeof globals.ReadableStreamDefaultController, "function");
343+
t.is(typeof globals.ReadableStreamDefaultReader, "function");
344+
t.is(typeof globals.TransformStream, "function");
345+
t.is(typeof globals.TransformStreamDefaultController, "function");
346+
t.is(typeof globals.WritableStream, "function");
347+
t.is(typeof globals.WritableStreamDefaultController, "function");
348+
t.is(typeof globals.WritableStreamDefaultWriter, "function");
349+
t.is(typeof globals.FixedLengthStream, "function");
350+
t.is(typeof globals.CompressionStream, "function");
351+
t.is(typeof globals.DecompressionStream, "function");
352+
353+
t.is(typeof globals.Event, "function");
354+
t.is(typeof globals.EventTarget, "function");
355+
t.is(typeof globals.AbortController, "function");
356+
t.is(typeof globals.AbortSignal, "function");
357+
t.is(typeof globals.FetchEvent, "function");
358+
t.is(typeof globals.ScheduledEvent, "function");
359+
360+
t.is(typeof globals.DOMException, "function");
361+
t.is(typeof globals.WorkerGlobalScope, "function");
362+
363+
t.is(typeof globals.structuredClone, "function");
364+
365+
t.is(typeof globals.ArrayBuffer, "function");
366+
t.is(typeof globals.Atomics, "object");
367+
t.is(typeof globals.BigInt64Array, "function");
368+
t.is(typeof globals.BigUint64Array, "function");
369+
t.is(typeof globals.DataView, "function");
370+
t.is(typeof globals.Date, "function");
371+
t.is(typeof globals.Float32Array, "function");
372+
t.is(typeof globals.Float64Array, "function");
373+
t.is(typeof globals.Int8Array, "function");
374+
t.is(typeof globals.Int16Array, "function");
375+
t.is(typeof globals.Int32Array, "function");
376+
t.is(typeof globals.Map, "function");
377+
t.is(typeof globals.Set, "function");
378+
t.is(typeof globals.SharedArrayBuffer, "function");
379+
t.is(typeof globals.Uint8Array, "function");
380+
t.is(typeof globals.Uint8ClampedArray, "function");
381+
t.is(typeof globals.Uint16Array, "function");
382+
t.is(typeof globals.Uint32Array, "function");
383+
t.is(typeof globals.WeakMap, "function");
384+
t.is(typeof globals.WeakSet, "function");
385+
t.is(typeof globals.WebAssembly, "object");
382386

383387
t.true(globals.MINIFLARE);
384388
});
@@ -540,6 +544,7 @@ test("CorePlugin: setup: uses actual time if option enabled", async (t) => {
540544
});
541545
});
542546

547+
// Test standards with basic-Miniflare and Node implementations
543548
test("CorePlugin: setup: structuredClone: creates deep-copy of value", async (t) => {
544549
const plugin = new CorePlugin(ctx);
545550
const { globals } = await plugin.setup();
@@ -554,6 +559,30 @@ test("CorePlugin: setup: structuredClone: creates deep-copy of value", async (t)
554559
t.not(thing, copy);
555560
t.deepEqual(thing, copy);
556561
});
562+
test("CorePlugin: setup: (De)CompressionStream: (de)compresses data", async (t) => {
563+
const plugin = new CorePlugin(ctx);
564+
const { globals } = await plugin.setup();
565+
assert(globals);
566+
567+
const CompressionStreamImpl: typeof CompressionStream =
568+
globals.CompressionStream;
569+
const DecompressionStreamImpl: typeof DecompressionStream =
570+
globals.DecompressionStream;
571+
572+
const compressor = new CompressionStreamImpl("gzip");
573+
const decompressor = new DecompressionStreamImpl("gzip");
574+
const data = "".padStart(1024, "x");
575+
const writer = compressor.writable.getWriter();
576+
// noinspection ES6MissingAwait
577+
void writer.write(utf8Encode(data));
578+
// noinspection ES6MissingAwait
579+
void writer.close();
580+
const decompressed = await text(
581+
// @ts-expect-error ReadableStream types are incompatible
582+
compressor.readable.pipeThrough(decompressor)
583+
);
584+
t.is(decompressed, data);
585+
});
557586

558587
test("CorePlugin: processedModuleRules: processes rules includes default module rules", (t) => {
559588
const plugin = new CorePlugin(ctx, {

0 commit comments

Comments
 (0)