Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/shaggy-walls-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

Reduces allocations and copies of streams
8 changes: 4 additions & 4 deletions packages/open-next/src/core/routing/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ export function convertRes(res: OpenNextNodeResponse): InternalResult {
const isBase64Encoded =
isBinaryContentType(headers["content-type"]) ||
!!headers["content-encoding"];
let index = 0;
const body = new ReadableStream({
pull(controller) {
if (!res._chunks || res._chunks.length === 0) {
if (!res._chunks || index >= res._chunks.length) {
controller.close();
return;
}

controller.enqueue(res._chunks.shift());
controller.enqueue(res._chunks[index++]);
},
});
return {
Expand Down Expand Up @@ -217,13 +218,12 @@ export function convertBodyToReadableStream(
) {
if (method === "GET" || method === "HEAD") return undefined;
if (!body) return undefined;
const readable = new ReadableStream({
return new ReadableStream({
start(controller) {
controller.enqueue(body);
controller.close();
},
});
return readable;
}

enum CommonHeaders {
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/utils/binary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ const commonBinaryMimeTypes = new Set([
export function isBinaryContentType(contentType?: string | null) {
if (!contentType) return false;

const value = contentType?.split(";")[0] ?? "";
const value = contentType.split(";").at(0);
return commonBinaryMimeTypes.has(value);
}
3 changes: 1 addition & 2 deletions packages/open-next/src/utils/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export function customFetchClient(client: AwsClient) {
* This is necessary otherwise we get some error : SocketError: other side closed
* https://github.com/nodejs/undici/issues/583#issuecomment-855384858
*/
const clonedResponse = response.clone();
return clonedResponse;
return response.clone();
};
}
75 changes: 52 additions & 23 deletions packages/open-next/src/utils/stream.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,72 @@
import { Readable } from "node:stream";
import type { ReadableStream } from "node:stream/web";
import { ReadableStream } from "node:stream/web";

export function fromReadableStream(
export async function fromReadableStream(
stream: ReadableStream<Uint8Array>,
base64?: boolean,
): Promise<string> {
const reader = stream.getReader();
const chunks: Uint8Array[] = [];
let totalLength = 0;

return new Promise((resolve, reject) => {
function pump() {
reader
.read()
.then(({ done, value }) => {
if (done) {
resolve(Buffer.concat(chunks).toString(base64 ? "base64" : "utf8"));
return;
}
chunks.push(value);
pump();
})
.catch(reject);
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
totalLength += value.length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does there need to be validation that value actually is a typed array?

}
pump();
});

if (chunks.length === 0) {
return "";
}

if (chunks.length === 1) {
return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8");
}

// Pre-allocate buffer with exact size to avoid reallocation
const buffer = Buffer.allocUnsafe(totalLength);
let offset = 0;
for (const chunk of chunks) {
buffer.set(chunk, offset);
offset += chunk.length;
}

return buffer.toString(base64 ? "base64" : "utf8");
} finally {
reader.releaseLock();
}
}

export function toReadableStream(
value: string,
isBase64?: boolean,
): ReadableStream {
return Readable.toWeb(
Readable.from(Buffer.from(value, isBase64 ? "base64" : "utf8")),
);
const buffer = Buffer.from(value, isBase64 ? "base64" : "utf8");

return new ReadableStream({
start(controller) {
controller.enqueue(buffer);
controller.close();
},
});
}

let maybeSomethingBuffer: Buffer | undefined;

export function emptyReadableStream(): ReadableStream {
if (process.env.OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE === "true") {
return Readable.toWeb(Readable.from([Buffer.from("SOMETHING")]));
return new ReadableStream({
start(controller) {
maybeSomethingBuffer ??= Buffer.from("SOMETHING");
controller.enqueue(maybeSomethingBuffer);
controller.close();
},
});
}
return Readable.toWeb(Readable.from([]));
return new ReadableStream({
start(controller) {
controller.close();
},
});
}
Loading