Skip to content

Commit 5f3dc4b

Browse files
authored
improve performance of streams (#996)
1 parent 549d6a7 commit 5f3dc4b

File tree

5 files changed

+59
-29
lines changed

5 files changed

+59
-29
lines changed

.changeset/shaggy-walls-sniff.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/aws": patch
3+
---
4+
5+
Reduces allocations and copies of streams

packages/open-next/src/core/routing/util.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,13 +217,12 @@ export function convertBodyToReadableStream(
217217
) {
218218
if (method === "GET" || method === "HEAD") return undefined;
219219
if (!body) return undefined;
220-
const readable = new ReadableStream({
220+
return new ReadableStream({
221221
start(controller) {
222222
controller.enqueue(body);
223223
controller.close();
224224
},
225225
});
226-
return readable;
227226
}
228227

229228
enum CommonHeaders {

packages/open-next/src/utils/binary.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,6 @@ const commonBinaryMimeTypes = new Set([
6262
export function isBinaryContentType(contentType?: string | null) {
6363
if (!contentType) return false;
6464

65-
const value = contentType?.split(";")[0] ?? "";
65+
const value = contentType.split(";")[0];
6666
return commonBinaryMimeTypes.has(value);
6767
}

packages/open-next/src/utils/fetch.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export function customFetchClient(client: AwsClient) {
1818
* This is necessary otherwise we get some error : SocketError: other side closed
1919
* https://github.com/nodejs/undici/issues/583#issuecomment-855384858
2020
*/
21-
const clonedResponse = response.clone();
22-
return clonedResponse;
21+
return response.clone();
2322
};
2423
}
Lines changed: 51 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,70 @@
1-
import { Readable } from "node:stream";
2-
import type { ReadableStream } from "node:stream/web";
1+
import { ReadableStream } from "node:stream/web";
32

4-
export function fromReadableStream(
3+
export async function fromReadableStream(
54
stream: ReadableStream<Uint8Array>,
65
base64?: boolean,
76
): Promise<string> {
8-
const reader = stream.getReader();
97
const chunks: Uint8Array[] = [];
8+
let totalLength = 0;
109

11-
return new Promise((resolve, reject) => {
12-
function pump() {
13-
reader
14-
.read()
15-
.then(({ done, value }) => {
16-
if (done) {
17-
resolve(Buffer.concat(chunks).toString(base64 ? "base64" : "utf8"));
18-
return;
19-
}
20-
chunks.push(value);
21-
pump();
22-
})
23-
.catch(reject);
24-
}
25-
pump();
26-
});
10+
for await (const chunk of stream) {
11+
chunks.push(chunk);
12+
totalLength += chunk.length;
13+
}
14+
15+
if (chunks.length === 0) {
16+
return "";
17+
}
18+
19+
if (chunks.length === 1) {
20+
return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8");
21+
}
22+
23+
// Pre-allocate buffer with exact size to avoid reallocation
24+
const buffer = Buffer.alloc(totalLength);
25+
let offset = 0;
26+
for (const chunk of chunks) {
27+
buffer.set(chunk, offset);
28+
offset += chunk.length;
29+
}
30+
31+
return buffer.toString(base64 ? "base64" : "utf8");
2732
}
2833

2934
export function toReadableStream(
3035
value: string,
3136
isBase64?: boolean,
3237
): ReadableStream {
33-
return Readable.toWeb(
34-
Readable.from(Buffer.from(value, isBase64 ? "base64" : "utf8")),
38+
return new ReadableStream(
39+
{
40+
pull(controller) {
41+
// Defer the Buffer.from conversion to when the stream is actually read.
42+
controller.enqueue(Buffer.from(value, isBase64 ? "base64" : "utf8"));
43+
controller.close();
44+
},
45+
},
46+
{ highWaterMark: 0 },
3547
);
3648
}
3749

50+
let maybeSomethingBuffer: Buffer | undefined;
51+
3852
export function emptyReadableStream(): ReadableStream {
3953
if (process.env.OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE === "true") {
40-
return Readable.toWeb(Readable.from([Buffer.from("SOMETHING")]));
54+
return new ReadableStream(
55+
{
56+
pull(controller) {
57+
maybeSomethingBuffer ??= Buffer.from("SOMETHING");
58+
controller.enqueue(maybeSomethingBuffer);
59+
controller.close();
60+
},
61+
},
62+
{ highWaterMark: 0 },
63+
);
4164
}
42-
return Readable.toWeb(Readable.from([]));
65+
return new ReadableStream({
66+
start(controller) {
67+
controller.close();
68+
},
69+
});
4370
}

0 commit comments

Comments
 (0)