Skip to content

Commit 21fdc36

Browse files
authored
Merge branch 'transfer-manager' into add-documentation
2 parents 82cc0b5 + 987d3ad commit 21fdc36

File tree

5 files changed

+77
-59
lines changed

5 files changed

+77
-59
lines changed

lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ describe(S3TransferManager.name, () => {
6161
it(`should download an object of size ${size} with mode ${mode}`, async () => {
6262
const totalSizeMB = size * 1024 * 1024;
6363
const Body = data(totalSizeMB);
64-
const Key = `${mode}-size`;
64+
const Key = `${mode}-${size}`;
6565

6666
await new Upload({
6767
client,
@@ -199,7 +199,7 @@ describe(S3TransferManager.name, () => {
199199
internalEventHandler.afterInitialGetObject = async () => {};
200200
};
201201

202-
await tm.download(
202+
const downloadResponse = await tm.download(
203203
{ Bucket, Key },
204204
{
205205
eventListeners: {
@@ -213,6 +213,7 @@ describe(S3TransferManager.name, () => {
213213
},
214214
}
215215
);
216+
await downloadResponse.Body?.transformToByteArray();
216217
expect.fail("Download should have failed due to ETag mismatch");
217218
} catch (error) {
218219
expect(transferFailed).toBe(true);

lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -431,9 +431,6 @@ export class S3TransferManager implements IS3TransferManager {
431431

432432
let partCount = 1;
433433
if (initialPart.PartsCount! > 1) {
434-
const concurrentRequests = [];
435-
const concurrentRequestInputs = [];
436-
437434
for (let part = 2; part <= initialPart.PartsCount!; part++) {
438435
this.checkAborted(transferOptions);
439436
const getObjectRequest = {
@@ -453,25 +450,16 @@ export class S3TransferManager implements IS3TransferManager {
453450
};
454451
}
455452
return response.Body!;
453+
})
454+
.catch((error) => {
455+
this.dispatchTransferFailedEvent(getObjectRequest, totalSize, error as Error);
456+
throw error;
456457
});
457-
458-
concurrentRequests.push(getObject);
459-
concurrentRequestInputs.push(getObjectRequest);
458+
streams.push(getObject);
459+
requests.push(getObjectRequest);
460460
partCount++;
461461
}
462462

463-
try {
464-
// Add promise streams to streams array ONLY if all are resolved
465-
const responses = await Promise.all(concurrentRequests);
466-
for (let i = 0; i < responses.length; i++) {
467-
streams.push(Promise.resolve(responses[i]));
468-
requests.push(concurrentRequestInputs[i]);
469-
}
470-
} catch (error) {
471-
this.dispatchTransferFailedEvent(request, totalSize, error as Error);
472-
throw error;
473-
}
474-
475463
if (partCount !== initialPart.PartsCount) {
476464
throw new Error(
477465
`The number of parts downloaded (${partCount}) does not match the expected number (${initialPart.PartsCount})`
@@ -600,9 +588,6 @@ export class S3TransferManager implements IS3TransferManager {
600588
remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0;
601589
let actualRequestCount = 1;
602590

603-
const concurrentRequests = [];
604-
const concurrentRequestInputs = [];
605-
606591
while (remainingLength > 0) {
607592
this.checkAborted(transferOptions);
608593

@@ -624,31 +609,21 @@ export class S3TransferManager implements IS3TransferManager {
624609
};
625610
}
626611
return response.Body!;
612+
})
613+
.catch((error) => {
614+
this.dispatchTransferFailedEvent(getObjectRequest, totalSize, error);
615+
throw error;
627616
});
628617

629-
concurrentRequests.push(getObject);
630-
concurrentRequestInputs.push(getObjectRequest);
618+
streams.push(getObject);
619+
requests.push(getObjectRequest);
631620
actualRequestCount++;
632621

633622
left = right + 1;
634623
right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange);
635624
remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0;
636625
}
637626

638-
if (concurrentRequests.length > 0) {
639-
try {
640-
// Add promise streams to streams array ONLY if all are resolved
641-
const responses = await Promise.all(concurrentRequests);
642-
for (let i = 0; i < responses.length; i++) {
643-
streams.push(Promise.resolve(responses[i]));
644-
requests.push(concurrentRequestInputs[i]);
645-
}
646-
} catch (error) {
647-
this.dispatchTransferFailedEvent(request, totalSize, error as Error);
648-
throw error;
649-
}
650-
}
651-
652627
if (expectedRequestCount !== actualRequestCount) {
653628
throw new Error(
654629
`The number of ranged GET requests sent (${actualRequestCount}) does not match the expected number (${expectedRequestCount})`

lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,22 @@ export async function joinStreams(
3434
* @internal
3535
*/
3636
export async function* iterateStreams(
37-
streams: Promise<StreamingBlobPayloadOutputTypes>[],
37+
promises: Promise<StreamingBlobPayloadOutputTypes>[],
3838
eventListeners?: JoinStreamIterationEvents
3939
): AsyncIterable<StreamingBlobPayloadOutputTypes, void, void> {
4040
let bytesTransferred = 0;
4141
let index = 0;
42-
for (const streamPromise of streams) {
43-
const stream = await streamPromise;
42+
for (const streamPromise of promises) {
43+
let stream: Awaited<(typeof promises)[0]>;
44+
try {
45+
stream = await streamPromise;
46+
} catch (e) {
47+
await destroy(promises);
48+
eventListeners?.onFailure?.(e, index);
49+
throw e;
50+
}
51+
4452
if (isReadableStream(stream)) {
45-
// TODO: May need to acquire reader before reaching the stream
4653
const reader = stream.getReader();
4754
try {
4855
while (true) {
@@ -66,3 +73,20 @@ export async function* iterateStreams(
6673
}
6774
eventListeners?.onCompletion?.(bytesTransferred, index - 1);
6875
}
76+
77+
/**
78+
* @internal
79+
*/
80+
async function destroy(promises: Promise<StreamingBlobPayloadOutputTypes>[]): Promise<void> {
81+
await Promise.all(
82+
promises.map(async (streamPromise) => {
83+
return streamPromise
84+
.then((stream) => {
85+
if (isReadableStream(stream)) {
86+
return stream.cancel();
87+
}
88+
})
89+
.catch((e: unknown) => {});
90+
})
91+
);
92+
}

lib/lib-storage/src/s3-transfer-manager/join-streams.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ export async function joinStreams(
2525
});
2626
return sdkStreamMixin(newReadableStream);
2727
} else {
28+
// TODO: The following line is a temp fix to handle error thrown in async iterable.
29+
// We should find a better solution to improve performance.
30+
await Promise.all(streams);
2831
return sdkStreamMixin(Readable.from(iterateStreams(streams, eventListeners)));
2932
}
3033
}
@@ -35,15 +38,22 @@ export async function joinStreams(
3538
* @internal
3639
*/
3740
export async function* iterateStreams(
38-
streams: Promise<StreamingBlobPayloadOutputTypes>[],
41+
promises: Promise<StreamingBlobPayloadOutputTypes>[],
3942
eventListeners?: JoinStreamIterationEvents
4043
): AsyncIterable<StreamingBlobPayloadOutputTypes, void, void> {
4144
let bytesTransferred = 0;
4245
let index = 0;
43-
for (const streamPromise of streams) {
44-
const stream = await streamPromise;
46+
for (const streamPromise of promises) {
47+
let stream: Awaited<(typeof promises)[0]>;
48+
try {
49+
stream = await streamPromise;
50+
} catch (e) {
51+
await destroy(promises);
52+
eventListeners?.onFailure?.(e, index);
53+
throw e;
54+
}
55+
4556
if (isReadableStream(stream)) {
46-
// TODO: May need to acquire reader before reaching the stream
4757
const reader = stream.getReader();
4858
try {
4959
while (true) {
@@ -74,3 +84,23 @@ export async function* iterateStreams(
7484
}
7585
eventListeners?.onCompletion?.(bytesTransferred, index - 1);
7686
}
87+
88+
/**
89+
* @internal
90+
*/
91+
async function destroy(promises: Promise<StreamingBlobPayloadOutputTypes>[]): Promise<void> {
92+
await Promise.all(
93+
promises.map(async (streamPromise) => {
94+
return streamPromise
95+
.then((stream) => {
96+
if (stream instanceof Readable) {
97+
stream.destroy();
98+
return;
99+
} else if (isReadableStream(stream)) {
100+
return stream.cancel();
101+
}
102+
})
103+
.catch((e: unknown) => {});
104+
})
105+
);
106+
}

yarn.lock

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31118,17 +31118,6 @@ __metadata:
3111831118
languageName: node
3111931119
linkType: hard
3112031120

31121-
"aws-sdk-client-mock@npm:^4.1.0":
31122-
version: 4.1.0
31123-
resolution: "aws-sdk-client-mock@npm:4.1.0"
31124-
dependencies:
31125-
"@types/sinon": "npm:^17.0.3"
31126-
sinon: "npm:^18.0.1"
31127-
tslib: "npm:^2.1.0"
31128-
checksum: 10c0/045caad0cff0ffeb08e69849dcae51aac8999163c58d71220bf47a82c237aabab2abf92bf6bf3bd7666e6e8984513c628e01a89eafa46fb230201d6587bc01e9
31129-
languageName: node
31130-
linkType: hard
31131-
3113231121
"aws-sdk-js-v3@workspace:.":
3113331122
version: 0.0.0-use.local
3113431123
resolution: "aws-sdk-js-v3@workspace:."
@@ -31150,7 +31139,6 @@ __metadata:
3115031139
"@typescript-eslint/eslint-plugin": "npm:5.55.0"
3115131140
"@typescript-eslint/parser": "npm:5.55.0"
3115231141
async: "npm:3.2.4"
31153-
aws-sdk-client-mock: "npm:^4.1.0"
3115431142
concurrently: "npm:7.0.0"
3115531143
decomment: "npm:0.9.5"
3115631144
downlevel-dts: "npm:0.10.1"

0 commit comments

Comments
 (0)