diff --git a/.changeset/shaggy-walls-sniff.md b/.changeset/shaggy-walls-sniff.md new file mode 100644 index 000000000..2d1ef7579 --- /dev/null +++ b/.changeset/shaggy-walls-sniff.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/aws": patch +--- + +Reduces allocations and copies of streams diff --git a/packages/open-next/src/core/routing/util.ts b/packages/open-next/src/core/routing/util.ts index abce39236..c27ba0ec8 100644 --- a/packages/open-next/src/core/routing/util.ts +++ b/packages/open-next/src/core/routing/util.ts @@ -217,13 +217,12 @@ export function convertBodyToReadableStream( ) { if (method === "GET" || method === "HEAD") return undefined; if (!body) return undefined; - const readable = new ReadableStream({ + return new ReadableStream({ start(controller) { controller.enqueue(body); controller.close(); }, }); - return readable; } enum CommonHeaders { diff --git a/packages/open-next/src/utils/binary.ts b/packages/open-next/src/utils/binary.ts index 4ca324285..bff113a2d 100644 --- a/packages/open-next/src/utils/binary.ts +++ b/packages/open-next/src/utils/binary.ts @@ -62,6 +62,6 @@ const commonBinaryMimeTypes = new Set([ export function isBinaryContentType(contentType?: string | null) { if (!contentType) return false; - const value = contentType?.split(";")[0] ?? ""; + const value = contentType.split(";")[0]; return commonBinaryMimeTypes.has(value); } diff --git a/packages/open-next/src/utils/fetch.ts b/packages/open-next/src/utils/fetch.ts index 8378fafbf..853f853ae 100644 --- a/packages/open-next/src/utils/fetch.ts +++ b/packages/open-next/src/utils/fetch.ts @@ -18,7 +18,6 @@ export function customFetchClient(client: AwsClient) { * This is necessary otherwise we get some error : SocketError: other side closed * https://github.com/nodejs/undici/issues/583#issuecomment-855384858 */ - const clonedResponse = response.clone(); - return clonedResponse; + return response.clone(); }; } diff --git a/packages/open-next/src/utils/stream.ts b/packages/open-next/src/utils/stream.ts index 4361b5738..52e834f63 100644 --- a/packages/open-next/src/utils/stream.ts +++ b/packages/open-next/src/utils/stream.ts @@ -1,43 +1,70 @@ -import { Readable } from "node:stream"; -import type { ReadableStream } from "node:stream/web"; +import { ReadableStream } from "node:stream/web"; -export function fromReadableStream( +export async function fromReadableStream( stream: ReadableStream, base64?: boolean, ): Promise { - const reader = stream.getReader(); const chunks: Uint8Array[] = []; + let totalLength = 0; - return new Promise((resolve, reject) => { - function pump() { - reader - .read() - .then(({ done, value }) => { - if (done) { - resolve(Buffer.concat(chunks).toString(base64 ? "base64" : "utf8")); - return; - } - chunks.push(value); - pump(); - }) - .catch(reject); - } - pump(); - }); + for await (const chunk of stream) { + chunks.push(chunk); + totalLength += chunk.length; + } + + if (chunks.length === 0) { + return ""; + } + + if (chunks.length === 1) { + return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8"); + } + + // Pre-allocate buffer with exact size to avoid reallocation + const buffer = Buffer.alloc(totalLength); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.length; + } + + return buffer.toString(base64 ? "base64" : "utf8"); } export function toReadableStream( value: string, isBase64?: boolean, ): ReadableStream { - return Readable.toWeb( - Readable.from(Buffer.from(value, isBase64 ? "base64" : "utf8")), + return new ReadableStream( + { + pull(controller) { + // Defer the Buffer.from conversion to when the stream is actually read. + controller.enqueue(Buffer.from(value, isBase64 ? "base64" : "utf8")); + controller.close(); + }, + }, + { highWaterMark: 0 }, ); } +let maybeSomethingBuffer: Buffer | undefined; + export function emptyReadableStream(): ReadableStream { if (process.env.OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE === "true") { - return Readable.toWeb(Readable.from([Buffer.from("SOMETHING")])); + return new ReadableStream( + { + pull(controller) { + maybeSomethingBuffer ??= Buffer.from("SOMETHING"); + controller.enqueue(maybeSomethingBuffer); + controller.close(); + }, + }, + { highWaterMark: 0 }, + ); } - return Readable.toWeb(Readable.from([])); + return new ReadableStream({ + start(controller) { + controller.close(); + }, + }); }