Skip to content

Commit 7136583

Browse files
committed
fix(middleware-flexible-checksums): buffer stream chunks to minimum required size
1 parent ebd03ac commit 7136583

File tree

8 files changed

+216
-15
lines changed

8 files changed

+216
-15
lines changed

packages/middleware-flexible-checksums/src/configuration.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,9 @@ export interface PreviouslyResolved {
6666
* Collects streams into buffers.
6767
*/
6868
streamCollector: StreamCollector;
69+
70+
/**
71+
* Minimum bytes from a stream to buffer into a chunk before passing to chunked encoding.
72+
*/
73+
requestStreamBufferSize: number;
6974
}

packages/middleware-flexible-checksums/src/flexibleChecksumsMiddleware.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
HandlerExecutionContext,
1010
MetadataBearer,
1111
} from "@smithy/types";
12+
import { createBufferedReadable } from "@smithy/util-stream";
1213

1314
import { PreviouslyResolved } from "./configuration";
1415
import { ChecksumAlgorithm, DEFAULT_CHECKSUM_ALGORITHM, RequestChecksumCalculation } from "./constants";
@@ -119,13 +120,18 @@ export const flexibleChecksumsMiddleware =
119120
const checksumAlgorithmFn = selectChecksumAlgorithmFunction(checksumAlgorithm, config);
120121
if (isStreaming(requestBody)) {
121122
const { getAwsChunkedEncodingStream, bodyLengthChecker } = config;
122-
updatedBody = getAwsChunkedEncodingStream(requestBody, {
123-
base64Encoder,
124-
bodyLengthChecker,
125-
checksumLocationName,
126-
checksumAlgorithmFn,
127-
streamHasher,
128-
});
123+
updatedBody = getAwsChunkedEncodingStream(
124+
config.requestStreamBufferSize
125+
? createBufferedReadable(requestBody, config.requestStreamBufferSize, context.logger)
126+
: requestBody,
127+
{
128+
base64Encoder,
129+
bodyLengthChecker,
130+
checksumLocationName,
131+
checksumAlgorithmFn,
132+
streamHasher,
133+
}
134+
);
129135
updatedHeaders = {
130136
...headers,
131137
"content-encoding": headers["content-encoding"]

packages/middleware-flexible-checksums/src/middleware-flexible-checksums.e2e.spec.ts

Lines changed: 144 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import { S3 } from "@aws-sdk/client-s3";
1+
import { S3, UploadPartCommandOutput } from "@aws-sdk/client-s3";
2+
import { Upload } from "@aws-sdk/lib-storage";
3+
import { FetchHttpHandler } from "@smithy/fetch-http-handler";
24
import type { HttpRequest, HttpResponse } from "@smithy/types";
3-
import { headStream } from "@smithy/util-stream";
5+
import { ChecksumStream, headStream } from "@smithy/util-stream";
46
import { Readable } from "node:stream";
5-
import { beforeAll, describe, expect, test as it } from "vitest";
7+
import { beforeAll, describe, expect, test as it, vi } from "vitest";
68

79
import { getIntegTestResources } from "../../../tests/e2e/get-integ-test-resources";
810

@@ -13,6 +15,26 @@ describe("S3 checksums", () => {
1315
let Key: string;
1416
let region: string;
1517
const expected = new Uint8Array([97, 98, 99, 100]);
18+
const logger = {
19+
debug: vi.fn(),
20+
info: vi.fn(),
21+
warn: vi.fn(),
22+
error: vi.fn(),
23+
};
24+
25+
function stream(size: number, chunkSize: number) {
26+
async function* generate() {
27+
while (size > 0) {
28+
const z = Math.min(size, chunkSize);
29+
yield "a".repeat(z);
30+
size -= z;
31+
}
32+
}
33+
return Readable.from(generate());
34+
}
35+
function webStream(size: number, chunkSize: number) {
36+
return Readable.toWeb(stream(size, chunkSize)) as unknown as ReadableStream;
37+
}
1638

1739
beforeAll(async () => {
1840
const integTestResourcesEnv = await getIntegTestResources();
@@ -21,7 +43,7 @@ describe("S3 checksums", () => {
2143
region = process?.env?.AWS_SMOKE_TEST_REGION as string;
2244
Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string;
2345

24-
s3 = new S3({ region });
46+
s3 = new S3({ logger, region, requestStreamBufferSize: 8 * 1024 });
2547
s3_noChecksum = new S3({
2648
region,
2749
requestChecksumCalculation: "WHEN_REQUIRED",
@@ -38,7 +60,7 @@ describe("S3 checksums", () => {
3860
expect(reqHeader).toEqual("CRC32");
3961
}
4062
if (resHeader) {
41-
expect(resHeader).toEqual("7YLNEQ==");
63+
expect(resHeader.length).toBeGreaterThanOrEqual(8);
4264
}
4365
return r;
4466
},
@@ -53,9 +75,125 @@ describe("S3 checksums", () => {
5375
});
5476

5577
it("an object should have checksum by default", async () => {
56-
await s3.getObject({ Bucket, Key });
78+
const get = await s3.getObject({ Bucket, Key });
79+
expect(get.Body).toBeInstanceOf(ChecksumStream);
5780
});
5881

82+
describe("PUT operations", () => {
83+
it("should assist user input streams by buffering to the minimum 8kb required by S3", async () => {
84+
await s3.putObject({
85+
Bucket,
86+
Key: Key + "small-chunks",
87+
Body: stream(24 * 1024, 8),
88+
ContentLength: 24 * 1024,
89+
});
90+
expect(logger.warn).toHaveBeenCalledWith(
91+
`@smithy/util-stream - stream chunk size 8 is below threshold of 8192, automatically buffering.`
92+
);
93+
const get = await s3.getObject({
94+
Bucket,
95+
Key: Key + "small-chunks",
96+
});
97+
expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(24 * 1024);
98+
});
99+
it("should be able to write an object with a webstream body (using fetch handler without checksum)", async () => {
100+
const handler = s3_noChecksum.config.requestHandler;
101+
s3_noChecksum.config.requestHandler = new FetchHttpHandler();
102+
await s3_noChecksum.putObject({
103+
Bucket,
104+
Key: Key + "small-chunks-webstream",
105+
Body: webStream(24 * 1024, 512),
106+
ContentLength: 24 * 1024,
107+
});
108+
s3_noChecksum.config.requestHandler = handler;
109+
const get = await s3.getObject({
110+
Bucket,
111+
Key: Key + "small-chunks-webstream",
112+
});
113+
expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(24 * 1024);
114+
});
115+
it("@aws-sdk/lib-storage Upload should allow webstreams to be used", async () => {
116+
await new Upload({
117+
client: s3,
118+
params: {
119+
Bucket,
120+
Key: Key + "small-chunks-webstream-mpu",
121+
Body: webStream(6 * 1024 * 1024, 512),
122+
},
123+
}).done();
124+
const get = await s3.getObject({
125+
Bucket,
126+
Key: Key + "small-chunks-webstream-mpu",
127+
});
128+
expect((await get.Body?.transformToByteArray())?.byteLength).toEqual(6 * 1024 * 1024);
129+
});
130+
it("should allow streams to be used in a manually orchestrated MPU", async () => {
131+
const cmpu = await s3.createMultipartUpload({
132+
Bucket,
133+
Key: Key + "-mpu",
134+
});
135+
136+
const MB = 1024 * 1024;
137+
const up = [] as UploadPartCommandOutput[];
138+
139+
try {
140+
up.push(
141+
await s3.uploadPart({
142+
Bucket,
143+
Key: Key + "-mpu",
144+
UploadId: cmpu.UploadId,
145+
Body: stream(5 * MB, 1024),
146+
PartNumber: 1,
147+
ContentLength: 5 * MB,
148+
}),
149+
await s3.uploadPart({
150+
Bucket,
151+
Key: Key + "-mpu",
152+
UploadId: cmpu.UploadId,
153+
Body: stream(MB, 64),
154+
PartNumber: 2,
155+
ContentLength: MB,
156+
})
157+
);
158+
expect(logger.warn).toHaveBeenCalledWith(
159+
`@smithy/util-stream - stream chunk size 1024 is below threshold of 8192, automatically buffering.`
160+
);
161+
expect(logger.warn).toHaveBeenCalledWith(
162+
`@smithy/util-stream - stream chunk size 64 is below threshold of 8192, automatically buffering.`
163+
);
164+
165+
await s3.completeMultipartUpload({
166+
Bucket,
167+
Key: Key + "-mpu",
168+
UploadId: cmpu.UploadId,
169+
MultipartUpload: {
170+
Parts: up.map((part, i) => {
171+
return {
172+
PartNumber: i + 1,
173+
ETag: part.ETag,
174+
};
175+
}),
176+
},
177+
});
178+
179+
const go = await s3.getObject({
180+
Bucket,
181+
Key: Key + "-mpu",
182+
});
183+
expect((await go.Body?.transformToByteArray())?.byteLength).toEqual(6 * MB);
184+
185+
expect(go.$metadata.httpStatusCode).toEqual(200);
186+
} catch (e) {
187+
await s3.abortMultipartUpload({
188+
UploadId: cmpu.UploadId,
189+
Bucket,
190+
Key: Key + "-mpu",
191+
});
192+
throw e;
193+
}
194+
});
195+
}, 45_000);
196+
59197
describe("the stream returned by S3::getObject should function interchangeably between ChecksumStream and default streams", () => {
60198
it("when collecting the stream", async () => {
61199
const defaultStream = (await s3_noChecksum.getObject({ Bucket, Key })).Body as Readable;

packages/middleware-flexible-checksums/src/resolveFlexibleChecksumsConfig.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,25 @@ export interface FlexibleChecksumsInputConfig {
1818
* Determines when checksum validation will be performed on response payloads.
1919
*/
2020
responseChecksumValidation?: ResponseChecksumValidation | Provider<ResponseChecksumValidation>;
21+
22+
/**
23+
* Default 65536.
24+
*
25+
* Minimum number of bytes to buffer into a chunk when processing input streams
26+
* with chunked encoding (that is, when request checksums are enabled).
27+
* A minimum of 8kb = 8 * 1024 is required, and 64kb or higher is recommended.
28+
*
29+
* See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html.
30+
*
31+
* To turn off this feature, configure the value to zero or false.
32+
*/
33+
requestStreamBufferSize?: number | false;
2134
}
2235

2336
export interface FlexibleChecksumsResolvedConfig {
2437
requestChecksumCalculation: Provider<RequestChecksumCalculation>;
2538
responseChecksumValidation: Provider<ResponseChecksumValidation>;
39+
requestStreamBufferSize: number;
2640
}
2741

2842
export const resolveFlexibleChecksumsConfig = <T>(
@@ -35,4 +49,5 @@ export const resolveFlexibleChecksumsConfig = <T>(
3549
responseChecksumValidation: normalizeProvider(
3650
input.responseChecksumValidation ?? DEFAULT_RESPONSE_CHECKSUM_VALIDATION
3751
),
52+
requestStreamBufferSize: Number(input.requestStreamBufferSize ?? 64 * 1024),
3853
});

packages/middleware-sdk-s3/src/check-content-length-header.spec.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,24 @@ describe("checkContentLengthHeaderMiddleware", () => {
130130

131131
expect(spy).not.toHaveBeenCalled();
132132
});
133+
134+
it("does not warn if uploading a payload of known length via alternate header x-amz-decoded-content-length", async () => {
135+
const handler = checkContentLengthHeader()(mockNextHandler, {});
136+
137+
await handler({
138+
request: {
139+
method: null,
140+
protocol: null,
141+
hostname: null,
142+
path: null,
143+
query: {},
144+
headers: {
145+
"x-amz-decoded-content-length": "5",
146+
},
147+
},
148+
input: {},
149+
});
150+
151+
expect(spy).not.toHaveBeenCalled();
152+
});
133153
});

packages/middleware-sdk-s3/src/check-content-length-header.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from "@smithy/types";
1313

1414
const CONTENT_LENGTH_HEADER = "content-length";
15+
const DECODED_CONTENT_LENGTH_HEADER = "x-amz-decoded-content-length";
1516

1617
/**
1718
* @internal
@@ -28,7 +29,7 @@ export function checkContentLengthHeader(): FinalizeRequestMiddleware<any, any>
2829
const { request } = args;
2930

3031
if (HttpRequest.isInstance(request)) {
31-
if (!(CONTENT_LENGTH_HEADER in request.headers)) {
32+
if (!(CONTENT_LENGTH_HEADER in request.headers) && !(DECODED_CONTENT_LENGTH_HEADER in request.headers)) {
3233
const message = `Are you using a Stream of unknown length as the Body of a PutObject request? Consider using Upload instead from @aws-sdk/lib-storage.`;
3334
if (typeof context?.logger?.warn === "function" && !(context.logger instanceof NoOpLogger)) {
3435
context.logger.warn(message);

private/aws-client-api-test/src/client-interface-tests/client-s3/impl/initializeWithMaximalConfiguration.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ export const initializeWithMaximalConfiguration = () => {
130130
requestChecksumCalculation: DEFAULT_REQUEST_CHECKSUM_CALCULATION,
131131
responseChecksumValidation: DEFAULT_RESPONSE_CHECKSUM_VALIDATION,
132132
userAgentAppId: "testApp",
133+
requestStreamBufferSize: 8 * 1024,
133134
};
134135

135136
const s3 = new S3Client(config);

supplemental-docs/CLIENTS.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,8 @@ See also https://aws.amazon.com/blogs/developer/middleware-stack-modular-aws-sdk
753753
754754
### S3
755755
756-
`followRegionRedirects`:
756+
#### `followRegionRedirects`:
757+
757758
This feature was previously called the S3 Global Client. Setting this option to true enables failed requests to be retried with a corrected region when receiving a permanent redirect error with status 301. Note that this can result in additional latency owing to the retried request. This feature should only be used as a last resort if you do not know the region of your bucket(s) ahead of time.
758759
759760
```ts
@@ -763,6 +764,20 @@ new S3Client({
763764
});
764765
```
765766
767+
#### `requestChecksumCalculation` and `responseChecksumValidation`:
768+
769+
These may be set to `WHEN_REQUIRED` or `WHEN_SUPPORTED`. See https://github.com/aws/aws-sdk-js-v3/issues/6810.
770+
771+
#### `requestStreamBufferSize`:
772+
773+
This has a default value of `64 * 1024` bytes, or 64kb. This only comes into play when request checksums are enabled.
774+
When this is enabled, user input streams that emit chunks having size less than the value configured will be buffered
775+
until the buffer exceeds the desired size before continuing to flow.
776+
777+
This may be set to `false` to disable, or a numeric byte size.
778+
779+
See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html.
780+
766781
### SQS
767782
768783
#### Using Queue Names with SQS Client

0 commit comments

Comments
 (0)