|
37 | 37 | import software.amazon.awssdk.services.s3.model.CopyObjectResponse; |
38 | 38 | import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; |
39 | 39 | import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; |
| 40 | +import software.amazon.awssdk.services.s3.model.GetObjectRequest; |
40 | 41 | import software.amazon.awssdk.services.s3.model.GetObjectResponse; |
41 | 42 | import software.amazon.awssdk.services.s3.model.ObjectIdentifier; |
42 | 43 | import software.amazon.awssdk.services.s3.model.PutObjectResponse; |
43 | 44 | import software.amazon.awssdk.services.s3.model.S3Exception; |
44 | 45 | import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration; |
| 46 | +import software.amazon.awssdk.transfer.s3.S3TransferManager; |
| 47 | +import software.amazon.awssdk.transfer.s3.model.CompletedDownload; |
| 48 | +import software.amazon.awssdk.transfer.s3.model.Download; |
| 49 | +import software.amazon.awssdk.transfer.s3.model.DownloadRequest; |
| 50 | +import software.amazon.awssdk.transfer.s3.model.Upload; |
| 51 | +import software.amazon.awssdk.transfer.s3.model.UploadRequest; |
| 52 | +import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; |
45 | 53 | import software.amazon.encryption.s3.internal.InstructionFileConfig; |
46 | 54 | import software.amazon.encryption.s3.materials.KmsKeyring; |
47 | 55 | import software.amazon.encryption.s3.utils.BoundedInputStream; |
|
61 | 69 | import java.util.Map; |
62 | 70 | import java.util.concurrent.CompletableFuture; |
63 | 71 | import java.util.concurrent.CompletionException; |
| 72 | +import java.util.concurrent.ExecutionException; |
64 | 73 | import java.util.concurrent.ExecutorService; |
65 | 74 | import java.util.concurrent.Executors; |
66 | 75 |
|
@@ -141,6 +150,142 @@ public void asyncCustomConfiguration() { |
141 | 150 | s3Client.close(); |
142 | 151 | } |
143 | 152 |
|
| 153 | + @Test |
| 154 | + public void transferManagerUploadString() { |
| 155 | + final String objectKey = appendTestSuffix("tm-string"); |
| 156 | + final String input = "short test of s3 encryption client with transfer manager"; |
| 157 | + S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder() |
| 158 | + .aesKey(AES_KEY) |
| 159 | + .build(); |
| 160 | + S3TransferManager transferManager = |
| 161 | + S3TransferManager.builder() |
| 162 | + .s3Client(v3AsyncClient) |
| 163 | + .build(); |
| 164 | + |
| 165 | + Upload upload = transferManager.upload(UploadRequest.builder() |
| 166 | + .putObjectRequest((builder -> builder |
| 167 | + .bucket(BUCKET) |
| 168 | + .key(objectKey) |
| 169 | + .build())) |
| 170 | + .requestBody(AsyncRequestBody.fromString(input)) |
| 171 | + .build()); |
| 172 | + upload.completionFuture().join(); |
| 173 | + |
| 174 | + // tm download |
| 175 | + Download<ResponseBytes<GetObjectResponse>> download = transferManager.download(DownloadRequest.builder() |
| 176 | + .getObjectRequest(GetObjectRequest.builder() |
| 177 | + .bucket(BUCKET) |
| 178 | + .key(objectKey) |
| 179 | + .build()) |
| 180 | + .responseTransformer(AsyncResponseTransformer.toBytes()) |
| 181 | + .build()); |
| 182 | + CompletedDownload<ResponseBytes<GetObjectResponse>> resp = download.completionFuture().join(); |
| 183 | + assertEquals(input, resp.result().asUtf8String()); |
| 184 | + |
| 185 | + // Cleanup |
| 186 | + deleteObject(BUCKET, objectKey, v3AsyncClient); |
| 187 | + transferManager.close(); |
| 188 | + } |
| 189 | + |
| 190 | + @Test |
| 191 | + public void transferManagerUploadStream() throws IOException { |
| 192 | + final String objectKey = appendTestSuffix("tm-stream"); |
| 193 | + |
| 194 | + final long fileSizeLimit = 1024 * 1024 * 100; |
| 195 | + final InputStream inputStream = new BoundedInputStream(fileSizeLimit); |
| 196 | + final InputStream objectStreamForResult = new BoundedInputStream(fileSizeLimit); |
| 197 | + final InputStream objectStreamForResultTm = new BoundedInputStream(fileSizeLimit); |
| 198 | + |
| 199 | + S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder() |
| 200 | + .aesKey(AES_KEY) |
| 201 | + .enableDelayedAuthenticationMode(true) |
| 202 | + .enableMultipartPutObject(true) |
| 203 | + .build(); |
| 204 | + S3TransferManager transferManager = |
| 205 | + S3TransferManager.builder() |
| 206 | + .s3Client(v3AsyncClient) |
| 207 | + .build(); |
| 208 | + |
| 209 | + ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); |
| 210 | + Upload upload = transferManager.upload(UploadRequest.builder() |
| 211 | + .putObjectRequest((builder -> builder |
| 212 | + .bucket(BUCKET) |
| 213 | + .key(objectKey) |
| 214 | + .build())) |
| 215 | + .requestBody(AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, singleThreadExecutor)) |
| 216 | + .addTransferListener(LoggingTransferListener.create()) |
| 217 | + .build()); |
| 218 | + upload.completionFuture().join(); |
| 219 | + singleThreadExecutor.shutdown(); |
| 220 | + |
| 221 | + // tm download |
| 222 | + Download<ResponseInputStream<GetObjectResponse>> download = transferManager.download(DownloadRequest.builder() |
| 223 | + .getObjectRequest(GetObjectRequest.builder() |
| 224 | + .bucket(BUCKET) |
| 225 | + .key(objectKey) |
| 226 | + .build()) |
| 227 | + .responseTransformer(AsyncResponseTransformer.toBlockingInputStream()) |
| 228 | + .build()); |
| 229 | + |
| 230 | + CompletedDownload<ResponseInputStream<GetObjectResponse>> resp = download.completionFuture().join(); |
| 231 | + assertTrue(IOUtils.contentEquals(objectStreamForResultTm, resp.result())); |
| 232 | + |
| 233 | + // Cleanup |
| 234 | + deleteObject(BUCKET, objectKey, v3AsyncClient); |
| 235 | + transferManager.close(); |
| 236 | + } |
| 237 | + |
| 238 | + @Test |
| 239 | + public void transferManagerUploadStreamCrt() throws ExecutionException, InterruptedException, IOException { |
| 240 | + final String objectKey = appendTestSuffix("tm-stream-crt"); |
| 241 | + |
| 242 | + final long fileSizeLimit = 1024 * 1024 * 100; |
| 243 | + final InputStream inputStream = new BoundedInputStream(fileSizeLimit); |
| 244 | + final InputStream objectStreamForResult = new BoundedInputStream(fileSizeLimit); |
| 245 | + final InputStream objectStreamForResultTm = new BoundedInputStream(fileSizeLimit); |
| 246 | + |
| 247 | + S3AsyncClient wrappedCrt = S3AsyncClient.crtBuilder() |
| 248 | + .minimumPartSizeInBytes(8000000L) |
| 249 | + .thresholdInBytes(500L) |
| 250 | + .build(); |
| 251 | + S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder() |
| 252 | + .wrappedClient(wrappedCrt) |
| 253 | + .aesKey(AES_KEY) |
| 254 | + .enableDelayedAuthenticationMode(true) |
| 255 | + .enableMultipartPutObject(true) |
| 256 | + .build(); |
| 257 | + S3TransferManager transferManager = |
| 258 | + S3TransferManager.builder() |
| 259 | + .s3Client(v3AsyncClient) |
| 260 | + .build(); |
| 261 | + |
| 262 | + ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); |
| 263 | + Upload upload = transferManager.upload(UploadRequest.builder() |
| 264 | + .putObjectRequest((builder -> builder |
| 265 | + .bucket(BUCKET) |
| 266 | + .key(objectKey) |
| 267 | + .build())) |
| 268 | + .requestBody(AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, singleThreadExecutor)) |
| 269 | + .addTransferListener(LoggingTransferListener.create()) |
| 270 | + .build()); |
| 271 | + upload.completionFuture().join(); |
| 272 | + singleThreadExecutor.shutdown(); |
| 273 | + |
| 274 | + Download<ResponseInputStream<GetObjectResponse>> download = transferManager.download(DownloadRequest.builder() |
| 275 | + .getObjectRequest(GetObjectRequest.builder() |
| 276 | + .bucket(BUCKET) |
| 277 | + .key(objectKey) |
| 278 | + .build()) |
| 279 | + .responseTransformer(AsyncResponseTransformer.toBlockingInputStream()) |
| 280 | + .build()); |
| 281 | + download.completionFuture().join(); |
| 282 | + CompletedDownload<ResponseInputStream<GetObjectResponse>> resp = download.completionFuture().get(); |
| 283 | + |
| 284 | + assertTrue(IOUtils.contentEquals(objectStreamForResultTm, resp.result())); |
| 285 | + deleteObject(BUCKET, objectKey, v3AsyncClient); |
| 286 | + transferManager.close(); |
| 287 | + } |
| 288 | + |
144 | 289 | @Test |
145 | 290 | public void asyncTopLevelConfigurationAllOptions() { |
146 | 291 | final String objectKey = appendTestSuffix("async-top-level-all-options"); |
|
0 commit comments