Skip to content

Commit 8dd3dd1

Browse files
committed
improve performance of streams
1 parent 3428a5d commit 8dd3dd1

File tree

5 files changed

+65
-31
lines changed

5 files changed

+65
-31
lines changed

.changeset/shaggy-walls-sniff.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/aws": patch
3+
---
4+
5+
Reduces allocations and copies of streams

packages/open-next/src/core/routing/util.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,15 @@ export function convertRes(res: OpenNextNodeResponse): InternalResult {
105105
const isBase64Encoded =
106106
isBinaryContentType(headers["content-type"]) ||
107107
!!headers["content-encoding"];
108+
let index = 0;
108109
const body = new ReadableStream({
109110
pull(controller) {
110-
if (!res._chunks || res._chunks.length === 0) {
111+
if (!res._chunks || index >= res._chunks.length) {
111112
controller.close();
112113
return;
113114
}
114115

115-
controller.enqueue(res._chunks.shift());
116+
controller.enqueue(res._chunks[index++]);
116117
},
117118
});
118119
return {
@@ -217,13 +218,12 @@ export function convertBodyToReadableStream(
217218
) {
218219
if (method === "GET" || method === "HEAD") return undefined;
219220
if (!body) return undefined;
220-
const readable = new ReadableStream({
221+
return new ReadableStream({
221222
start(controller) {
222223
controller.enqueue(body);
223224
controller.close();
224225
},
225226
});
226-
return readable;
227227
}
228228

229229
enum CommonHeaders {

packages/open-next/src/utils/binary.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ const commonBinaryMimeTypes = new Set([
6060
]);
6161

6262
export function isBinaryContentType(contentType?: string | null) {
63-
if (!contentType) return false;
63+
if (contentType == null) return false;
6464

65-
const value = contentType?.split(";")[0] ?? "";
65+
const value = contentType.split(";").at(0);
66+
if (value == null) return false;
6667
return commonBinaryMimeTypes.has(value);
6768
}

packages/open-next/src/utils/fetch.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export function customFetchClient(client: AwsClient) {
1818
* This is necessary otherwise we get some error : SocketError: other side closed
1919
* https://github.com/nodejs/undici/issues/583#issuecomment-855384858
2020
*/
21-
const clonedResponse = response.clone();
22-
return clonedResponse;
21+
return response.clone();
2322
};
2423
}
Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,72 @@
1-
import { Readable } from "node:stream";
2-
import type { ReadableStream } from "node:stream/web";
1+
import { ReadableStream } from "node:stream/web";
32

4-
export function fromReadableStream(
3+
export async function fromReadableStream(
54
stream: ReadableStream<Uint8Array>,
65
base64?: boolean,
76
): Promise<string> {
87
const reader = stream.getReader();
98
const chunks: Uint8Array[] = [];
9+
let totalLength = 0;
1010

11-
return new Promise((resolve, reject) => {
12-
function pump() {
13-
reader
14-
.read()
15-
.then(({ done, value }) => {
16-
if (done) {
17-
resolve(Buffer.concat(chunks).toString(base64 ? "base64" : "utf8"));
18-
return;
19-
}
20-
chunks.push(value);
21-
pump();
22-
})
23-
.catch(reject);
11+
try {
12+
while (true) {
13+
const { done, value } = await reader.read();
14+
if (done) break;
15+
chunks.push(value);
16+
totalLength += value.length;
2417
}
25-
pump();
26-
});
18+
19+
if (chunks.length === 0) {
20+
return "";
21+
}
22+
23+
if (chunks.length === 1) {
24+
return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8");
25+
}
26+
27+
// Pre-allocate buffer with exact size to avoid reallocation
28+
const buffer = Buffer.allocUnsafe(totalLength);
29+
let offset = 0;
30+
for (const chunk of chunks) {
31+
buffer.set(chunk, offset);
32+
offset += chunk.length;
33+
}
34+
35+
return buffer.toString(base64 ? "base64" : "utf8");
36+
} finally {
37+
reader.releaseLock();
38+
}
2739
}
2840

2941
export function toReadableStream(
3042
value: string,
3143
isBase64?: boolean,
3244
): ReadableStream {
33-
return Readable.toWeb(
34-
Readable.from(Buffer.from(value, isBase64 ? "base64" : "utf8")),
35-
);
45+
const buffer = Buffer.from(value, isBase64 ? "base64" : "utf8");
46+
47+
return new ReadableStream({
48+
start(controller) {
49+
controller.enqueue(buffer);
50+
controller.close();
51+
},
52+
});
3653
}
3754

55+
let maybeSomethingBuffer: Buffer | undefined;
56+
3857
export function emptyReadableStream(): ReadableStream {
3958
if (process.env.OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE === "true") {
40-
return Readable.toWeb(Readable.from([Buffer.from("SOMETHING")]));
59+
return new ReadableStream({
60+
start(controller) {
61+
maybeSomethingBuffer ??= Buffer.from("SOMETHING");
62+
controller.enqueue(maybeSomethingBuffer);
63+
controller.close();
64+
},
65+
});
4166
}
42-
return Readable.toWeb(Readable.from([]));
67+
return new ReadableStream({
68+
start(controller) {
69+
controller.close();
70+
},
71+
});
4372
}

0 commit comments

Comments
 (0)