Skip to content

Commit 987d3ad

Browse files
authored
Merge pull request #6 from lukachad/debugging
Debugging
2 parents 21dffe4 + 49022b9 commit 987d3ad

File tree

4 files changed

+89
-48
lines changed

4 files changed

+89
-48
lines changed

lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ describe(S3TransferManager.name, () => {
6161
it(`should download an object of size ${size} with mode ${mode}`, async () => {
6262
const totalSizeMB = size * 1024 * 1024;
6363
const Body = data(totalSizeMB);
64-
const Key = `${mode}-size`;
64+
const Key = `${mode}-${size}`;
6565

6666
await new Upload({
6767
client,
@@ -199,7 +199,7 @@ describe(S3TransferManager.name, () => {
199199
internalEventHandler.afterInitialGetObject = async () => {};
200200
};
201201

202-
await tm.download(
202+
const downloadResponse = await tm.download(
203203
{ Bucket, Key },
204204
{
205205
eventListeners: {
@@ -213,6 +213,7 @@ describe(S3TransferManager.name, () => {
213213
},
214214
}
215215
);
216+
await downloadResponse.Body?.transformToByteArray();
216217
expect.fail("Download should have failed due to ETag mismatch");
217218
} catch (error) {
218219
expect(transferFailed).toBe(true);

lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -365,9 +365,6 @@ export class S3TransferManager implements IS3TransferManager {
365365

366366
let partCount = 1;
367367
if (initialPart.PartsCount! > 1) {
368-
const concurrentRequests = [];
369-
const concurrentRequestInputs = [];
370-
371368
for (let part = 2; part <= initialPart.PartsCount!; part++) {
372369
this.checkAborted(transferOptions);
373370
const getObjectRequest = {
@@ -387,25 +384,16 @@ export class S3TransferManager implements IS3TransferManager {
387384
};
388385
}
389386
return response.Body!;
387+
})
388+
.catch((error) => {
389+
this.dispatchTransferFailedEvent(getObjectRequest, totalSize, error as Error);
390+
throw error;
390391
});
391-
392-
concurrentRequests.push(getObject);
393-
concurrentRequestInputs.push(getObjectRequest);
392+
streams.push(getObject);
393+
requests.push(getObjectRequest);
394394
partCount++;
395395
}
396396

397-
try {
398-
// Add promise streams to streams array ONLY if all are resolved
399-
const responses = await Promise.all(concurrentRequests);
400-
for (let i = 0; i < responses.length; i++) {
401-
streams.push(Promise.resolve(responses[i]));
402-
requests.push(concurrentRequestInputs[i]);
403-
}
404-
} catch (error) {
405-
this.dispatchTransferFailedEvent(request, totalSize, error as Error);
406-
throw error;
407-
}
408-
409397
if (partCount !== initialPart.PartsCount) {
410398
throw new Error(
411399
`The number of parts downloaded (${partCount}) does not match the expected number (${initialPart.PartsCount})`
@@ -529,9 +517,6 @@ export class S3TransferManager implements IS3TransferManager {
529517
remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0;
530518
let actualRequestCount = 1;
531519

532-
const concurrentRequests = [];
533-
const concurrentRequestInputs = [];
534-
535520
while (remainingLength > 0) {
536521
this.checkAborted(transferOptions);
537522

@@ -553,31 +538,21 @@ export class S3TransferManager implements IS3TransferManager {
553538
};
554539
}
555540
return response.Body!;
541+
})
542+
.catch((error) => {
543+
this.dispatchTransferFailedEvent(getObjectRequest, totalSize, error);
544+
throw error;
556545
});
557546

558-
concurrentRequests.push(getObject);
559-
concurrentRequestInputs.push(getObjectRequest);
547+
streams.push(getObject);
548+
requests.push(getObjectRequest);
560549
actualRequestCount++;
561550

562551
left = right + 1;
563552
right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange);
564553
remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0;
565554
}
566555

567-
if (concurrentRequests.length > 0) {
568-
try {
569-
// Add promise streams to streams array ONLY if all are resolved
570-
const responses = await Promise.all(concurrentRequests);
571-
for (let i = 0; i < responses.length; i++) {
572-
streams.push(Promise.resolve(responses[i]));
573-
requests.push(concurrentRequestInputs[i]);
574-
}
575-
} catch (error) {
576-
this.dispatchTransferFailedEvent(request, totalSize, error as Error);
577-
throw error;
578-
}
579-
}
580-
581556
if (expectedRequestCount !== actualRequestCount) {
582557
throw new Error(
583558
`The number of ranged GET requests sent (${actualRequestCount}) does not match the expected number (${expectedRequestCount})`

lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { isReadableStream, sdkStreamMixin } from "@smithy/util-stream";
33

44
import { JoinStreamIterationEvents } from "./types";
55

6+
/**
7+
* @internal
8+
*/
69
export async function joinStreams(
710
streams: Promise<StreamingBlobPayloadOutputTypes>[],
811
eventListeners?: JoinStreamIterationEvents
@@ -23,16 +26,26 @@ export async function joinStreams(
2326
}
2427
}
2528

29+
/**
30+
* @internal
31+
*/
2632
export async function* iterateStreams(
27-
streams: Promise<StreamingBlobPayloadOutputTypes>[],
33+
promises: Promise<StreamingBlobPayloadOutputTypes>[],
2834
eventListeners?: JoinStreamIterationEvents
2935
): AsyncIterable<StreamingBlobPayloadOutputTypes, void, void> {
3036
let bytesTransferred = 0;
3137
let index = 0;
32-
for (const streamPromise of streams) {
33-
const stream = await streamPromise;
38+
for (const streamPromise of promises) {
39+
let stream: Awaited<(typeof promises)[0]>;
40+
try {
41+
stream = await streamPromise;
42+
} catch (e) {
43+
await destroy(promises);
44+
eventListeners?.onFailure?.(e, index);
45+
throw e;
46+
}
47+
3448
if (isReadableStream(stream)) {
35-
// TODO: May need to acquire reader before reaching the stream
3649
const reader = stream.getReader();
3750
try {
3851
while (true) {
@@ -56,3 +69,20 @@ export async function* iterateStreams(
5669
}
5770
eventListeners?.onCompletion?.(bytesTransferred, index - 1);
5871
}
72+
73+
/**
74+
* @internal
75+
*/
76+
async function destroy(promises: Promise<StreamingBlobPayloadOutputTypes>[]): Promise<void> {
77+
await Promise.all(
78+
promises.map(async (streamPromise) => {
79+
return streamPromise
80+
.then((stream) => {
81+
if (isReadableStream(stream)) {
82+
return stream.cancel();
83+
}
84+
})
85+
.catch((e: unknown) => {});
86+
})
87+
);
88+
}

lib/lib-storage/src/s3-transfer-manager/join-streams.ts

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import { Readable } from "stream";
44

55
import { JoinStreamIterationEvents } from "./types";
66

7-
// TODO: check all types. needs to join nodejs and browser together
7+
/**
8+
* @internal
9+
*/
810
export async function joinStreams(
911
streams: Promise<StreamingBlobPayloadOutputTypes>[],
1012
eventListeners?: JoinStreamIterationEvents
@@ -21,20 +23,33 @@ export async function joinStreams(
2123
});
2224
return sdkStreamMixin(newReadableStream);
2325
} else {
26+
// TODO: The following line is a temp fix to handle error thrown in async iterable.
27+
// We should find a better solution to improve performance.
28+
await Promise.all(streams);
2429
return sdkStreamMixin(Readable.from(iterateStreams(streams, eventListeners)));
2530
}
2631
}
2732

33+
/**
34+
* @internal
35+
*/
2836
export async function* iterateStreams(
29-
streams: Promise<StreamingBlobPayloadOutputTypes>[],
37+
promises: Promise<StreamingBlobPayloadOutputTypes>[],
3038
eventListeners?: JoinStreamIterationEvents
3139
): AsyncIterable<StreamingBlobPayloadOutputTypes, void, void> {
3240
let bytesTransferred = 0;
3341
let index = 0;
34-
for (const streamPromise of streams) {
35-
const stream = await streamPromise;
42+
for (const streamPromise of promises) {
43+
let stream: Awaited<(typeof promises)[0]>;
44+
try {
45+
stream = await streamPromise;
46+
} catch (e) {
47+
await destroy(promises);
48+
eventListeners?.onFailure?.(e, index);
49+
throw e;
50+
}
51+
3652
if (isReadableStream(stream)) {
37-
// TODO: May need to acquire reader before reaching the stream
3853
const reader = stream.getReader();
3954
try {
4055
while (true) {
@@ -65,3 +80,23 @@ export async function* iterateStreams(
6580
}
6681
eventListeners?.onCompletion?.(bytesTransferred, index - 1);
6782
}
83+
84+
/**
85+
* @internal
86+
*/
87+
async function destroy(promises: Promise<StreamingBlobPayloadOutputTypes>[]): Promise<void> {
88+
await Promise.all(
89+
promises.map(async (streamPromise) => {
90+
return streamPromise
91+
.then((stream) => {
92+
if (stream instanceof Readable) {
93+
stream.destroy();
94+
return;
95+
} else if (isReadableStream(stream)) {
96+
return stream.cancel();
97+
}
98+
})
99+
.catch((e: unknown) => {});
100+
})
101+
);
102+
}

0 commit comments

Comments
 (0)