diff --git a/.vscode/settings.json b/.vscode/settings.json index 67b515c68994..9f41be91d5d0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,5 +11,6 @@ "source.fixAll.eslint": "explicit" }, "typescript.tsdk": "node_modules/typescript/lib", - "vitest.disableWorkspaceWarning": true + "vitest.disableWorkspaceWarning": true, + "java.configuration.updateBuildConfiguration": "interactive" } diff --git a/lib/lib-storage/package.json b/lib/lib-storage/package.json index 92376bc6c283..d246bbf7984b 100644 --- a/lib/lib-storage/package.json +++ b/lib/lib-storage/package.json @@ -15,8 +15,10 @@ "clean": "rimraf ./dist-* && rimraf *.tsbuildinfo", "extract:docs": "api-extractor run --local", "test": "yarn g:vitest run", - "test:e2e": "yarn g:vitest run -c vitest.config.e2e.ts --mode development", "test:watch": "yarn g:vitest watch", + "test:browser": "yarn g:vitest run -c vitest.config.browser.ts", + "test:browser:watch": "yarn g:vitest watch -c vitest.config.browser.ts", + "test:e2e": "yarn g:vitest run -c vitest.config.e2e.ts --mode development", "test:e2e:watch": "yarn g:vitest watch -c vitest.config.e2e.ts" }, "engines": { @@ -59,6 +61,7 @@ }, "browser": { "./dist-es/runtimeConfig": "./dist-es/runtimeConfig.browser", + "./dist-es/s3-transfer-manager/join-streams": "./dist-es/s3-transfer-manager/join-streams.browser", "fs": false, "stream": "stream-browserify" }, diff --git a/lib/lib-storage/src/index.ts b/lib/lib-storage/src/index.ts index 4a6222b183e6..b4b0787ddb15 100644 --- a/lib/lib-storage/src/index.ts +++ b/lib/lib-storage/src/index.ts @@ -1,2 +1,3 @@ export * from "./Upload"; +export * from "./s3-transfer-manager/index"; export * from "./types"; diff --git a/lib/lib-storage/src/lib-storage.e2e.spec.ts b/lib/lib-storage/src/lib-storage.e2e.spec.ts index deb2c75bcb3b..ff29627e101b 100644 --- a/lib/lib-storage/src/lib-storage.e2e.spec.ts +++ b/lib/lib-storage/src/lib-storage.e2e.spec.ts @@ -6,7 +6,8 @@ import { afterAll, beforeAll, describe, expect, 
test as it } from "vitest"; import { getIntegTestResources } from "../../../tests/e2e/get-integ-test-resources"; -describe("@aws-sdk/lib-storage", () => { +// todo(s3-transfer-manager): unskip +describe.skip("@aws-sdk/lib-storage", () => { describe.each([undefined, "WHEN_REQUIRED", "WHEN_SUPPORTED"])( "requestChecksumCalculation: %s", (requestChecksumCalculation) => { diff --git a/lib/lib-storage/src/s3-transfer-manager/README.md b/lib/lib-storage/src/s3-transfer-manager/README.md new file mode 100644 index 000000000000..08b32a902e55 --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/README.md @@ -0,0 +1,560 @@ +# @aws-sdk/lib-storage/s3-transfer-manager + +> 🚧 **Package Currently Under Development** + +[![NPM version](https://img.shields.io/npm/v/@aws-sdk/lib-storage/latest.svg)](https://www.npmjs.com/package/@aws-sdk/lib-storage) +[![NPM downloads](https://img.shields.io/npm/dm/@aws-sdk/lib-storage.svg)](https://www.npmjs.com/package/@aws-sdk/lib-storage) + +# Overview + +S3TransferManager is a high level library that helps users interact with S3 +for their most common use cases that involve multiple API operations through SDK JS V3. +S3TransferManager provides the following features: + +- automatic [multipart upload](#upload) to S3 +- automatic [multipart download](#download) from S3 +- upload all files in a directory to an S3 bucket recursively or non-recursively (see [upload all](#uploadall)) +- download all objects in a bucket to a local directory recursively or non-recursively (see [download all](#downloadall)) +- transfer progress listener (see [Event Listeners](#event-listeners)) + +## Installation + +`npm install @aws-sdk/lib-storage` + +# Getting Started + +### Import + +To begin using `S3TransferManager`, you must import it through `@aws-sdk/lib-storage`. You can also specify your own `S3Client` to use with `S3TransferManager`. 
Example: + +```js +import { S3Client } from "@aws-sdk/client-s3"; +import { S3TransferManager } from "@aws-sdk/lib-storage"; +``` + +### Creating a TransferManager Instance + +When creating an instance, the constructor takes an optional `S3TransferManagerConfig` object (see [Constructor Options](#constructor-options)). Minimal instantiation of a `S3TransferManager`: + +```js +// Create S3 client +const s3Client = new S3Client({ region: "us-east-1" }); + +// Create transfer manager +const tm = new S3TransferManager({ + s3ClientInstance: s3Client, +}); +``` + +### Basic Usage + +Basic use of `download()` (await required): + +```js +const download = await tm.download({ + Bucket, + Key, +}); + +const data = await download.Body?.transformToByteArray(); +console.log(`Downloaded ${data.byteLength} bytes`); +``` + +## Configuration + +- **s3ClientInstance** - The S3 client instance to use for requests +- **targetPartSizeBytes** - Target part size for multipart transfers (default: 8MB) +- **multipartUploadThresholdBytes** - Size threshold for multipart upload (default: 16MB) +- **checksumValidationEnabled** - Enable/disable checksum validation for downloads (default: true) +- **multipartDownloadType** - Download strategy: "RANGE" or "PART" (default: "PART") +- [**eventListeners**](#event-listeners) - Transfer progress listeners + +### Constructor Options + +The S3TransferManager constructor accepts an optional `S3TransferManagerConfig` object with the following optional properties: + +| Option | Type | Default | Description | +| ------------------------------- | ------------------------ | ----------------- | ------------------------------------------------- | +| `s3ClientInstance` | `S3Client` | `new S3Client()` | S3 client instance for API calls | +| `targetPartSizeBytes` | `number` | `8388608` (8MB) | Target size for each part in multipart operations | +| `multipartUploadThresholdBytes` | `number` | `16777216` (16MB) | File size threshold to trigger multipart upload | +| 
`checksumValidationEnabled` | `boolean` | `true` | Enable checksum validation for data integrity | +| `checksumAlgorithm` | `ChecksumAlgorithm` | `"CRC32"` | Algorithm used for checksum calculation | +| `multipartDownloadType` | `"PART" \| "RANGE"` | `"PART"` | Strategy for multipart downloads | +| `eventListeners` | `TransferEventListeners` | `{}` | Event listeners for transfer progress | + +**Example:** + +```js +const myInitiatedHandler = ({ request }) => { + console.log(`Started: ${request.Key}`); +}; + +const myProgressHandler = ({ snapshot }) => { + const percent = snapshot.totalBytes ? (snapshot.transferredBytes / snapshot.totalBytes) * 100 : 0; + console.log(`Progress: ${percent.toFixed(1)}%`); +}; + +// Transfer Manager with optional config properties +const tm = new S3TransferManager({ + s3ClientInstance: new S3Client({ region: "us-west-2" }), + targetPartSizeBytes: 10 * 1024 * 1024, // 10MB + multipartUploadThresholdBytes: 20 * 1024 * 1024, // 20MB + checksumValidationEnabled: false, + checksumAlgorithm: "SHA256", + multipartDownloadType: "RANGE", + eventListeners: { + transferInitiated: [myInitiatedHandler], + bytesTransferred: [myProgressHandler], + }, +}); +``` + +# Methods + +## upload() + +> 🚧 **Under Development** +> +> Documentation will be available when this feature is implemented. + +## download() + +Downloads objects from S3 using multipart download with two modes: + +**PART Mode:** + +- Optimized for objects uploaded via multipart upload. +- Uses S3's native PartNumber parameter to download parts concurrently. + +**RANGE Mode:** + +- Works with any S3 object regardless of upload method. +- Uses HTTP Range headers to split objects into chunks for concurrent download. + +Both modes join separate streams into a single stream and support Readable/ReadableStream for Node.js and browsers. 
+ +**Parameters:** + +- `Bucket` (required) - S3 bucket name +- `Key` (required) - Object key/path +- `Range` - Byte range for partial downloads (e.g., "bytes=0-1023") + +**Transfer Options:** + +- `abortSignal` - AbortController signal for cancellation +- `eventListeners` - Progress tracking callbacks + +> For complete parameter list, see [GetObjectCommandInput](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/s3/command/GetObjectCommand/) documentation. + +**Features:** + +- AbortController support for cancellation +- Event listeners: `transferInitiated`, `bytesTransferred`, `transferComplete`, `transferFailed` +- ETag validation ensures object consistency during download +- Automatic boundary/range validation with error handling + +**Validation:** + +Both modes validate data integrity: + +- **PART**: Validates part boundaries match expected byte ranges. +- **RANGE**: Validates byte ranges match expected values. +- Uses `IfMatch` header with initial ETag to ensure object consistency. +- Throws errors and cancels download on validation failures. + +We do not recommend updating the object you're downloading mid-download as this may throw a [Precondition Failed error](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/http-412-precondition-failed.html). 
+ +### Download Examples: + +**PART Download:** + +```js +// Configure for PART mode +const tm = new S3TransferManager({ + s3ClientInstance: s3Client, + multipartDownloadType: "PART", +}); + +const download = await tm.download({ + Bucket: "my-bucket", + Key: "large-file.zip", +}); + +const data = await download.Body?.transformToByteArray(); +``` + +**RANGE Download:** + +```js +// Configure for RANGE mode +const tm = new S3TransferManager({ + s3ClientInstance: s3Client, + multipartDownloadType: "RANGE", +}); + +const download = await tm.download({ + Bucket: "my-bucket", + Key: "document.pdf", +}); + +const data = await download.Body?.transformToByteArray(); +``` + +**RANGE Download with Specific Range:** + +```js +const tm = new S3TransferManager({ + s3ClientInstance: s3Client, + multipartDownloadType: "RANGE", +}); + +// Download first 1MB only +const download = await tm.download({ + Bucket: "my-bucket", + Key: "video.mp4", + Range: "bytes=0-1048575", +}); + +const data = await download.Body?.transformToByteArray(); +``` + +#### uploadAll() + +> 🚧 **Under Development** +> +> Documentation will be available when this feature is implemented. + +#### downloadAll() + +> 🚧 **Under Development** +> +> Documentation will be available when this feature is implemented. + +## Event Handling + +**Event Types and Data:** + +Event listeners receive a single event object with the following properties: + +- **`transferInitiated`** - Fired once when transfer begins + + - `request` - Original transfer request (Bucket, Key, etc.) 
+ - `snapshot` - Initial progress state (`transferredBytes: 0`, `totalBytes` if known) + +- **`bytesTransferred`** - Fired during transfer progress with each chunk + + - `request` - Original transfer request + - `snapshot` - Current progress (`transferredBytes`, `totalBytes`, `transferredFiles` for directory transfers) + +- **`transferComplete`** - Fired once when transfer succeeds + + - `request` - Original transfer request + - `snapshot` - Final progress state + - `response` - Complete S3 response with metadata + +- **`transferFailed`** - Fired once when transfer fails + - `request` - Original transfer request + - `snapshot` - Progress state at time of failure + +**Creating Callback Functions:** + +Event callbacks receive a single event object. Use destructuring to access specific properties: + +```js +// Basic function - access specific properties +function transferComplete({ request, snapshot, response }) { + console.log(`Transfer completed: ${request.Key}`); + console.log(`Total bytes: ${snapshot.transferredBytes}`); + console.log(`Response status: ${response.$metadata?.httpStatusCode}`); +} + +// Arrow function - inline usage +const progressHandler = ({ snapshot }) => { + const percent = snapshot.totalBytes ? (snapshot.transferredBytes / snapshot.totalBytes) * 100 : 0; + console.log(`Progress: ${percent.toFixed(1)}%`); +}; + +// Object with handleEvent method +const transferLogger = { + handleEvent: ({ request, snapshot }) => { + console.log(`${request.Key}: ${snapshot.transferredBytes} bytes transferred`); + }, +}; +``` + +### addEventListener() + +Registers event listeners for transfer lifecycle monitoring. It uses familiar EventTarget API patterns. 
+ +**Parameters:** + +- `type` - Event type to listen for +- `callback` - Function or object with `handleEvent` method +- `options` - Optional configuration: + - `once: boolean` - Remove listener after first execution + - `signal: AbortSignal` - Auto-remove listener when signal aborts + +**Example:** + +```js +function progressBar({ request, snapshot }) { + const percent = snapshot.totalBytes ? (snapshot.transferredBytes / snapshot.totalBytes) * 100 : 0; + let barLength = percent / 2; + let progressBar = "["; + for (let i = 0; i < 50; i++) { + if (barLength > 0) { + progressBar += "#"; + barLength--; + } else { + progressBar += "-"; + } + } + progressBar += "]"; + + process.stdout.clearLine(0); + process.stdout.cursorTo(0); + process.stdout.write(`Downloading... ${progressBar} ${percent.toFixed(0)}%`); +} + +tm.addEventListener("bytesTransferred", progressBar); + +// One-time listener +tm.addEventListener( + "transferComplete", + (event) => { + console.log(`\nTransfer completed: ${event.request.Key}`); + }, + { once: true } +); +``` + +### removeEventListener() + +Removes a previously registered event listener from the specified event type. + +**Important:** If you plan to remove event listeners during transfer lifecycle, define your callback as a named function or variable as you cannot remove anonymous functions. 
+ +**Parameters:** + +- `type` - Event type to stop listening for +- `callback` - The exact function reference that was previously registered +- `options` - Optional configuration (currently unused) + +**Example:** + +```js +// Can be removed +const progressHandler = (event) => console.log("Progress:", event.snapshot); + +tm.addEventListener("bytesTransferred", progressHandler); +tm.removeEventListener("bytesTransferred", progressHandler); // Works + +// Cannot be removed +tm.addEventListener("bytesTransferred", (event) => console.log("Progress:", event.snapshot)); +tm.removeEventListener("bytesTransferred", (event) => console.log("Progress:", event.snapshot)); // Won't work - different function reference +``` + +### dispatchEvent() + +Dispatches events to registered listeners. Primarily used internally but available for custom event handling. + +**Parameters:** + +- `event` - Event object with `type` property matching registered listeners + +**Returns:** + +- `boolean` - Always returns `true` (follows EventTarget API) + +**Example:** + +```js +const customEvent = new Event("transferInitiated"); +customEvent.snapshot = { transferredBytes: 0, totalBytes: 1000 }; +transferManager.dispatchEvent(customEvent); +``` + +## Transfer Options + +### AbortSignal + +Use the standard AbortController (included in AWS SDK JS V3's HttpHandlerOptions) to cancel downloads at any time during transfer. 
+ +**Timeout-Based Cancellation:** + +```js +const controller = new AbortController(); + +// Auto-cancel after 30 seconds +const timeoutId = setTimeout(() => { + controller.abort(); + console.log("Download timed out"); +}, 30000); + +try { + const download = await tm.download({ Bucket: "my-bucket", Key: "data.json" }, { abortSignal: controller.signal }); + + clearTimeout(timeoutId); // Cancel timeout on success + const data = await download.Body?.transformToByteArray(); +} catch (error) { + clearTimeout(timeoutId); + if (error.name === "AbortError") { + console.log("Operation was aborted"); + } +} +``` + +**User-Triggered Cancellation:** + +```js +const controller = new AbortController(); + +// UI cancel button +document.getElementById("cancelBtn").onclick = () => { + controller.abort(); + console.log("Download cancelled by user"); +}; + +// Start download +try { + const download = await tm.download({ Bucket: "my-bucket", Key: "video.mp4" }, { abortSignal: controller.signal }); + + const data = await download.Body?.transformToByteArray(); + console.log("Download completed"); +} catch (error) { + if (error.name === "AbortError") { + console.log("Download was cancelled"); + } +} +``` + +### Event Listeners + +Event listeners can be configured at two levels: **client-level** (applies to all transfers) and **request-level** (applies to specific transfers). (see [Event Handling](#event-handling)) + +In the following code we will define basic callback functions that will be used in the proceeding examples: + +```js +const downloadingKey = ({ request }) => { + console.log(`Started: ${request.Key}`); +}; + +const progressBar = ({ snapshot }) => { + const percent = snapshot.totalBytes ? 
(snapshot.transferredBytes / snapshot.totalBytes) * 100 : 0; + console.log(`Progress: ${percent.toFixed(1)}%`); +}; + +const transferComplete = ({ request, snapshot }) => { + console.log(`Completed: ${request.Key} (${snapshot.transferredBytes} bytes)`); +}; + +const transferFailed = ({ request }) => { + console.log(`Failed: ${request.Key}`); +}; +``` + +**Client-Level Event Listeners:** + +You can configure the event listeners when creating your Transfer Manager instance. These listeners apply to all transfers made with this instance. + +```js +const tm = new S3TransferManager({ + s3ClientInstance: s3Client, + multipartDownloadType: "RANGE", + checksumValidationEnabled: true, + eventListeners: { + transferInitiated: [downloadingKey], + bytesTransferred: [progressBar], + transferComplete: [ + { + handleEvent: ({ request, snapshot, response }) => { + console.log(`Transfer completed: ${request.Key}`); + console.log(`Total bytes: ${snapshot.transferredBytes}`); + }, + }, + ], + transferFailed: [transferFailed], + }, +}); + +// All downloads will use these event listeners +const download1 = await tm.download({ Bucket: "my-bucket", Key: "file1.txt" }); +const download2 = await tm.download({ Bucket: "my-bucket", Key: "file2.txt" }); +``` + +**Request-Level Event Listeners:** + +You can add event listeners for individual requests like this. Note adding event listeners at request-level will supplement any event listeners defined at client-level. So if you add the same callback at client and request level they will duplicate when the respective event occurs. + +```js +const download = await tm.download( + { + Bucket: "my-bucket", + Key: "large-file.zip", + Range: `bytes=0-${5 * 1024 * 1024 - 1}`, + }, + { + eventListeners: { + transferInitiated: [downloadingKey], + bytesTransferred: [ + { + handleEvent: ({ request, snapshot }) => { + const percent = snapshot.totalBytes ? 
(snapshot.transferredBytes / snapshot.totalBytes) * 100 : 0; + console.log(`Progress: ${percent.toFixed(1)}%`); + }, + }, + ], + transferComplete: [transferComplete], + transferFailed: [transferFailed], + }, + } +); +``` + +**Practical Example of Combining Both Levels:** + +Because request-level listeners are added to client-level listeners (not replaced), it allows for global logging plus request-specific handling. + +```js +const globalErrorHandler = ({ request }) => { + console.error(`Global error: ${request.Key} failed`); +}; + +const videoProgressBar = ({ snapshot }) => { + const percent = snapshot.totalBytes ? (snapshot.transferredBytes / snapshot.totalBytes) * 100 : 0; + console.log(`Video download: ${percent.toFixed(1)}%`); +}; + +// Client-level: global logging +const tm = new S3TransferManager({ + s3ClientInstance: s3Client, + eventListeners: { + transferInitiated: [ + { + handleEvent: ({ request }) => { + console.log(`Global: Started ${request.Key}`); + }, + }, + ], + transferFailed: [globalErrorHandler], + }, +}); + +// Request-level: specific progress tracking +const download = await tm.download( + { Bucket: "my-bucket", Key: "video.mp4" }, + { + eventListeners: { + bytesTransferred: [videoProgressBar], // Added to global listeners + transferComplete: [ + { + handleEvent: ({ request, response }) => { + console.log(`Video ${request.Key} completed with status ${response.$metadata?.httpStatusCode}`); + }, + }, + ], + }, + } +); +``` diff --git a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.browser.spec.ts b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.browser.spec.ts new file mode 100644 index 000000000000..e314b2efb2e8 --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.browser.spec.ts @@ -0,0 +1,148 @@ +import { StreamingBlobPayloadOutputTypes } from "@smithy/types"; +import { sdkStreamMixin } from "@smithy/util-stream"; +import { describe, expect, it, vi } from "vitest"; + +import { joinStreams } from 
"./join-streams.browser"; + +describe("join-streams tests", () => { + const createReadableStreamWithContent = (content: Uint8Array) => + new ReadableStream({ + start(controller) { + controller.enqueue(content); + controller.close(); + }, + }); + + const createEmptyReadableStream = () => + new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const consumeReadableStream = async (stream: any): Promise => { + const reader = stream.getReader(); + const chunks: Uint8Array[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + + return chunks; + }; + + const testCases = [ + { + name: "ReadableStream", + createStream: () => new ReadableStream(), + createWithContent: createReadableStreamWithContent, + createEmpty: createEmptyReadableStream, + consume: consumeReadableStream, + isInstance: (stream: any) => typeof stream.getReader === "function", + }, + ]; + + testCases.forEach(({ name, createStream, createWithContent, createEmpty, consume, isInstance }) => { + describe(`joinStreams() with ${name}`, () => { + it("should return single stream when only one stream is provided", async () => { + const stream = createStream(); + const mixedStream = sdkStreamMixin(stream); + const result = await joinStreams([Promise.resolve(mixedStream as StreamingBlobPayloadOutputTypes)]); + + expect(result).toBeDefined(); + expect(result).not.toBe(stream); + expect(isInstance(result)).toBe(true); + }); + + it("should join multiple streams into a single stream", async () => { + const contents = [ + new Uint8Array([67, 104, 117, 110, 107, 32, 49]), // "Chunk 1" + new Uint8Array([67, 104, 117, 110, 107, 32, 50]), // "Chunk 2" + new Uint8Array([67, 104, 117, 110, 107, 32, 51]), // "Chunk 3" + ]; + + const streams = contents.map((content) => + Promise.resolve(sdkStreamMixin(createWithContent(content)) as StreamingBlobPayloadOutputTypes) + ); + + const joinedStream = await joinStreams(streams); + + 
const chunks = await consume(joinedStream); + + expect(chunks.length).toBe(contents.length); + chunks.forEach((chunk, i) => { + expect(chunk).toEqual(contents[i]); + }); + }); + + it("should handle consecutive calls of joining multiple streams into a single stream", async () => { + for (let i = 0; i <= 3; i++) { + const contents = [ + new Uint8Array([67, 104, 117, 110, 107, 32, 49]), // "Chunk 1" + new Uint8Array([67, 104, 117, 110, 107, 32, 50]), // "Chunk 2" + new Uint8Array([67, 104, 117, 110, 107, 32, 51]), // "Chunk 3" + ]; + + const streams = contents.map((content) => + Promise.resolve(sdkStreamMixin(createWithContent(content)) as StreamingBlobPayloadOutputTypes) + ); + + const joinedStream = await joinStreams(streams); + + const chunks = await consume(joinedStream); + + expect(chunks.length).toBe(contents.length); + chunks.forEach((chunk, i) => { + expect(chunk).toEqual(contents[i]); + }); + } + }); + + it("should handle streams with no data", async () => { + const streams = [ + Promise.resolve(sdkStreamMixin(createEmpty()) as StreamingBlobPayloadOutputTypes), + Promise.resolve(sdkStreamMixin(createEmpty()) as StreamingBlobPayloadOutputTypes), + ]; + + const joinedStream = await joinStreams(streams); + + const chunks = await consume(joinedStream); + + expect(chunks.length).toBe(0); + }); + + it("should report progress via eventListeners", async () => { + const streams = [ + Promise.resolve( + sdkStreamMixin(createWithContent(new Uint8Array([100, 97, 116, 97]))) as StreamingBlobPayloadOutputTypes + ), // "data" + Promise.resolve( + sdkStreamMixin(createWithContent(new Uint8Array([109, 111, 114, 101]))) as StreamingBlobPayloadOutputTypes + ), // "more" + ]; + + const onBytesSpy = vi.fn(); + const onCompletionSpy = vi.fn(); + + const joinedStream = await joinStreams(streams, { + onBytes: onBytesSpy, + onCompletion: onCompletionSpy, + }); + + await consume(joinedStream); + + expect(onBytesSpy).toHaveBeenCalled(); + 
expect(onCompletionSpy).toHaveBeenCalledWith(expect.any(Number), 1); + }); + + it("should throw error for unsupported stream types", async () => { + const blob = new Blob(["test"]); + await expect( + joinStreams([Promise.resolve(blob as unknown as StreamingBlobPayloadOutputTypes)]) + ).rejects.toThrow("Unsupported Stream Type"); + }); + }); + }); +}); diff --git a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts new file mode 100644 index 000000000000..fad4e76bc671 --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts @@ -0,0 +1,322 @@ +import { GetObjectCommandOutput, S3 } from "@aws-sdk/client-s3"; +import { beforeAll, describe, expect, test as it } from "vitest"; + +import { getIntegTestResources } from "../../../../tests/e2e/get-integ-test-resources"; +import { Upload } from "../Upload"; +import { internalEventHandler, S3TransferManager } from "./S3TransferManager"; +import type { S3TransferManagerConfig } from "./types"; + +describe(S3TransferManager.name, () => { + const chunk = "01234567"; + + function data(bytes: number) { + let buffer = ""; + while (buffer.length < bytes) { + buffer += chunk; + } + return buffer.slice(0, bytes); + } + + function check(str = "") { + while (str.length > 0) { + expect(str.slice(0, 8)).toEqual(chunk); + str = str.slice(8); + } + } + + let client: S3; + let tmPart: S3TransferManager; + let tmRange: S3TransferManager; + let Bucket: string; + let region: string; + + beforeAll(async () => { + const integTestResourcesEnv = await getIntegTestResources(); + Object.assign(process.env, integTestResourcesEnv); + + region = process?.env?.AWS_SMOKE_TEST_REGION as string; + Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string; + void getIntegTestResources; + + client = new S3({ + region, + }); + tmPart = new S3TransferManager({ + s3ClientInstance: client, + multipartDownloadType: "PART", + }); + tmRange = new 
S3TransferManager({ + s3ClientInstance: client, + multipartDownloadType: "RANGE", + }); + }, 120_000); + + describe("multi part download", () => { + const modes = ["PART", "RANGE"] as S3TransferManagerConfig["multipartDownloadType"][]; + // 6 = 1 part, 11 = 2 part, 19 = 3 part + const sizes = [6, 11, 19, 0] as number[]; + + for (const mode of modes) { + for (const size of sizes) { + it(`should download an object of size ${size} with mode ${mode}`, async () => { + const totalSizeMB = size * 1024 * 1024; + const Body = data(totalSizeMB); + const Key = `${mode}-${size}`; + + await new Upload({ + client, + params: { Bucket, Key, Body }, + }).done(); + + const tm: S3TransferManager = mode === "PART" ? tmPart : tmRange; + + const expectBasicTransfer = (request: any, snapshot: any) => { + expect(request.Bucket).toEqual(Bucket); + expect(request.Key).toEqual(Key); + expect(snapshot.totalBytes).toEqual(totalSizeMB); + }; + + let bytesTransferred = 0; + let handleEventCalled = false; + const download = await tm.download( + { Bucket, Key }, + { + eventListeners: { + transferInitiated: [ + ({ request, snapshot }) => { + expectBasicTransfer(request, snapshot); + expect(snapshot.transferredBytes).toEqual(0); + }, + ], + bytesTransferred: [ + ({ request, snapshot }) => { + expectBasicTransfer(request, snapshot); + bytesTransferred = snapshot.transferredBytes; + expect(snapshot.transferredBytes).toEqual(bytesTransferred); + }, + ], + transferComplete: [ + ({ request, snapshot, response }) => { + expectBasicTransfer(request, snapshot); + expect(snapshot.transferredBytes).toEqual(totalSizeMB); + expect(response.ETag).toBeDefined(); + expect((response as GetObjectCommandOutput).ContentLength).toEqual(totalSizeMB); + }, + { + handleEvent: (event: any) => { + handleEventCalled = true; + expect(event.request.Bucket).toEqual(Bucket); + expect(event.response).toBeDefined(); + }, + }, + ], + }, + } + ); + const serialized = await download.Body?.transformToString(); + check(serialized); + + 
expect(download.ContentLength).toEqual(totalSizeMB); + expect(download.ContentRange).toEqual(`bytes 0-${totalSizeMB - 1}/${totalSizeMB}`); + expect(bytesTransferred).toEqual(Body.length); + expect(handleEventCalled).toEqual(true); + }, 60_000); + } + } + }); + + describe("RANGE tests", () => { + const uploadTypes = ["multipart", "single"] as const; + const ranges = ["bytes=0-5242879", "bytes=0-10485759"]; + + for (const uploadType of uploadTypes) { + for (const range of ranges) { + it(`should download ${uploadType} uploaded object with range ${range}`, async () => { + const totalSizeMB = 12 * 1024 * 1024; // 12MB + const Body = data(totalSizeMB); + const Key = `RANGE-${uploadType}-${range.replace(/[^0-9]/g, "")}`; + + // Upload based on type + if (uploadType === "multipart") { + await new Upload({ + client, + params: { Bucket, Key, Body }, + }).done(); + } else { + await client.putObject({ Bucket, Key, Body }); + } + + const tm: S3TransferManager = tmRange; + const rangeEnd = parseInt(range.split("-")[1]); + const expectedBytes = rangeEnd + 1; + + const download = await tm.download({ Bucket, Key, Range: range }); + const serialized = await download.Body?.transformToString(); + check(serialized); + + expect(download.ContentLength).toEqual(expectedBytes); + expect(download.ContentRange).toEqual(`bytes 0-${rangeEnd}/${rangeEnd + 1}`); + }, 60_000); + } + } + }); + + describe("error handling", () => { + const modes = ["PART", "RANGE"] as S3TransferManagerConfig["multipartDownloadType"][]; + + for (const mode of modes) { + it(`should fail when ETag changes during a ${mode} download`, async () => { + const totalSizeMB = 20 * 1024 * 1024; + const Body = data(totalSizeMB); + const Key = `${mode}-etag-test`; + + if (mode === "PART") { + await new Upload({ + client, + params: { Bucket, Key, Body }, + }).done(); + } else { + await client.putObject({ Bucket, Key, Body }); + } + + let transferFailed = false; + const tm: S3TransferManager = mode === "PART" ? 
tmPart : tmRange; + + try { + internalEventHandler.afterInitialGetObject = async () => { + try { + if (mode === "PART") { + await new Upload({ + client, + params: { Bucket, Key, Body: data(20 * 1024 * 1024 - 8) }, + }).done(); + } else { + await client.putObject({ Bucket, Key, Body: data(20 * 1024 * 1024 - 8) }); + } + } catch (err) { + // ignore errors + } + internalEventHandler.afterInitialGetObject = async () => {}; + }; + + const downloadResponse = await tm.download( + { Bucket, Key }, + { + eventListeners: { + transferInitiated: [], + bytesTransferred: [], + transferFailed: [ + () => { + transferFailed = true; + }, + ], + }, + } + ); + await downloadResponse.Body?.transformToByteArray(); + expect.fail("Download should have failed due to ETag mismatch"); + } catch (error) { + expect(transferFailed).toBe(true); + expect(error.name).toEqual("PreconditionFailed"); + } finally { + internalEventHandler.afterInitialGetObject = async () => {}; + } + }, 60_000); + } + }); + + describe("download with abortController ", () => { + const modes = ["PART"] as S3TransferManagerConfig["multipartDownloadType"][]; + for (const mode of modes) { + it(`should cancel ${mode} download on abort()`, async () => { + const totalSizeMB = 10 * 1024 * 1024; + const Body = data(totalSizeMB); + const Key = `${mode}-size`; + await new Upload({ + client, + params: { Bucket, Key, Body }, + }).done(); + const tm: S3TransferManager = mode === "PART" ? 
tmPart : tmRange; + const controller = new AbortController(); + try { + await tm.download( + { Bucket, Key }, + { + abortSignal: controller.signal, + eventListeners: { + transferInitiated: [ + () => { + controller.abort(); + }, + ], + }, + } + ); + expect.fail("Download should have been aborted"); + } catch (error) { + expect(error.name).toEqual("AbortError"); + } + }, 60_000); + } + }); + + describe("Required compliance download single object tests", () => { + async function complianceTests( + objectType: "single" | "multipart", + multipartType: "PART" | "RANGE", + range: string | undefined, + partNumber: 2 | undefined + ) { + const Body = data(12 * 1024 * 1024); + const Key = `${objectType}${multipartType}${range}${partNumber}`; + const DEFAULT_PART_SIZE = 8 * 1024 * 1024; + + if (multipartType === "PART") { + await new Upload({ + client, + partSize: DEFAULT_PART_SIZE, + params: { + Bucket, + Key, + Body, + }, + }).done(); + } else { + await client.putObject({ + Bucket, + Key, + Body, + }); + } + + const tm: S3TransferManager = multipartType === "PART" ? 
tmPart : tmRange; + + const download = await tm.download({ + Bucket, + Key, + Range: range, + PartNumber: partNumber, + }); + const serialized = await download.Body?.transformToString(); + check(serialized); + if (partNumber) { + expect(serialized?.length).toEqual(4 * 1024 * 1024); // Part 1 is 8MB Part 2 is 4MB + } else { + expect(serialized?.length).toEqual(Body.length); + } + } + + it("single object: multipartDownloadType = PART, range = 0-12MB, partNumber = null", async () => { + await complianceTests("single", "PART", `bytes=0-${12 * 1024 * 1024}`, undefined); + }, 60_000); + it("multipart object: multipartDownloadType = RANGE, range = 0-12MB, partNumber = null", async () => { + await complianceTests("multipart", "RANGE", `bytes=0-${12 * 1024 * 1024}`, undefined); + }, 60_000); + it("single object: multipartDownloadType = PART, range = null, partNumber = null", async () => { + await complianceTests("single", "PART", undefined, undefined); + }, 60_000); + it("single object: multipartDownloadType = RANGE, range = null, partNumber = null", async () => { + await complianceTests("single", "RANGE", undefined, undefined); + }, 60_000); + }); +}); diff --git a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.spec.ts b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.spec.ts new file mode 100644 index 000000000000..051e8dfc4bed --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.spec.ts @@ -0,0 +1,849 @@ +import { S3, S3Client } from "@aws-sdk/client-s3"; +import { TransferCompleteEvent, TransferEvent } from "@aws-sdk/lib-storage/dist-types/s3-transfer-manager/types"; +import { StreamingBlobPayloadOutputTypes } from "@smithy/types"; +import { Readable } from "stream"; +import { beforeAll, beforeEach, describe, expect, test as it, vi } from "vitest"; + +import { getIntegTestResources } from "../../../../tests/e2e/get-integ-test-resources"; +import { joinStreams } from "./join-streams"; +import { S3TransferManager } from 
"./S3TransferManager"; + +describe("S3TransferManager Unit Tests", () => { + let client: S3; + let Bucket: string; + let region: string; + + beforeAll(async () => { + const integTestResourcesEnv = await getIntegTestResources(); + Object.assign(process.env, integTestResourcesEnv); + + region = process?.env?.AWS_SMOKE_TEST_REGION as string; + Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string; + void getIntegTestResources; + + client = new S3({ + region, + responseChecksumValidation: "WHEN_REQUIRED", + }); + }); + + describe("S3TransferManager Constructor", () => { + it("Should create an instance of S3TransferManager with defaults given no parameters", () => { + const tm = new S3TransferManager() as any; + + expect(tm.s3ClientInstance).toBeInstanceOf(S3Client); + expect(tm.targetPartSizeBytes).toBe(8 * 1024 * 1024); + expect(tm.multipartUploadThresholdBytes).toBe(16 * 1024 * 1024); + expect(tm.checksumValidationEnabled).toBe(true); + expect(tm.checksumAlgorithm).toBe("CRC32"); + expect(tm.multipartDownloadType).toBe("PART"); + expect(tm.eventListeners).toEqual({ + transferInitiated: [], + bytesTransferred: [], + transferComplete: [], + transferFailed: [], + }); + }); + + it("Should create an instance of S3TransferManager with all given parameters", () => { + const eventListeners = { + transferInitiated: [() => console.log("transferInitiated")], + bytesTransferred: [() => console.log("bytesTransferred")], + transferComplete: [() => console.log("transferComplete")], + transferFailed: [() => console.log("transferFailed")], + }; + const tm = new S3TransferManager({ + s3ClientInstance: client, + targetPartSizeBytes: 8 * 1024 * 1024, + checksumValidationEnabled: true, + checksumAlgorithm: "CRC32", + multipartDownloadType: "RANGE", + eventListeners: eventListeners, + }) as any; + + expect(tm.s3ClientInstance).toBe(client); + expect(tm.targetPartSizeBytes).toBe(8 * 1024 * 1024); + expect(tm.checksumValidationEnabled).toBe(true); + 
expect(tm.checksumAlgorithm).toBe("CRC32"); + expect(tm.multipartDownloadType).toBe("RANGE"); + expect(tm.eventListeners).toEqual(eventListeners); + }); + + it("Should throw an error given targetPartSizeBytes smaller than minimum", () => { + expect(() => { + new S3TransferManager({ + targetPartSizeBytes: 2 * 1024 * 1024, + }); + }).toThrow(`targetPartSizeBytes must be at least ${5 * 1024 * 1024} bytes`); + }); + }); + + describe("EventListener functions", () => { + let tm: S3TransferManager; + + function initiated(event: TransferEvent) { + return { + request: event.request, + snapshot: event.snapshot, + }; + } + function transferring(event: TransferEvent) { + return { + request: event.request, + snapshot: event.snapshot, + }; + } + function completed(event: TransferCompleteEvent) { + return { + request: event.request, + snapshot: event.snapshot, + response: event.response, + }; + } + function failed(event: TransferEvent) { + return { + request: event.request, + snapshot: event.snapshot, + }; + } + + beforeEach(async () => { + tm = new S3TransferManager({ + s3ClientInstance: client, + }); + }); + + describe("addEventListener()", () => { + it("Should register callbacks for all supported event types", () => { + tm.addEventListener("transferInitiated", initiated); + tm.addEventListener("bytesTransferred", transferring); + tm.addEventListener("transferComplete", completed); + tm.addEventListener("transferFailed", failed); + + expect((tm as any).eventListeners).toEqual({ + transferInitiated: [initiated], + bytesTransferred: [transferring], + transferComplete: [completed], + transferFailed: [failed], + }); + }); + + it("Should handle registering the same listener multiple times", () => { + const callback1 = vi.fn(); + tm.addEventListener("transferInitiated", callback1); + tm.addEventListener("transferInitiated", callback1); + + expect((tm as any).eventListeners.transferInitiated).toEqual([callback1, callback1]); + }); + + it("Should handle different callbacks for the same 
event type", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + + tm.addEventListener("bytesTransferred", callback1); + tm.addEventListener("bytesTransferred", callback2); + + expect((tm as any).eventListeners.bytesTransferred).toEqual([callback1, callback2]); + }); + + it("Should handle object-style callbacks", () => { + const objectCallback = { + handleEvent: vi.fn(), + }; + tm.addEventListener("transferInitiated", objectCallback as any); + + expect((tm as any).eventListeners.transferInitiated).toEqual([objectCallback]); + }); + + it("Should handle a mix of object-style callbacks and function for the same event", () => { + const callback = vi.fn(); + const objectCallback = { + handleEvent: vi.fn(), + }; + tm.addEventListener("transferInitiated", objectCallback as any); + tm.addEventListener("transferInitiated", callback); + + expect((tm as any).eventListeners.transferInitiated).toEqual([objectCallback, callback]); + }); + + it("Should throw an error for an invalid event type", () => { + expect(() => { + (tm as any).addEventListener("invalidEvent", initiated); + }).toThrow("Unknown event type: invalidEvent"); + }); + + it("Should handle options.once correctly, running the listener at most once.", () => { + const mockCallback = vi.fn(); + tm.addEventListener("transferInitiated", mockCallback, { once: true }); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + tm.dispatchEvent(event); + + expect(mockCallback).toHaveBeenCalledTimes(1); + }); + + it("Should not add listener if included AbortSignal is aborted", () => { + const controller = new AbortController(); + const callback = vi.fn(); + controller.abort(); + tm.addEventListener("transferInitiated", callback, { signal: controller.signal }); + expect((tm as any).eventListeners.transferInitiated).toEqual([]); + }); + + it("Should remove listener after included AbortSignal was aborted", () => { + const controller = new 
AbortController(); + const callback = vi.fn(); + tm.addEventListener("transferInitiated", callback, { signal: controller.signal }); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + tm.dispatchEvent(event); + + expect(callback).toHaveBeenCalledTimes(1); + expect((tm as any).eventListeners.transferInitiated).toEqual([callback]); + + controller.abort(); + expect((tm as any).eventListeners.transferInitiated).toEqual([]); + }); + + it("Should clean up abort listeners and store cleanup functions in WeakMap", () => { + const controller = new AbortController(); + const callback = vi.fn(); + + tm.addEventListener("transferInitiated", callback, { signal: controller.signal }); + + expect((tm as any).eventListeners.transferInitiated).toEqual([callback]); + expect((tm as any).abortCleanupFunctions.has(controller.signal)).toBe(true); + + const cleanupFn = (tm as any).abortCleanupFunctions.get(controller.signal); + cleanupFn(); + (tm as any).abortCleanupFunctions.delete(controller.signal); + + expect((tm as any).abortCleanupFunctions.has(controller.signal)).toBe(false); + controller.abort(); + expect((tm as any).eventListeners.transferInitiated).toEqual([callback]); + }); + + it("Should handle boolean options parameter", () => { + tm.addEventListener("transferInitiated", initiated, true); + expect((tm as any).eventListeners.transferInitiated).toContain(initiated); + }); + + it("Should handle null callback", () => { + expect(() => { + (tm as any).addEventListener("transferInitiated", null); + }).not.toThrow(); + }); + + it("Should handle object-style callback with handleEvent", () => { + const objectCallback = { handleEvent: vi.fn() }; + tm.addEventListener("transferInitiated", objectCallback as any); + expect((tm as any).eventListeners.transferInitiated).toContain(objectCallback); + }); + }); + + describe("dispatchEvent()", () => { + it("Should dispatch an event", () => { + const mockCallback = vi.fn(); + 
tm.addEventListener("bytesTransferred", mockCallback); + + const event = Object.assign(new Event("bytesTransferred"), { + request: {}, + snapshot: {}, + }); + + const result = tm.dispatchEvent(event); + + expect(mockCallback).toHaveBeenCalledTimes(1); + expect(mockCallback).toHaveBeenCalledWith(event); + expect(result).toBe(true); + }); + + it("Should dispatch an event with request, snapshot, and response information", () => { + const mockCompleted = vi.fn().mockImplementation(completed); + tm.addEventListener("transferComplete", mockCompleted); + + const event = Object.assign(new Event("transferComplete"), { + request: { bucket: "test" }, + snapshot: { bytes: 100 }, + response: { status: "success" }, + }); + + tm.dispatchEvent(event); + + expect(mockCompleted).toHaveBeenCalledWith(event); + expect(mockCompleted).toHaveReturnedWith({ + request: { bucket: "test" }, + snapshot: { bytes: 100 }, + response: { status: "success" }, + }); + }); + + it("Should call multiple listeners for the same event type", () => { + const mockCallback = vi.fn(); + tm.addEventListener("transferInitiated", mockCallback); + tm.addEventListener("transferInitiated", mockCallback); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + const result = tm.dispatchEvent(event); + + expect(mockCallback).toHaveBeenCalledTimes(2); + expect(mockCallback).toHaveBeenCalledWith(event); + expect(result).toBe(true); + }); + + it("Should call listeners in the order they were added", () => { + const callOrder: number[] = []; + const mockCallback1 = vi.fn(() => callOrder.push(1)); + const mockCallback2 = vi.fn(() => callOrder.push(2)); + const mockCallback3 = vi.fn(() => callOrder.push(3)); + + tm.addEventListener("transferInitiated", mockCallback1); + tm.addEventListener("transferInitiated", mockCallback2); + tm.addEventListener("transferInitiated", mockCallback3); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + 
snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(callOrder).toEqual([1, 2, 3]); + }); + + it("Should handle object-style callbacks with handleEvent method", () => { + const mockCallback = vi.fn(); + const objectCallback = { + handleEvent: mockCallback, + }; + tm.addEventListener("transferInitiated", objectCallback as any); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(mockCallback).toHaveBeenCalledTimes(1); + expect(mockCallback).toHaveBeenCalledWith(event); + }); + + it("Should handle events with no registered listeners", () => { + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + const result = tm.dispatchEvent(event); + + expect(result).toBe(true); + }); + + it("Should handle unknown event types", () => { + const event = Object.assign(new Event("unknownEvent"), { + request: {}, + snapshot: {}, + }); + + const results = tm.dispatchEvent(event); + expect(results).toBe(true); + }); + + it("Should handle a mix of object-style callbacks and functions", () => { + const callback = vi.fn(); + const objectCallback = { + handleEvent: vi.fn(), + }; + tm.addEventListener("transferInitiated", objectCallback as any); + tm.addEventListener("transferInitiated", callback); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(objectCallback.handleEvent).toHaveBeenCalledTimes(1); + expect(objectCallback.handleEvent).toHaveBeenCalledWith(event); + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith(event); + }); + }); + + describe("removeEventListener()", () => { + it("Should remove only the specified listener, leaving other intact", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + tm.addEventListener("transferInitiated", callback1); + tm.addEventListener("transferInitiated", callback2); 
+ + tm.removeEventListener("transferInitiated", callback1); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(callback1).not.toHaveBeenCalled(); + expect(callback2).toHaveBeenCalledTimes(1); + }); + + it("Should remove object-style callback with handleEvent", () => { + const objectCallback = { handleEvent: vi.fn() }; + tm.addEventListener("transferInitiated", objectCallback as any); + tm.removeEventListener("transferInitiated", objectCallback as any); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + expect(objectCallback.handleEvent).not.toHaveBeenCalled(); + }); + + it("Should remove all instances of the same callback", () => { + const callback = vi.fn(); + tm.addEventListener("transferInitiated", callback); + tm.addEventListener("transferInitiated", callback); + + tm.removeEventListener("transferInitiated", callback); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(callback).not.toHaveBeenCalled(); + }); + + it("Should handle removing non-existing listener gracefully", () => { + const callback = vi.fn(); + expect(() => { + tm.removeEventListener("transferInitiated", callback); + }).not.toThrow(); + }); + + it("Should handle removing from an event type with no listeners gracefully", () => { + const callback = vi.fn(); + tm.removeEventListener("transferInitiated", callback); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(callback).not.toHaveBeenCalled(); + }); + + it("Should handle null callback parameter", () => { + expect(() => { + tm.removeEventListener("transferInitiated", null as any); + }).not.toThrow(); + }); + }); + }); + + describe("iterateListeners()", () => { + let tm: S3TransferManager; + + 
beforeEach(async () => { + tm = new S3TransferManager({ + s3ClientInstance: client, + }); + }); + + it("Should iterate over all listeners given a TransferManager's object of event listeners", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + const callback3 = vi.fn(); + + const eventListeners = { + transferInitiated: [callback1], + bytesTransferred: [callback2, callback3], + transferComplete: [], + transferFailed: [], + }; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(3); + expect(results[0][0]).toEqual({ eventType: "transferInitiated", callback: callback1 }); + expect(results[1][0]).toEqual({ eventType: "bytesTransferred", callback: callback2 }); + expect(results[2][0]).toEqual({ eventType: "bytesTransferred", callback: callback3 }); + }); + + it("Should handle empty event listeners object", () => { + const eventListeners = { + transferInitiated: [], + bytesTransferred: [], + transferComplete: [], + transferFailed: [], + }; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(0); + }); + + it("Should iterate over a mix of functions and objects with handleEvent callback types.", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + const objectCallback = { + handleEvent: vi.fn(), + }; + + const eventListeners = { + transferInitiated: [callback1], + bytesTransferred: [], + transferComplete: [], + transferFailed: [callback2, objectCallback], + }; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(3); + expect(results[0][0]).toEqual({ eventType: "transferInitiated", callback: callback1 }); + expect(results[1][0]).toEqual({ eventType: "transferFailed", callback: callback2 }); + expect(results[2][0]).toEqual({ eventType: "transferFailed", callback: objectCallback }); + }); + + it("Should handle event listeners with duplicate 
callbacks in the same event type", () => { + const callback = vi.fn(); + + const eventListeners = { + transferInitiated: [callback, callback], + bytesTransferred: [], + transferComplete: [callback, callback], + transferFailed: [], + }; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(4); + for (let i = 0; i < results.length; i++) { + expect(results[i][0]).toEqual({ eventType: results[i][0].eventType, callback }); + } + }); + + it("Should return empty iterator when no callbacks are present", () => { + const eventListeners = {}; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(0); + }); + }); + + describe("validatePartDownload()", () => { + let tm: any; + beforeAll(async () => { + tm = new S3TransferManager() as any; + }, 120_000); + + it("Should pass correct ranges based on part number without throwing an error", () => { + const partSize = 5242880; + const ranges = [ + { partNumber: 1, range: "bytes 0-5242879/13631488" }, + { partNumber: 2, range: "bytes 5242880-10485759/13631488" }, + { partNumber: 3, range: "bytes 10485760-13631487/13631488" }, + ]; + + for (const { partNumber, range } of ranges) { + expect(() => { + tm.validatePartDownload(range, partNumber, partSize); + }).not.toThrow(); + } + }); + + it("Should throw error for incorrect start position", () => { + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload("bytes 5242881-10485759/13631488", 2, partSize); + }).toThrow("Expected part 2 to start at 5242880 but got 5242881"); + + expect(() => { + tm.validatePartDownload("bytes 5242879-10485759/13631488", 2, partSize); + }).toThrow("Expected part 2 to start at 5242880 but got 5242879"); + + expect(() => { + tm.validatePartDownload("bytes 0-5242879/13631488", 2, partSize); + }).toThrow("Expected part 2 to start at 5242880 but got 0"); + }); + + it("Should throw error for incorrect end position", () => 
{ + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload("bytes 5242880-10485760/13631488", 2, partSize); + }).toThrow("Expected part 2 to end at 10485759 but got 10485760"); + + expect(() => { + tm.validatePartDownload("bytes 10485760-13631480/13631488", 3, partSize); + }).toThrow("Expected part 3 to end at 13631487 but got 13631480"); + }); + + it("Should handle last part correctly when not a full part size", () => { + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload("bytes 10485760-13631487/13631488", 3, partSize); + }).not.toThrow(); + }); + + it("Should throw error for invalid ContentRange format", () => { + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload("invalid-format", 2, partSize); + }).toThrow("Invalid ContentRange format: invalid-format"); + }); + + it("Should throw error for missing ContentRange", () => { + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload(undefined, 2, partSize); + }).toThrow("Missing ContentRange for part 2."); + }); + }); + + describe("validateRangeDownload()", () => { + let tm: any; + beforeAll(async () => { + tm = new S3TransferManager() as any; + }, 120_000); + + it("Should pass when response range matches request range", () => { + expect(() => { + tm.validateRangeDownload("bytes=0-5242879", "bytes 0-5242879/13631488"); + }).not.toThrow(); + }); + + it("Should pass when response range ends at total size", () => { + expect(() => { + tm.validateRangeDownload("bytes=10485760-13631500", "bytes 10485760-13631487/13631488"); + }).not.toThrow(); + }); + + it("Should throw error for missing response range", () => { + expect(() => { + tm.validateRangeDownload("bytes=0-5242879", undefined); + }).toThrow("Missing ContentRange for range bytes=0-5242879."); + }); + + it("Should throw error for invalid response range format", () => { + expect(() => { + tm.validateRangeDownload("bytes=0-5242879", "invalid-format"); + }).toThrow("Invalid ContentRange format: 
invalid-format"); + }); + + it("Should throw error for invalid request range format", () => { + expect(() => { + tm.validateRangeDownload("invalid-format", "bytes 0-5242879/13631488"); + }).toThrow("Invalid Range format: invalid-format"); + }); + + it("Should throw error for incorrect start position", () => { + expect(() => { + tm.validateRangeDownload("bytes=0-5242879", "bytes 1-5242879/13631488"); + }).toThrow("Expected range to start at 0 but got 1"); + }); + + it("Should throw error for incorrect end position", () => { + expect(() => { + tm.validateRangeDownload("bytes=0-5242879", "bytes 0-5242878/13631488"); + }).toThrow("Expected range to end at 5242879 but got 5242878"); + }); + }); +}); + +describe("join-streams tests", () => { + const streamTypes = [ + { + name: "Readable", + createStream: () => new Readable({ read() {} }), + streamType: Readable, + }, + { + name: "ReadableStream", + createStream: () => new ReadableStream(), + streamType: ReadableStream, + }, + ]; + + describe("join-streams tests", () => { + const createReadableWithContent = (content: Buffer) => + new Readable({ + read() { + this.push(content); + this.push(null); + }, + }); + + const createEmptyReadable = () => + new Readable({ + read() { + this.push(null); + }, + }); + + const createReadableStreamWithContent = (content: Uint8Array) => + new ReadableStream({ + start(controller) { + controller.enqueue(content); + controller.close(); + }, + }); + + const createEmptyReadableStream = () => + new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const consumeReadable = async (stream: any): Promise => { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(Buffer.from(chunk)); + } + return Buffer.concat(chunks); + }; + + const consumeReadableStream = async (stream: any): Promise => { + const reader = stream.getReader(); + const chunks: Uint8Array[] = []; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + 
if (value) chunks.push(value); + } + + return chunks; + }; + + const testCases = [ + { + name: "Readable", + createStream: () => new Readable({ read() {} }), + createWithContent: createReadableWithContent, + createEmpty: createEmptyReadable, + consume: consumeReadable, + isInstance: (stream: any) => stream instanceof Readable, + }, + { + name: "ReadableStream", + createStream: () => new ReadableStream(), + createWithContent: createReadableStreamWithContent, + createEmpty: createEmptyReadableStream, + consume: consumeReadableStream, + isInstance: (stream: any) => typeof stream.getReader === "function", + }, + ]; + + testCases.forEach(({ name, createStream, createWithContent, createEmpty, consume, isInstance }) => { + describe(`joinStreams() with ${name}`, () => { + it("should return single stream when only one stream is provided", async () => { + const stream = createStream(); + const result = await joinStreams([Promise.resolve(stream as unknown as StreamingBlobPayloadOutputTypes)]); + + expect(result).toBeDefined(); + expect(result).not.toBe(stream); + expect(isInstance(result)).toBe(true); + }); + + it("should join multiple streams into a single stream", async () => { + const contents = [Buffer.from("Chunk 1"), Buffer.from("Chunk 2"), Buffer.from("Chunk 3")]; + + const streams = contents.map((content) => + Promise.resolve(createWithContent(content) as unknown as StreamingBlobPayloadOutputTypes) + ); + + const joinedStream = await joinStreams(streams); + + const chunks = await consume(joinedStream); + + const joinedContent = Buffer.isBuffer(chunks) ? 
chunks.toString() : Buffer.concat(chunks).toString(); + contents.forEach((content) => { + expect(joinedContent).toContain(content.toString()); + }); + }); + + it("should handle consecutive calls of joining multiple streams into a single stream", async () => { + for (let i = 0; i <= 3; i++) { + const contents = [Buffer.from("Chunk 1"), Buffer.from("Chunk 2"), Buffer.from("Chunk 3")]; + + const streams = contents.map((content) => + Promise.resolve(createWithContent(content) as unknown as StreamingBlobPayloadOutputTypes) + ); + + const joinedStream = await joinStreams(streams); + + const chunks = await consume(joinedStream); + + const joinedContent = Buffer.isBuffer(chunks) ? chunks.toString() : Buffer.concat(chunks).toString(); + contents.forEach((content) => { + expect(joinedContent).toContain(content.toString()); + }); + } + }); + + it("should handle streams with no data", async () => { + const streams = [ + Promise.resolve(createEmpty() as unknown as StreamingBlobPayloadOutputTypes), + Promise.resolve(createEmpty() as unknown as StreamingBlobPayloadOutputTypes), + ]; + + const joinedStream = await joinStreams(streams); + + const chunks = await consume(joinedStream); + + const length = Array.isArray(chunks) ? 
chunks.length : chunks.length; + expect(length).toBe(0); + }); + + it("should report progress via eventListeners", async () => { + const streams = [ + Promise.resolve(createWithContent(Buffer.from("data")) as unknown as StreamingBlobPayloadOutputTypes), + Promise.resolve(createWithContent(Buffer.from("more")) as unknown as StreamingBlobPayloadOutputTypes), + ]; + + const onBytesSpy = vi.fn(); + const onCompletionSpy = vi.fn(); + + const joinedStream = await joinStreams(streams, { + onBytes: onBytesSpy, + onCompletion: onCompletionSpy, + }); + + await consume(joinedStream); + + expect(onBytesSpy).toHaveBeenCalled(); + expect(onCompletionSpy).toHaveBeenCalledWith(expect.any(Number), 1); + }); + }); + }); + }); +}); diff --git a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts new file mode 100644 index 000000000000..6fdb5a7cc9f2 --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts @@ -0,0 +1,879 @@ +import type { + _Object as S3Object, + ChecksumAlgorithm, + GetObjectCommandInput, + PutObjectCommandInput, +} from "@aws-sdk/client-s3"; +import { GetObjectCommand, HeadObjectCommand, S3Client } from "@aws-sdk/client-s3"; +import { type StreamingBlobPayloadOutputTypes } from "@smithy/types"; + +import type { AddEventListenerOptions, EventListener, RemoveEventListenerOptions } from "./event-listener-types"; +import { joinStreams } from "./join-streams"; +import type { + DownloadRequest, + DownloadResponse, + IS3TransferManager, + S3TransferManagerConfig, + TransferCompleteEvent, + TransferEvent, + TransferEventListeners, + TransferOptions, + UploadRequest, + UploadResponse, +} from "./types"; + +/** + * Client for efficient transfer of objects to and from Amazon S3. + * Provides methods to optimize uploading and downloading individual objects + * as well as entire directories, with support for multipart operations, + * concurrency control, and request cancellation. 
+ * Implements an eventTarget-based progress tracking system with methods to register, + * dispatch, and remove listeners for transfer lifecycle events. + * + * @alpha + */ + +export class S3TransferManager implements IS3TransferManager { + private static MIN_PART_SIZE = 5 * 1024 * 1024; // 5MB + private static DEFAULT_PART_SIZE = 8 * 1024 * 1024; // 8MB + private static MIN_UPLOAD_THRESHOLD = 16 * 1024 * 1024; // 16MB + + private readonly s3ClientInstance: S3Client; + private readonly targetPartSizeBytes: number; + private readonly multipartUploadThresholdBytes: number; + private readonly checksumValidationEnabled: boolean; + private readonly checksumAlgorithm: ChecksumAlgorithm; + private readonly multipartDownloadType: "PART" | "RANGE"; + private readonly eventListeners: TransferEventListeners; + private readonly abortCleanupFunctions = new WeakMap void>(); + + public constructor(config: S3TransferManagerConfig = {}) { + this.checksumValidationEnabled = config.checksumValidationEnabled ?? true; + + const checksumMode = this.checksumValidationEnabled ? "WHEN_SUPPORTED" : "WHEN_REQUIRED"; + + this.s3ClientInstance = + config.s3ClientInstance ?? + new S3Client({ + requestChecksumCalculation: checksumMode, + responseChecksumValidation: checksumMode, + }); + + this.targetPartSizeBytes = config.targetPartSizeBytes ?? S3TransferManager.DEFAULT_PART_SIZE; + this.multipartUploadThresholdBytes = config.multipartUploadThresholdBytes ?? S3TransferManager.MIN_UPLOAD_THRESHOLD; + + this.checksumAlgorithm = config.checksumAlgorithm ?? "CRC32"; + this.multipartDownloadType = config.multipartDownloadType ?? "PART"; + this.eventListeners = { + transferInitiated: config.eventListeners?.transferInitiated ?? [], + bytesTransferred: config.eventListeners?.bytesTransferred ?? [], + transferComplete: config.eventListeners?.transferComplete ?? [], + transferFailed: config.eventListeners?.transferFailed ?? 
[], + }; + + this.validateConfig(); + } + + /** + * Registers a callback function to be executed when a specific transfer event occurs. + * Supports monitoring the full lifecycle of transfers. + * + * @param type - The type of event to listen for. + * @param callback - Function to execute when the specified event occurs. + * @param options - Optional configuration for event listener behavior. + * + * @alpha + */ + public addEventListener( + type: "transferInitiated", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + public addEventListener( + type: "bytesTransferred", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + public addEventListener( + type: "transferComplete", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + public addEventListener( + type: "transferFailed", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + public addEventListener(type: string, callback: EventListener, options?: AddEventListenerOptions | boolean): void; + public addEventListener(type: string, callback: EventListener, options?: AddEventListenerOptions | boolean): void { + const eventType = type as keyof TransferEventListeners; + const listeners = this.eventListeners[eventType]; + + if (!listeners) { + throw new Error(`Unknown event type: ${eventType}`); + } + + const once = typeof options !== "boolean" && options?.once; + const signal = typeof options !== "boolean" ? 
options?.signal : undefined; + let updatedCallback = callback; + + if (signal?.aborted) { + return; + } + + if (signal) { + const removeListenerAfterAbort = () => { + this.removeEventListener(eventType, updatedCallback); + this.abortCleanupFunctions.delete(signal); + }; + + this.abortCleanupFunctions.set(signal, () => signal.removeEventListener("abort", removeListenerAfterAbort)); + signal.addEventListener("abort", removeListenerAfterAbort, { once: true }); + } + + if (once) { + updatedCallback = (event: Event) => { + if (typeof callback === "function") { + callback(event); + } else { + callback.handleEvent(event); + } + this.removeEventListener(eventType, updatedCallback); + }; + } + listeners.push(updatedCallback); + } + + /** + * Dispatches an event to the registered event listeners. + * Triggers callbacks registered via addEventListener with matching event types. + * + * @param event - The event object to dispatch. + * @returns whether the event ran to completion + * + * @alpha + */ + public dispatchEvent(event: Event & TransferEvent): boolean; + public dispatchEvent(event: Event & TransferCompleteEvent): boolean; + public dispatchEvent(event: Event): boolean; + public dispatchEvent(event: Event): boolean { + const eventType = event.type; + const listeners = this.eventListeners[eventType as keyof TransferEventListeners] as EventListener[]; + + if (listeners) { + for (const listener of listeners) { + if (typeof listener === "function") { + listener(event); + } else { + listener.handleEvent(event); + } + } + } + return true; + } + + /** + * Removes a previously registered event listener from the specified event type. + * Stops the callback from being invoked when the event occurs. + * + * @param type - The type of event to stop listening for. + * @param callback - The function that was previously registered. + * @param options - Optional configuration for the event listener. 
+ * + * @alpha + */ + public removeEventListener( + type: "transferInitiated", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + public removeEventListener( + type: "bytesTransferred", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + public removeEventListener( + type: "transferComplete", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + public removeEventListener( + type: "transferFailed", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + public removeEventListener( + type: string, + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + public removeEventListener( + type: string, + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void { + const eventType = type as keyof TransferEventListeners; + const listeners = this.eventListeners[eventType]; + + if (listeners) { + if ( + eventType === "transferInitiated" || + eventType === "bytesTransferred" || + eventType === "transferFailed" || + eventType === "transferComplete" + ) { + const eventListener = callback as EventListener; + let index = listeners.indexOf(eventListener); + while (index !== -1) { + listeners.splice(index, 1); + index = listeners.indexOf(eventListener); + } + } else { + throw new Error(`Unknown event type: ${type}`); + } + } + } + + /** + * Uploads objects to S3 with automatic multipart upload handling. + * Automatically chooses between single object upload or multipart upload based on content length threshold. + * + * @param request - PutObjectCommandInput and CreateMultipartUploadCommandInput parameters for single or multipart uploads. + * @param transferOptions - Optional abort signal and event listeners for transfer lifecycle monitoring. + * + * @returns S3 PutObject or CompleteMultipartUpload response with transfer event dispatching. 
+ * + * @alpha + */ + public upload(request: UploadRequest, transferOptions?: TransferOptions): Promise { + throw new Error("Method not implemented."); + } + + /** + * Downloads single objects from S3 with automatic multipart handling. + * Automatically chooses between PART or RANGE download strategies and joins streams into a single response. + * + * @param request - GetObjectCommandInput parameters. PartNumber is not supported - use GetObjectCommand directly for specific parts. + * @param transferOptions - Optional abort signal and event listeners for transfer lifecycle monitoring. + * + * @returns S3 GetObject response with joined Body stream and transfer event dispatching. + * + * @alpha + */ + public async download(request: DownloadRequest, transferOptions?: TransferOptions): Promise { + const partNumber = request.PartNumber; + if (typeof partNumber === "number") { + throw new Error( + "partNumber included: S3 Transfer Manager does not support downloads for specific parts. Use GetObjectCommand instead" + ); + } + + const metadata = {} as Omit; + const streams = [] as Promise[]; + const requests = [] as GetObjectCommandInput[]; + + let totalSize: number | undefined; + + this.checkAborted(transferOptions); + this.addEventListeners(transferOptions?.eventListeners); + + if (this.multipartDownloadType === "PART") { + const responseMetadata = await this.downloadByPart(request, transferOptions ?? {}, streams, requests, metadata); + totalSize = responseMetadata.totalSize; + } else if (this.multipartDownloadType === "RANGE") { + const responseMetadata = await this.downloadByRange(request, transferOptions ?? {}, streams, requests, metadata); + totalSize = responseMetadata.totalSize; + } + + const removeLocalEventListeners = () => { + this.removeEventListeners(transferOptions?.eventListeners); + + // remove any local abort() listeners after request completes. 
+ if (transferOptions?.abortSignal) { + this.abortCleanupFunctions.get(transferOptions.abortSignal as AbortSignal)?.(); + this.abortCleanupFunctions.delete(transferOptions.abortSignal as AbortSignal); + } + }; + + const response = { + ...metadata, + Body: await joinStreams(streams, { + onBytes: (byteLength: number, index) => { + this.dispatchEvent( + Object.assign(new Event("bytesTransferred"), { + request: requests[index], + snapshot: { + transferredBytes: byteLength, + totalBytes: totalSize, + }, + }) + ); + }, + onCompletion: (byteLength: number, index) => { + this.dispatchEvent( + Object.assign(new Event("transferComplete"), { + request: requests[index], + response, + snapshot: { + transferredBytes: byteLength, + totalBytes: totalSize, + }, + }) + ); + removeLocalEventListeners(); + }, + onFailure: (error: unknown, index) => { + this.dispatchEvent( + Object.assign(new Event("transferFailed"), { + request: requests[index], + snapshot: { + transferredBytes: error, + totalBytes: totalSize, + }, + }) + ); + removeLocalEventListeners(); + }, + }), + }; + + return response; + } + + /** + * Uploads all files in a directory recursively to an S3 bucket. + * Automatically maps local file paths to S3 object keys using prefix and delimiter configuration. + * + * @param options - Configuration including bucket, source directory, filtering, failure handling, and transfer settings. + * + * @returns the number of objects that have been uploaded and the number of objects that have failed. 
+ * + * @alpha + */ + public uploadAll(options: { + bucket: string; + source: string; + followSymbolicLinks?: boolean; + recursive?: boolean; + s3Prefix?: string; + filter?: (filepath: string) => boolean; + s3Delimiter?: string; + putObjectRequestCallback?: (putObjectRequest: PutObjectCommandInput) => Promise; + failurePolicy?: (error?: unknown) => Promise; + transferOptions?: TransferOptions; + }): Promise<{ objectsUploaded: number; objectsFailed: number }> { + throw new Error("Method not implemented."); + } + + /** + * Downloads all objects in a bucket to a local directory. + * Uses ListObjectsV2 to retrieve objects and automatically maps S3 object keys to local file paths. + * + * @param options - Configuration including bucket, destination directory, filtering, failure handling, and transfer settings. + * + * @returns The number of objects that have been downloaded and the number of objects that have failed. + * + * @alpha + */ + public downloadAll(options: { + bucket: string; + destination: string; + s3Prefix?: string; + s3Delimiter?: string; + recursive?: boolean; + filter?: (object?: S3Object) => boolean; + getObjectRequestCallback?: (getObjectRequest: GetObjectCommandInput) => Promise; + failurePolicy?: (error?: unknown) => Promise; + transferOptions?: TransferOptions; + }): Promise<{ objectsDownloaded: number; objectsFailed: number }> { + throw new Error("Method not implemented."); + } + + /** + * Downloads object using part-based strategy with concurrent part requests. 
+ * + * @internal + */ + protected async downloadByPart( + request: DownloadRequest, + transferOptions: TransferOptions, + streams: Promise[], + requests: GetObjectCommandInput[], + metadata: Omit + ): Promise<{ totalSize: number | undefined }> { + let totalSize: number | undefined; + this.checkAborted(transferOptions); + + if (request.Range == null) { + const initialPartRequest = { + ...request, + PartNumber: 1, + }; + try { + const initialPart = await this.s3ClientInstance.send(new GetObjectCommand(initialPartRequest), transferOptions); + const initialETag = initialPart.ETag ?? undefined; + await internalEventHandler.afterInitialGetObject(); + const partSize = initialPart.ContentLength; + totalSize = initialPart.ContentRange ? Number.parseInt(initialPart.ContentRange.split("/")[1]) : 0; + this.dispatchTransferInitiatedEvent(request, totalSize); + if (initialPart.Body) { + if (initialPart.Body && typeof (initialPart.Body as any).getReader === "function") { + const reader = (initialPart.Body as any).getReader(); + (initialPart.Body as any).getReader = function () { + return reader; + }; + } + streams.push(Promise.resolve(initialPart.Body)); + requests.push(initialPartRequest); + } + + this.processResponseMetadata(initialPart, metadata, totalSize); + + let partCount = 1; + if (initialPart.PartsCount! > 1) { + for (let part = 2; part <= initialPart.PartsCount!; part++) { + this.checkAborted(transferOptions); + const getObjectRequest = { + ...request, + PartNumber: part, + IfMatch: initialETag, + }; + + const getObject = this.s3ClientInstance + .send(new GetObjectCommand(getObjectRequest), transferOptions) + .then((response) => { + this.validatePartDownload(response.ContentRange, part, partSize ?? 
0); + if (response.Body && typeof (response.Body as any).getReader === "function") { + const reader = (response.Body as any).getReader(); + (response.Body as any).getReader = function () { + return reader; + }; + } + return response.Body!; + }) + .catch((error) => { + this.dispatchTransferFailedEvent(getObjectRequest, totalSize, error as Error); + throw error; + }); + streams.push(getObject); + requests.push(getObjectRequest); + partCount++; + } + + if (partCount !== initialPart.PartsCount) { + throw new Error( + `The number of parts downloaded (${partCount}) does not match the expected number (${initialPart.PartsCount})` + ); + } + } + } catch (error) { + this.dispatchTransferFailedEvent(request, totalSize, error); + throw error; + } + } else { + this.checkAborted(transferOptions); + + try { + const getObjectRequest = { + ...request, + }; + + const getObject = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); + totalSize = getObject.ContentRange ? Number.parseInt(getObject.ContentRange.split("/")[1]) : 0; + + this.dispatchTransferInitiatedEvent(request, totalSize); + if (getObject.Body) { + streams.push(Promise.resolve(getObject.Body)); + requests.push(getObjectRequest); + } + this.processResponseMetadata(getObject, metadata, totalSize); + } catch (error) { + this.dispatchTransferFailedEvent(request, undefined, error); + throw error; + } + } + + return { + totalSize, + }; + } + + /** + * Downloads object using range-based strategy with concurrent range requests. 
+ * + * @internal + */ + protected async downloadByRange( + request: DownloadRequest, + transferOptions: TransferOptions, + streams: Promise[], + requests: GetObjectCommandInput[], + metadata: Omit + ): Promise<{ totalSize: number | undefined }> { + this.checkAborted(transferOptions); + + const headResponse = await this.s3ClientInstance.send( + new HeadObjectCommand({ Bucket: request.Bucket, Key: request.Key }), + transferOptions + ); + + if (headResponse.ContentLength === 0) { + const getObjectRequest = { ...request }; + const response = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); + + this.dispatchTransferInitiatedEvent(request, 0); + if (response.Body) streams.push(Promise.resolve(response.Body)); + requests.push(getObjectRequest); + + this.processResponseMetadata(response, metadata, 0); + return { totalSize: 0 }; + } + + let left = 0; + let right = this.targetPartSizeBytes - 1; + let maxRange = Number.POSITIVE_INFINITY; + let remainingLength = 1; + let totalSize: number | undefined; + let initialETag: string | undefined; + + if (request.Range != null) { + const [userRangeLeft, userRangeRight] = request.Range.replace("bytes=", "").split("-").map(Number); + maxRange = userRangeRight; + left = userRangeLeft; + right = Math.min(userRangeRight, left + S3TransferManager.MIN_PART_SIZE - 1); + totalSize = userRangeRight + 1; + } + + try { + const getObjectRequest: GetObjectCommandInput = { + ...request, + Range: `bytes=${left}-${right}`, + }; + const initialRangeGet = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); + await internalEventHandler.afterInitialGetObject(); + this.validateRangeDownload(`bytes=${left}-${right}`, initialRangeGet.ContentRange); + initialETag = initialRangeGet.ETag ?? undefined; + if (!totalSize) { + totalSize = initialRangeGet.ContentRange + ? 
Number.parseInt(initialRangeGet.ContentRange.split("/")[1]) + : undefined; + } + + if (initialRangeGet.Body && typeof (initialRangeGet.Body as any).getReader === "function") { + const reader = (initialRangeGet.Body as any).getReader(); + (initialRangeGet.Body as any).getReader = function () { + return reader; + }; + } + + this.dispatchTransferInitiatedEvent(request, totalSize); + streams.push(Promise.resolve(initialRangeGet.Body!)); + requests.push(getObjectRequest); + this.processResponseMetadata(initialRangeGet, metadata, totalSize); + } catch (error) { + this.dispatchTransferFailedEvent(request, totalSize, error as Error); + throw error; + } + + let expectedRequestCount = 1; + if (totalSize) { + const contentLength = totalSize; + const remainingBytes = Math.max(0, contentLength - (right - left + 1)); + const additionalRequests = Math.ceil(remainingBytes / S3TransferManager.MIN_PART_SIZE); + expectedRequestCount += additionalRequests; + } + + left = right + 1; + right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange); + remainingLength = totalSize ? 
Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0; + let actualRequestCount = 1; + + while (remainingLength > 0) { + this.checkAborted(transferOptions); + + const range = `bytes=${left}-${right}`; + const getObjectRequest: GetObjectCommandInput = { + ...request, + Range: range, + IfMatch: initialETag, + }; + + const getObject = this.s3ClientInstance + .send(new GetObjectCommand(getObjectRequest), transferOptions) + .then((response) => { + this.validateRangeDownload(range, response.ContentRange); + if (response.Body && typeof (response.Body as any).getReader === "function") { + const reader = (response.Body as any).getReader(); + (response.Body as any).getReader = function () { + return reader; + }; + } + return response.Body!; + }) + .catch((error) => { + this.dispatchTransferFailedEvent(getObjectRequest, totalSize, error); + throw error; + }); + + streams.push(getObject); + requests.push(getObjectRequest); + actualRequestCount++; + + left = right + 1; + right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange); + remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0; + } + + if (expectedRequestCount !== actualRequestCount) { + throw new Error( + `The number of ranged GET requests sent (${actualRequestCount}) does not match the expected number (${expectedRequestCount})` + ); + } + + return { + totalSize, + }; + } + + /** + * Adds all event listeners from provided collection to the transfer manager. + * + * @internal + */ + private addEventListeners(eventListeners?: TransferEventListeners): void { + for (const listeners of this.iterateListeners(eventListeners)) { + for (const listener of listeners) { + this.addEventListener(listener.eventType, listener.callback as EventListener); + } + } + } + + /** + * Removes event listeners from provided collection from the transfer manager. 
+ * + * @internal + */ + private removeEventListeners(eventListeners?: TransferEventListeners): void { + for (const listeners of this.iterateListeners(eventListeners)) { + for (const listener of listeners) { + this.removeEventListener(listener.eventType, listener.callback as EventListener); + } + } + } + + /** + * Copies all response properties except Body to the container object. + * + * @internal + */ + private assignMetadata(container: any, response: any) { + for (const key in response) { + if (key === "Body") { + continue; + } + container[key] = response[key]; + } + } + + /** + * Updates response ContentLength and ContentRange based on total object size. + * + * @internal + */ + private updateResponseLengthAndRange(response: DownloadResponse, totalSize: number | undefined): void { + if (totalSize !== undefined) { + response.ContentLength = totalSize; + response.ContentRange = `bytes 0-${totalSize - 1}/${totalSize}`; + } + } + + /** + * Clears checksum values for composite multipart downloads. + * + * @internal + */ + private updateChecksumValues(initialPart: DownloadResponse, metadata: Omit) { + if (initialPart.ChecksumType === "COMPOSITE") { + metadata.ChecksumCRC32 = undefined; + metadata.ChecksumCRC32C = undefined; + metadata.ChecksumSHA1 = undefined; + metadata.ChecksumSHA256 = undefined; + } + } + + /** + * Processes response metadata by updating length, copying properties, and handling checksums. + * + * @internal + */ + private processResponseMetadata( + response: DownloadResponse, + metadata: Omit, + totalSize: number | undefined + ): void { + this.updateResponseLengthAndRange(response, totalSize); + this.assignMetadata(metadata, response); + this.updateChecksumValues(response, metadata); + } + + /** + * Throws AbortError if transfer has been aborted via signal. 
+ * + * @internal + */ + private checkAborted(transferOptions?: TransferOptions): void { + if (transferOptions?.abortSignal?.aborted) { + throw Object.assign(new Error("Download aborted."), { name: "AbortError" }); + } + } + + /** + * Validates if configuration parameters meets minimum requirements. + * + * @internal + */ + private validateConfig(): void { + if (this.targetPartSizeBytes < S3TransferManager.MIN_PART_SIZE) { + throw new Error(`targetPartSizeBytes must be at least ${S3TransferManager.MIN_PART_SIZE} bytes`); + } + } + + /** + * Dispatches transferInitiated event with initial progress snapshot. + * + * @internal + */ + private dispatchTransferInitiatedEvent(request: DownloadRequest | UploadRequest, totalSize?: number): boolean { + this.dispatchEvent( + Object.assign(new Event("transferInitiated"), { + request, + snapshot: { + transferredBytes: 0, + totalBytes: totalSize, + }, + }) + ); + return true; + } + + /** + * Dispatches transferFailed event with error details and progress snapshot. + * + * @internal + */ + private dispatchTransferFailedEvent( + request: DownloadRequest | UploadRequest, + totalSize?: number, + error?: Error + ): boolean { + this.dispatchEvent( + Object.assign(new Event("transferFailed"), { + request, + error, + snapshot: { + transferredBytes: 0, + totalBytes: totalSize, + }, + }) + ); + return true; + } + + /** + * Generator that yields event listeners from the provided collection for iteration. + * + * @internal + */ + private *iterateListeners(eventListeners: TransferEventListeners = {}) { + for (const key in eventListeners) { + const eventType = key as keyof TransferEventListeners; + const listeners = eventListeners[eventType as keyof TransferEventListeners]; + if (Array.isArray(listeners)) { + for (const callback of listeners) { + yield [ + { + eventType: eventType, + callback: callback, + }, + ]; + } + } + } + } + + /** + * Validates part download ContentRange matches expected part boundaries. 
+ * + * @internal + */ + private validatePartDownload(contentRange: string | undefined, partNumber: number, partSize: number) { + if (!contentRange) { + throw new Error(`Missing ContentRange for part ${partNumber}.`); + } + + const match = contentRange.match(/bytes (\d+)-(\d+)\/(\d+)/); + if (!match) throw new Error(`Invalid ContentRange format: ${contentRange}`); + + const start = Number.parseInt(match[1]); + const end = Number.parseInt(match[2]); + const total = Number.parseInt(match[3]) - 1; + + const expectedStart = (partNumber - 1) * partSize; + const expectedEnd = Math.min(expectedStart + partSize - 1, total); + + if (start !== expectedStart) { + throw new Error(`Expected part ${partNumber} to start at ${expectedStart} but got ${start}`); + } + + if (end !== expectedEnd) { + throw new Error(`Expected part ${partNumber} to end at ${expectedEnd} but got ${end}`); + } + } + + /** + * Validates range download ContentRange matches requested byte range. + * + * @internal + */ + private validateRangeDownload(requestRange: string, responseRange: string | undefined) { + if (!responseRange) { + throw new Error(`Missing ContentRange for range ${requestRange}.`); + } + + const match = responseRange.match(/bytes (\d+)-(\d+)\/(\d+)/); + if (!match) throw new Error(`Invalid ContentRange format: ${responseRange}`); + + const start = Number.parseInt(match[1]); + const end = Number.parseInt(match[2]); + const total = Number.parseInt(match[3]) - 1; + + const rangeMatch = requestRange.match(/bytes=(\d+)-(\d+)/); + if (!rangeMatch) throw new Error(`Invalid Range format: ${requestRange}`); + + const expectedStart = Number.parseInt(rangeMatch[1]); + const expectedEnd = Number.parseInt(rangeMatch[2]); + + if (start !== expectedStart) { + throw new Error(`Expected range to start at ${expectedStart} but got ${start}`); + } + + if (end === expectedEnd) { + return; + } + + if (end === total) { + return; + } + + throw new Error(`Expected range to end at ${expectedEnd} but got ${end}`); + 
} +} + +/** + * Internal event handler for download lifecycle hooks. + * + * @internal + */ +export const internalEventHandler = { + async afterInitialGetObject() {}, +}; diff --git a/lib/lib-storage/src/s3-transfer-manager/event-listener-types.ts b/lib/lib-storage/src/s3-transfer-manager/event-listener-types.ts new file mode 100644 index 000000000000..13c8774b1736 --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/event-listener-types.ts @@ -0,0 +1,58 @@ +/** + * Function type for handling transfer events in the transfer manager. + * Represents a callback that receives event data during transfer operations. + * + * @param event - The event object containing transfer details and progress information. + + * @alpha + */ +export type EventListenerFunction = (event: Event & E) => void; + +/** + * Union type for handling transfer events in the transfer manager. + * Can be a function or an object. + * + * @alpha + */ +export type EventListener = EventListenerFunction | EventListenerObject; + +/** + * Object type for handling transfer events in the transfer manager. + * Represents an object that implements the `handleEvent` method to handle transfer events. + * + * @alpha + */ +export type EventListenerObject = { + handleEvent: EventListenerFunction; +}; + +/** + * Configuration options for registering event listeners in the transfer manager. + * Controls the behavior of event listeners for transfer events. + * + * @alpha + */ +export type AddEventListenerOptions = { + /** + * A boolean value indicating that the listener should be invoked at most once + * after being added. If true, the listener would be automatically removed when invoked. + * If not specified, defaults to false. + */ + once?: boolean; + /** + * An AbortSignal. The listener will be removed when the abort() method of the + * AbortController which owns the AbortSignal is called. If not specified, no + * AbortSignal is associated with the listener. 
+ */ + signal?: AbortSignal; +}; + +/** + * Configuration options for removing event listeners in the transfer manager. + * Controls the behavior of event listeners for transfer events. + * + * @alpha + */ +export type RemoveEventListenerOptions = { + capture?: boolean; +}; diff --git a/lib/lib-storage/src/s3-transfer-manager/index.ts b/lib/lib-storage/src/s3-transfer-manager/index.ts new file mode 100644 index 000000000000..c5fd2ea54f9d --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/index.ts @@ -0,0 +1,3 @@ +export { S3TransferManager } from "./S3TransferManager"; +export type { IS3TransferManager } from "./types"; +export type {} from "./event-listener-types"; diff --git a/lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts b/lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts new file mode 100644 index 000000000000..b85da59aa737 --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts @@ -0,0 +1,92 @@ +import { StreamingBlobPayloadOutputTypes } from "@smithy/types"; +import { isReadableStream, sdkStreamMixin } from "@smithy/util-stream"; + +import { JoinStreamIterationEvents } from "./types"; + +/** + * Joins multiple stream promises into a single stream with event callbacks. + * + * @internal + */ +export async function joinStreams( + streams: Promise[], + eventListeners?: JoinStreamIterationEvents +): Promise { + const firstStream = await streams[0]; + if (isReadableStream(firstStream)) { + const newReadableStream = new ReadableStream({ + async start(controller) { + for await (const chunk of iterateStreams(streams, eventListeners)) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); + return sdkStreamMixin(newReadableStream); + } else { + throw new Error("Unsupported Stream Type"); + } +} + +/** + * Iterates through stream promises sequentially, yielding chunks with progress tracking. 
+ * + * @internal + */ +export async function* iterateStreams( + promises: Promise[], + eventListeners?: JoinStreamIterationEvents +): AsyncIterable { + let bytesTransferred = 0; + let index = 0; + for (const streamPromise of promises) { + let stream: Awaited<(typeof promises)[0]>; + try { + stream = await streamPromise; + } catch (e) { + await destroy(promises); + eventListeners?.onFailure?.(e, index); + throw e; + } + + if (isReadableStream(stream)) { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + yield value; + bytesTransferred += value.byteLength; + eventListeners?.onBytes?.(bytesTransferred, index); + } + } finally { + reader.releaseLock(); + } + } else { + const failure = new Error(`unhandled stream type ${(stream as any)?.constructor?.name}`); + eventListeners?.onFailure?.(failure, index); + throw failure; + } + index++; + } + eventListeners?.onCompletion?.(bytesTransferred, index - 1); +} + +/** + * @internal + */ +async function destroy(promises: Promise[]): Promise { + await Promise.all( + promises.map(async (streamPromise) => { + return streamPromise + .then((stream) => { + if (isReadableStream(stream)) { + return stream.cancel(); + } + }) + .catch((e: unknown) => {}); + }) + ); +} diff --git a/lib/lib-storage/src/s3-transfer-manager/join-streams.ts b/lib/lib-storage/src/s3-transfer-manager/join-streams.ts new file mode 100644 index 000000000000..c1fbd2dca20e --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/join-streams.ts @@ -0,0 +1,105 @@ +import { StreamingBlobPayloadOutputTypes } from "@smithy/types"; +import { isReadableStream, sdkStreamMixin } from "@smithy/util-stream"; +import { Readable } from "stream"; + +import { JoinStreamIterationEvents } from "./types"; + +/** + * Joins multiple stream promises into a single stream with event callbacks. 
+ * + * @internal + */ +export async function joinStreams( + streams: Promise[], + eventListeners?: JoinStreamIterationEvents +): Promise { + const firstStream = await streams[0]; + if (isReadableStream(firstStream)) { + const newReadableStream = new ReadableStream({ + async start(controller) { + for await (const chunk of iterateStreams(streams, eventListeners)) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); + return sdkStreamMixin(newReadableStream); + } else { + streams.forEach((stream) => stream.catch(() => {})); + + return sdkStreamMixin(Readable.from(iterateStreams(streams, eventListeners))); + } +} + +/** + * Iterates through stream promises sequentially, yielding chunks with progress tracking. + * + * @internal + */ +export async function* iterateStreams( + promises: Promise[], + eventListeners?: JoinStreamIterationEvents +): AsyncIterable { + let bytesTransferred = 0; + let index = 0; + for (const streamPromise of promises) { + let stream: Awaited<(typeof promises)[0]>; + try { + stream = await streamPromise; + } catch (e) { + await destroy(promises); + eventListeners?.onFailure?.(e, index); + throw e; + } + + if (isReadableStream(stream)) { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + yield value; + bytesTransferred += value.byteLength; + eventListeners?.onBytes?.(bytesTransferred, index); + } + } finally { + reader.releaseLock(); + } + } else if (stream instanceof Readable) { + for await (const chunk of stream) { + yield chunk; + const chunkSize = Buffer.isBuffer(chunk) ? 
chunk.length : Buffer.byteLength(chunk); + bytesTransferred += chunkSize; + eventListeners?.onBytes?.(bytesTransferred, index); + } + } else { + const failure = new Error(`unhandled stream type ${(stream as any)?.constructor?.name}`); + eventListeners?.onFailure?.(failure, index); + throw failure; + } + index++; + } + eventListeners?.onCompletion?.(bytesTransferred, index - 1); +} + +/** + * @internal + */ +async function destroy(promises: Promise[]): Promise { + await Promise.all( + promises.map(async (streamPromise) => { + return streamPromise + .then((stream) => { + if (stream instanceof Readable) { + stream.destroy(); + return; + } else if (isReadableStream(stream)) { + return stream.cancel(); + } + }) + .catch((e: unknown) => {}); + }) + ); +} diff --git a/lib/lib-storage/src/s3-transfer-manager/types.ts b/lib/lib-storage/src/s3-transfer-manager/types.ts new file mode 100644 index 000000000000..3312a3ee9244 --- /dev/null +++ b/lib/lib-storage/src/s3-transfer-manager/types.ts @@ -0,0 +1,340 @@ +import type { + _Object as S3Object, + ChecksumAlgorithm, + CompleteMultipartUploadCommandOutput, + CreateMultipartUploadCommandInput, + GetObjectCommandInput, + GetObjectCommandOutput, + PutObjectCommandInput, + PutObjectCommandOutput, + S3Client, +} from "@aws-sdk/client-s3"; +import { HttpHandlerOptions } from "@smithy/types"; + +import { AddEventListenerOptions, EventListener, RemoveEventListenerOptions } from "./event-listener-types"; + +/** + * Constructor parameters for the S3 Transfer Manager configuration. + * + * @alpha + */ +export interface S3TransferManagerConfig { + /** + * The low level S3 client that will be used to send requests to S3. + */ + s3ClientInstance?: S3Client; + /** + * The target part size to use in a multipart transfer. Does not apply to downloads if multipartDownloadType is PART. + */ + targetPartSizeBytes?: number; + /** + * The size threshold, in bytes, for when to use multipart upload. 
+ */ + multipartUploadThresholdBytes?: number; + /** + * Option for whether to use checksum validation for download. + */ + checksumValidationEnabled?: boolean; + /** + * Checksum algorithm to use for upload. + */ + checksumAlgorithm?: ChecksumAlgorithm; + /** + * How the SDK should perform multipart download, either RANGE or PART. + */ + multipartDownloadType?: "RANGE" | "PART"; + /** + * Collection of callbacks for monitoring transfer lifecycle events. Allows tracking statuses of all transfers from the client. + */ + eventListeners?: TransferEventListeners; +} + +/** + * Uses intersection because requests includes all the required parameters from + * both PutObjectCommandInput and CreateMultipartUploadCommandInput to support both single object + * and multipart upload requests. + * + * @alpha + */ +export type UploadRequest = PutObjectCommandInput & CreateMultipartUploadCommandInput; + +/** + * Uses union because the responses can vary from single object upload response to multipart upload + * response depending on the request. + * + * @alpha + */ +export type UploadResponse = PutObjectCommandOutput | CompleteMultipartUploadCommandOutput; + +/** + * Features the same properties as SDK JS S3 Command GetObjectCommandInput. + * Created to standardize naming convention for TM APIs. + * + * @alpha + */ +export type DownloadRequest = GetObjectCommandInput; + +/** + * Features the same properties as SDK JS S3 Command GetObjectCommandOutput. + * Created to standardize naming convention for TM APIs. + * + * @alpha + */ +export type DownloadResponse = GetObjectCommandOutput; + +/** + * Options for transfer operations that combine HTTP handler options with transfer event listeners. + * + * @property eventListeners - Collection of callbacks for monitoring transfer lifecycle events + * + * @alpha + */ +export type TransferOptions = HttpHandlerOptions & { eventListeners?: TransferEventListeners }; + +/** + * Client for efficient transfer of objects to and from Amazon S3. 
+ * Provides methods to optimize uploading and downloading individual objects + * as well as entire directories, with support for multipart operations, + * concurrency control, and request cancellation. + * Implements an event-based progress tracking system with methods to register, + * dispatch, and remove listeners for transfer lifecycle events. + * + * @alpha + */ +export interface IS3TransferManager { + /** + * Lets users upload single objects from a given directory to a given bucket. + * Supports multipart upload, single object upload, and transfer progress listeners. + * + * @param request - All properties of a single or multipart upload request. + * @param transferOptions - Allows users to specify cancel functions for the request and a collection of callbacks for monitoring transfer lifecycle events. Allows tracking statuses per request. + * + * @returns The response from the S3 API for the upload request. + */ + upload(request: UploadRequest, transferOptions?: TransferOptions): Promise; + + /** + * Lets users download single objects from a given bucket to a given directory. + * Supports multipart download, single object download, and transfer progress listeners. + * + * @param request - All properties of a single or multipart upload request. + * @param transferOptions - Allows users to specify cancel functions for the request and a collection of callbacks for monitoring transfer lifecycle events. Allows tracking statuses per request. + * + * @returns the response from the S3 API for the download request. + */ + download(request: DownloadRequest, transferOptions?: TransferOptions): Promise; + + /** + * Represents an API to upload all files under the given directory to the provided S3 bucket. + * + * @param options.bucket - The name of the bucket to upload objects to. + * @param options.source - The source directory to upload. + * @param options.followSymbolicLinks - Whether to follow symbolic links when traversing the file tree. 
+ * @param options.recursive - Whether to upload directories recursively. + * @param options.s3Prefix - The S3 key prefix to use for each object. If not provided, files will be uploaded to the root of the bucket todo(). + * @param options.filter - A callback to allow users to filter out unwanted S3 object. It is invoked for each S3 object. An example implementation is a predicate that takes an S3Object and returns a boolean indicating whether this S3Object should be uploaded. + * @param options.s3Delimiter - Default "/". The S3 delimiter. A delimiter causes a list operation to roll up all the keys that share a common prefix into a single summary list result. + * @param options.putObjectRequestCallback - A callback mechanism to allow customers to update individual putObjectRequest that the S3 Transfer Manager generates. + * @param options.failurePolicy - The failure policy to handle failed requests. + * @param options.transferOptions - Allows supplying an AbortSignal and/or transfer event listeners. + * + * @returns the number of objects that have been uploaded and the number of objects that have failed + */ + uploadAll(options: { + bucket: string; + source: string; + followSymbolicLinks?: boolean; + recursive?: boolean; + s3Prefix?: string; + filter?: (filepath: string) => boolean; + s3Delimiter?: string; + putObjectRequestCallback?: (putObjectRequest: PutObjectCommandInput) => Promise; + failurePolicy?: (error?: unknown) => Promise; + transferOptions?: TransferOptions; + }): Promise<{ + objectsUploaded: number; + objectsFailed: number; + }>; + + /** + * Represents an API to download all objects under a bucket to the provided local directory. + * + * @param options.bucket - The name of the bucket. + * @param options.destination - The destination directory. + * @param options.s3Prefix - Specify the S3 prefix that limits the response to keys that begin with the specified prefix. + * @param options.s3Delimiter - Specify the S3 delimiter. 
+ * @param options.recursive - Whether to upload directories recursively. + * @param options.filter - A callback to allow users to filter out unwanted S3 object. It is invoked for each S3 object. An example implementation is a predicate that takes an S3Object and returns a boolean indicating whether this S3Object should be downloaded. + * @param options.getObjectRequestCallback - A callback mechanism to allow customers to update individual getObjectRequest that the S3 Transfer Manager generates. + * @param options.failurePolicy - The failure policy to handle failed requests. + * @param options.transferOptions - Allows supplying an AbortSignal and/or transfer event listeners. + * + * @returns The number of objects that have been uploaded and the number of objects that have failed + */ + downloadAll(options: { + bucket: string; + destination: string; + s3Prefix?: string; + s3Delimiter?: string; + recursive?: boolean; + filter?: (object?: S3Object) => boolean; + getObjectRequestCallback?: (getObjectRequest: GetObjectCommandInput) => Promise; + failurePolicy?: (error?: unknown) => Promise; + transferOptions?: TransferOptions; + }): Promise<{ + objectsDownloaded: number; + objectsFailed: number; + }>; + + /** + * Registers a callback function to be executed when a specific transfer event occurs. + * Supports monitoring the full lifecycle of transfers. + * + * @param type - The type of event to listen for. + * @param callback - Function to execute when the specified event occurs. + * @param options - Optional configuration for event listener behavior. 
+ * + * @alpha + */ + addEventListener( + type: "transferInitiated", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + addEventListener( + type: "bytesTransferred", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + addEventListener( + type: "transferComplete", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + addEventListener( + type: "transferFailed", + callback: EventListener, + options?: AddEventListenerOptions | boolean + ): void; + addEventListener(type: string, callback: EventListener, options?: AddEventListenerOptions | boolean): void; + + /** + * Dispatches an event to the registered event listeners. + * Triggers callbacks registered via addEventListener with matching event types. + * + * @param event - The event object to dispatch. + * @returns whether the event ran to completion + * + * @alpha + */ + dispatchEvent(event: Event & TransferEvent): boolean; + dispatchEvent(event: Event & TransferCompleteEvent): boolean; + dispatchEvent(event: Event): boolean; + + /** + * Removes a previously registered event listener from the specified event type. + * Stops the callback from being invoked when the event occurs. + * + * @param type - The type of event to stop listening for. + * @param callback - The function that was previously registered. + * @param options - Optional configuration for the event listener. 
+ * + * @alpha + */ + removeEventListener( + type: "transferInitiated", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + removeEventListener( + type: "bytesTransferred", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + removeEventListener( + type: "transferComplete", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + removeEventListener( + type: "transferFailed", + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void; + removeEventListener(type: string, callback: EventListener, options?: RemoveEventListenerOptions | boolean): void; +} + +/** + * Provides a snapshot of the progress during a single object transfer. + * + * @alpha + */ +export interface SingleObjectProgressSnapshot { + transferredBytes: number; + totalBytes?: number; + response?: UploadResponse | DownloadResponse; +} + +/** + * Provides a snapshot of the progress during a directory transfer. + * + * @alpha + */ +export interface DirectoryProgressSnapshot { + transferredBytes: number; + totalBytes?: number; + transferredFiles: number; + totalFiles?: number; +} + +/** + * Progress snapshot for either single object transfers or directory transfers. + * + * @alpha + */ +export type TransferProgressSnapshot = SingleObjectProgressSnapshot | DirectoryProgressSnapshot; + +/** + * Event interface for transfer progress events. + * Used for tracking ongoing transfers with the original request and progress snapshot. + * + * @alpha + */ +export interface TransferEvent extends Event { + request: UploadRequest | DownloadRequest; + snapshot: TransferProgressSnapshot; +} + +/** + * Event interface for transfer completion. + * Extends TransferEvent with response data that is received after a completed transfer. 
+ * + * @alpha + */ +export interface TransferCompleteEvent extends TransferEvent { + response: UploadResponse | DownloadResponse; +} + +/** + * Collection of event handlers to monitor transfer lifecycle events. + * Allows a way to register callbacks for each stage of the transfer process. + * + * @alpha + */ +export interface TransferEventListeners { + transferInitiated?: EventListener[]; + bytesTransferred?: EventListener[]; + transferComplete?: EventListener[]; + transferFailed?: EventListener[]; +} + +/** + * Event listener type. + * + * @alpha + */ +export interface JoinStreamIterationEvents { + onBytes?: (byteLength: number, index: number) => void; + onCompletion?: (byteLength: number, index: number) => void; + onFailure?: (error: unknown, index: number) => void; +} diff --git a/lib/lib-storage/vitest.config.browser.ts b/lib/lib-storage/vitest.config.browser.ts new file mode 100644 index 000000000000..2e6679a3d07e --- /dev/null +++ b/lib/lib-storage/vitest.config.browser.ts @@ -0,0 +1,8 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["**/*.browser.spec.ts"], + environment: "happy-dom", + }, +}); diff --git a/package.json b/package.json index 2da8be494643..14563dee9522 100644 --- a/package.json +++ b/package.json @@ -95,7 +95,7 @@ "fs-extra": "^9.0.0", "generate-changelog": "^1.7.1", "glob": "7.1.6", - "happy-dom": "16.3.0", + "happy-dom": "^18.0.1", "husky": "^4.2.3", "jest": "29.7.0", "jmespath": "^0.15.0", diff --git a/tests/react-native/End2End/android/gradle/wrapper/gradle-wrapper.properties b/tests/react-native/End2End/android/gradle/wrapper/gradle-wrapper.properties index ee69dd68d1a6..3ab114e6e74f 100755 --- a/tests/react-native/End2End/android/gradle/wrapper/gradle-wrapper.properties +++ b/tests/react-native/End2End/android/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Mon Jun 16 18:19:18 UTC 2025 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists 
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.9-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/yarn.lock b/yarn.lock index fef234f88dfc..e71e5abdd5ba 100644 --- a/yarn.lock +++ b/yarn.lock @@ -29692,6 +29692,15 @@ __metadata: languageName: node linkType: hard +"@types/node@npm:^20.0.0": + version: 20.19.9 + resolution: "@types/node@npm:20.19.9" + dependencies: + undici-types: "npm:~6.21.0" + checksum: 10c0/c6738131f1698258a5ac1e0185e4fc56977f7f566cd0ee11167f93f2339478470257bd82c5e1908a936a204e0ad7996d741356a1a07c04997a236161ea23a874 + languageName: node + linkType: hard + "@types/normalize-package-data@npm:^2.4.0": version: 2.4.4 resolution: "@types/normalize-package-data@npm:2.4.4" @@ -29748,6 +29757,13 @@ __metadata: languageName: node linkType: hard +"@types/whatwg-mimetype@npm:^3.0.2": + version: 3.0.2 + resolution: "@types/whatwg-mimetype@npm:3.0.2" + checksum: 10c0/dad39d1e4abe760a0a963c84bbdbd26b1df0eb68aff83bdf6ecbb50ad781ead777f6906d19a87007790b750f7500a12e5624d31fc6a1529d14bd19b5c3a316d1 + languageName: node + linkType: hard + "@types/ws@npm:*": version: 8.5.13 resolution: "@types/ws@npm:8.5.13" @@ -31088,7 +31104,7 @@ __metadata: fs-extra: "npm:^9.0.0" generate-changelog: "npm:^1.7.1" glob: "npm:7.1.6" - happy-dom: "npm:16.3.0" + happy-dom: "npm:^18.0.1" husky: "npm:^4.2.3" jest: "npm:29.7.0" jmespath: "npm:^0.15.0" @@ -34840,16 +34856,6 @@ __metadata: languageName: node linkType: hard -"happy-dom@npm:16.3.0": - version: 16.3.0 - resolution: "happy-dom@npm:16.3.0" - dependencies: - webidl-conversions: "npm:^7.0.0" - whatwg-mimetype: "npm:^3.0.0" - checksum: 10c0/c90e29ff44818008aaae1fc65b276a9a1920455884fe895fdc634ced3f98a71fe81317ddf9a2ec1d7d07af0b300500d9f652ba7cfc144ed96c5f7c480edde83b - languageName: node - linkType: hard - "happy-dom@npm:^15.7.4": version: 15.11.7 resolution: "happy-dom@npm:15.11.7" @@ -34861,6 
+34867,17 @@ __metadata: languageName: node linkType: hard +"happy-dom@npm:^18.0.1": + version: 18.0.1 + resolution: "happy-dom@npm:18.0.1" + dependencies: + "@types/node": "npm:^20.0.0" + "@types/whatwg-mimetype": "npm:^3.0.2" + whatwg-mimetype: "npm:^3.0.0" + checksum: 10c0/10f2115f5001fdaf1aedcbda89c15248a1c2e43a25d7e774cb641a35bf6763cef9097b438ef3c2248ab59a0ef33b3e88cb94da096f2bb0fc109ba3f43f7c66d4 + languageName: node + linkType: hard + "har-schema@npm:^2.0.0": version: 2.0.0 resolution: "har-schema@npm:2.0.0" @@ -41636,6 +41653,13 @@ __metadata: languageName: node linkType: hard +"undici-types@npm:~6.21.0": + version: 6.21.0 + resolution: "undici-types@npm:6.21.0" + checksum: 10c0/c01ed51829b10aa72fc3ce64b747f8e74ae9b60eafa19a7b46ef624403508a54c526ffab06a14a26b3120d055e1104d7abe7c9017e83ced038ea5cf52f8d5e04 + languageName: node + linkType: hard + "unique-filename@npm:^2.0.0": version: 2.0.1 resolution: "unique-filename@npm:2.0.1"