
Commit efedb20

fix(util-stream): handle empty upstreams (#1525)
1 parent: cc1095b

7 files changed: +154 −6 lines

.changeset/red-pumpkins-pretend.md

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+---
+"@smithy/util-stream": patch
+---
+
+handle case of empty upstream
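
For context on the failure this patch targets, here is a repro sketch mirroring the spec files below; the exact pre-fix symptom is inferred from the guards added in the source diffs, not stated in the commit:

import { Readable } from "node:stream";
import { createBufferedReadable } from "./createBufferedReadable";
import { headStream } from "./headStream";

// An upstream that ends without emitting a single chunk.
const empty = Readable.from(Buffer.from(""));
const buffered = createBufferedReadable(empty, 1024);

// Before this fix, the "end" handler called flush(buffers, mode) even when no
// chunk had ever arrived to initialize mode; the new guards skip that flush,
// so the buffered stream now simply ends with zero bytes.
const collected = await headStream(buffered, Infinity);
console.log(collected.length); // 0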

packages/util-stream/src/createBufferedReadable.spec.ts

Lines changed: 59 additions & 1 deletion

@@ -23,13 +23,71 @@ describe("Buffered Readable stream", () => {
     }
     return Readable.from(generate());
   }
+
+  function patternedByteStream(size: number, chunkSize: number) {
+    let n = 0;
+    const data = Array(size);
+    for (let i = 0; i < size; ++i) {
+      data[i] = n++ % 255;
+    }
+    let dataCursor = 0;
+
+    async function* generate() {
+      while (size > 0) {
+        const z = Math.min(size, chunkSize);
+        const bytes = new Uint8Array(data.slice(dataCursor, dataCursor + z));
+        size -= z;
+        dataCursor += z;
+        yield bytes;
+      }
+    }
+    return {
+      stream: Readable.from(size === 0 ? Buffer.from("") : generate()),
+      array: new Uint8Array(data),
+    };
+  }
+
   const logger = {
     debug: vi.fn(),
     info: vi.fn(),
     warn: vi.fn(),
     error() {},
   };

+  const KB = 1024;
+
+  const dataSizes = [0, 10, 101, 1_001, 10_001, 100_001];
+  const chunkSizes = [1, 8, 16, 32, 64, 128, 1024, 8 * 1024, 64 * 1024, 1024 * 1024];
+  const bufferSizes = [0, 1024, 8 * 1024, 32 * 1024, 64 * 1024, 1024 * 1024];
+
+  for (const dataSize of dataSizes) {
+    for (const chunkSize of chunkSizes) {
+      for (const bufferSize of bufferSizes) {
+        it(`should maintain data integrity for data=${dataSize} chunk=${chunkSize} min-buffer=${bufferSize}`, async () => {
+          const { stream, array } = patternedByteStream(dataSize, chunkSize);
+          const bufferedStream = createBufferedReadable(stream, bufferSize);
+          const collected = await headStream(bufferedStream, Infinity);
+          expect(collected).toEqual(array);
+        });
+      }
+    }
+  }
+
+  for (const [dataSize, chunkSize, bufferSize] of [
+    [10 * KB, 1 * KB, 0 * KB],
+    [10 * KB, 1 * KB, 1 * KB],
+    [10 * KB, 1 * KB, 2.1 * KB],
+    [10 * KB, 1 * KB, 4 * KB],
+    [10 * KB, 2 * KB, 1 * KB],
+  ]) {
+    it(`should maintain data integrity for data=${dataSize} chunk=${chunkSize} min-buffer=${bufferSize}`, async () => {
+      const { stream, array } = patternedByteStream(dataSize, chunkSize);
+      const bufferedStream = createBufferedReadable(stream, bufferSize);
+      const collected = await headStream(bufferedStream, Infinity);
+      expect(collected).toEqual(array);
+    });
+  }
+
   it("should join upstream chunks if they are too small (stringStream)", async () => {
     const upstream = stringStream(1024, 8);
     const downstream = createBufferedReadable(upstream, 64);

@@ -73,4 +131,4 @@ describe("Buffered Readable stream", () => {
     expect(downstreamChunkCount).toEqual(22);
     expect(logger.warn).toHaveBeenCalled();
   });
-});
+}, 30_000);
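
A note on scale: the three nested loops expand to 6 × 10 × 6 = 360 data-integrity cases, plus five targeted KB-scale cases, which is why the suite's closing "});" becomes "}, 30_000);" — a 30-second timeout passed to describe. The pattern helper is deterministic, so no fixture data is needed; its behavior in miniature:

// patternedByteStream(5, 2) yields Uint8Array chunks [0, 1], [2, 3], [4]
// and returns array = new Uint8Array([0, 1, 2, 3, 4]) for comparison.
// For size === 0 it returns Readable.from(Buffer.from("")) — a stream that
// ends immediately, which is exactly the empty-upstream case being fixed.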

packages/util-stream/src/createBufferedReadable.ts

Lines changed: 6 additions & 1 deletion

@@ -70,7 +70,12 @@ export function createBufferedReadable(
     }
   });
   upstream.on("end", () => {
-    downstream.push(flush(buffers, mode));
+    if (mode !== -1) {
+      const remainder = flush(buffers, mode);
+      if (sizeOf(remainder) > 0) {
+        downstream.push(remainder);
+      }
+    }
     downstream.push(null);
   });
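
The mode !== -1 guard is the heart of the fix. Judging from the sentinel, mode starts at -1 and is assigned only once a first chunk reveals the upstream's chunk type; that assignment lives outside this hunk, so the following is a hypothetical sketch of the bookkeeping the guard relies on, not the library's actual code:

// Hypothetical sketch of the mode sentinel (names and encoding assumed).
let mode = -1; // -1 means "no chunk observed yet"
upstream.on("data", (chunk) => {
  if (mode === -1) {
    // e.g. 0 = string, 1 = Buffer, 2 = Uint8Array (hypothetical encoding)
    mode = typeof chunk === "string" ? 0 : Buffer.isBuffer(chunk) ? 1 : 2;
  }
  // ...buffer the chunk, pushing merged output once the minimum size is reached...
});

With an empty upstream, mode is still -1 on "end", so flush is never called and only the null terminator is pushed. The sizeOf(remainder) > 0 check additionally avoids pushing a zero-length chunk downstream, a general streams hygiene measure: zero-byte pushes are at best no-ops and can confuse consumers.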

packages/util-stream/src/createBufferedReadableStream.browser.spec.ts

Lines changed: 39 additions & 0 deletions

@@ -2,6 +2,7 @@ import { Readable } from "node:stream";
 import { describe, expect, test as it, vi } from "vitest";

 import { createBufferedReadable } from "./createBufferedReadableStream";
+import { headStream } from "./headStream.browser";

 describe("Buffered ReadableStream", () => {
   function stringStream(size: number, chunkSize: number) {

@@ -22,7 +23,28 @@ describe("Buffered ReadableStream", () => {
     }
     return Readable.toWeb(Readable.from(generate()));
   }
+  function patternedByteStream(size: number, chunkSize: number) {
+    let n = 0;
+    const data = Array(size);
+    for (let i = 0; i < size; ++i) {
+      data[i] = n++ % 255;
+    }
+    let dataCursor = 0;

+    async function* generate() {
+      while (size > 0) {
+        const z = Math.min(size, chunkSize);
+        const bytes = new Uint8Array(data.slice(dataCursor, dataCursor + z));
+        size -= z;
+        dataCursor += z;
+        yield bytes;
+      }
+    }
+    return {
+      stream: Readable.toWeb(Readable.from(size === 0 ? Buffer.from("") : generate())) as unknown as ReadableStream,
+      array: new Uint8Array(data),
+    };
+  }
   const logger = {
     debug: vi.fn(),
     info: vi.fn(),

@@ -104,4 +126,21 @@ describe("Buffered ReadableStream", () => {
     expect(downstreamChunkCount).toEqual(22);
     expect(logger.warn).toHaveBeenCalled();
   });
+
+  const dataSizes = [0, 10, 101, 1_001, 10_001, 100_001];
+  const chunkSizes = [1, 8, 16, 32, 64, 128, 1024, 8 * 1024, 64 * 1024, 1024 * 1024];
+  const bufferSizes = [0, 1024, 8 * 1024, 32 * 1024, 64 * 1024, 1024 * 1024];
+
+  for (const dataSize of dataSizes) {
+    for (const chunkSize of chunkSizes) {
+      for (const bufferSize of bufferSizes) {
+        it(`should maintain data integrity for data=${dataSize} chunk=${chunkSize} min-buffer=${bufferSize}`, async () => {
+          const { stream, array } = patternedByteStream(dataSize, chunkSize);
+          const bufferedStream = createBufferedReadable(stream, bufferSize);
+          const collected = await headStream(bufferedStream, Infinity);
+          expect(collected).toEqual(array);
+        });
+      }
+    }
+  }
 });

packages/util-stream/src/createBufferedReadableStream.spec.ts

Lines changed: 39 additions & 0 deletions

@@ -2,6 +2,7 @@ import { Readable } from "node:stream";
 import { describe, expect, test as it, vi } from "vitest";

 import { createBufferedReadableStream } from "./createBufferedReadableStream";
+import { headStream } from "./headStream";

 describe("Buffered ReadableStream", () => {
   function stringStream(size: number, chunkSize: number) {

@@ -22,7 +23,28 @@ describe("Buffered ReadableStream", () => {
     }
     return Readable.toWeb(Readable.from(generate()));
   }
+  function patternedByteStream(size: number, chunkSize: number) {
+    let n = 0;
+    const data = Array(size);
+    for (let i = 0; i < size; ++i) {
+      data[i] = n++ % 255;
+    }
+    let dataCursor = 0;

+    async function* generate() {
+      while (size > 0) {
+        const z = Math.min(size, chunkSize);
+        const bytes = new Uint8Array(data.slice(dataCursor, dataCursor + z));
+        size -= z;
+        dataCursor += z;
+        yield bytes;
+      }
+    }
+    return {
+      stream: Readable.toWeb(Readable.from(size === 0 ? Buffer.from("") : generate())) as unknown as ReadableStream,
+      array: new Uint8Array(data),
+    };
+  }
   const logger = {
     debug: vi.fn(),
     info: vi.fn(),

@@ -104,4 +126,21 @@ describe("Buffered ReadableStream", () => {
     expect(downstreamChunkCount).toEqual(22);
     expect(logger.warn).toHaveBeenCalled();
   });
+
+  const dataSizes = [0, 10, 101, 1_001, 10_001, 100_001];
+  const chunkSizes = [1, 8, 16, 32, 64, 128, 1024, 8 * 1024, 64 * 1024, 1024 * 1024];
+  const bufferSizes = [0, 1024, 8 * 1024, 32 * 1024, 64 * 1024, 1024 * 1024];
+
+  for (const dataSize of dataSizes) {
+    for (const chunkSize of chunkSizes) {
+      for (const bufferSize of bufferSizes) {
+        it(`should maintain data integrity for data=${dataSize} chunk=${chunkSize} min-buffer=${bufferSize}`, async () => {
+          const { stream, array } = patternedByteStream(dataSize, chunkSize);
+          const bufferedStream = createBufferedReadableStream(stream, bufferSize);
+          const collected = await headStream(bufferedStream, Infinity);
+          expect(collected).toEqual(array);
+        });
+      }
+    }
+  }
 });

packages/util-stream/src/createBufferedReadableStream.ts

Lines changed: 5 additions & 3 deletions

@@ -27,9 +27,11 @@ export function createBufferedReadableStream(upstream: ReadableStream, size: num
     const chunk = value;

     if (done) {
-      const remainder = flush(buffers, mode);
-      if (sizeOf(remainder) > 0) {
-        controller.enqueue(remainder);
+      if (mode !== -1) {
+        const remainder = flush(buffers, mode);
+        if (sizeOf(remainder) > 0) {
+          controller.enqueue(remainder);
+        }
       }
       controller.close();
     } else {
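
The web-stream variant already skipped zero-length remainders; the new outer guard additionally covers the stream-that-never-produced-a-chunk case. A quick sketch of exercising it (not from the commit):

// An upstream ReadableStream that closes without enqueuing anything.
const empty = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.close();
  },
});
const buffered = createBufferedReadableStream(empty, 1024);
const { done, value } = await buffered.getReader().read();
// done === true, value === undefined — no spurious empty chunk is enqueued first.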

scripts/check-dependencies.js

Lines changed: 1 addition & 1 deletion

@@ -63,7 +63,7 @@ const node_libraries = [
   ...new Set(
     [...(contents.toString().match(/(from |import\()"(.*?)";/g) || [])]
       .map((_) => _.replace(/from "/g, "").replace(/";$/, ""))
-      .filter((_) => !_.startsWith(".") && !node_libraries.includes(_))
+      .filter((_) => !_.startsWith(".") && !node_libraries.includes(_) && !_.startsWith("node:"))
   )
 );
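
This last change is housekeeping for the new specs: they import from "node:stream", and the dependency checker flags any import specifier that is neither relative nor in its node_libraries allowlist. The prefixed built-in form was not covered by that list, so it is now excluded directly. Illustratively (the second import is an assumed example, not from this repository):

import { Readable } from "node:stream"; // now ignored: Node built-in, prefixed form
import { thing } from "some-external-pkg"; // still flagged unless declared as a dependency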
