55
66package aws.sdk.kotlin.hll.s3transfermanager
77
8+ import aws.sdk.kotlin.hll.s3transfermanager.model.MultiPartDownloadType
9+ import aws.sdk.kotlin.hll.s3transfermanager.model.Part
10+ import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileRequest
11+ import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileRequest.Companion.toCreateMultiPartUploadRequest
12+ import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileRequest.Companion.toPutObjectRequest
13+ import aws.sdk.kotlin.hll.s3transfermanager.model.UploadFileResponse
814import aws.sdk.kotlin.services.s3.S3Client
15+ import aws.sdk.kotlin.services.s3.model.CreateMultipartUploadRequest
16+ import aws.sdk.kotlin.services.s3.model.PutObjectRequest
917import aws.sdk.kotlin.services.s3.withConfig
18+ import kotlinx.coroutines.Deferred
19+ import kotlinx.coroutines.async
20+ import kotlinx.coroutines.coroutineScope
1021
1122/* *
1223 * High level utility for managing transfers to Amazon S3.
1324 */
1425public class S3TransferManager private constructor(
1526 public val client : S3Client ,
16- public val partSize : Long? ,
17- public val multipartUploadThreshold : Long? ,
18- public val multipartDownloadType : MultiPartDownloadType ? ,
19- public val interceptors : MutableList <TransferInterceptor >? ,
27+ public val partSize : Long ,
28+ public val multipartUploadThreshold : Long ,
29+ public val multipartDownloadType : MultiPartDownloadType ,
30+ public val interceptors : MutableList <TransferInterceptor >,
2031) {
2132 public companion object {
2233 public operator fun invoke (block : Builder .() -> Unit ): S3TransferManager =
@@ -25,20 +36,100 @@ public class S3TransferManager private constructor(
2536
2637 public class Builder {
2738 public var client: S3Client ? = null
28- public var partSize: Long? = 8_000_000L
29- public var multipartUploadThreshold: Long? = 16_000_000L
30- public var multipartDownloadType: MultiPartDownloadType ? = Part
31- public var interceptors: MutableList <TransferInterceptor >? = mutableListOf ()
39+ public var partSize: Long? = null
40+ public var multipartUploadThreshold: Long? = null
41+ public var multipartDownloadType: MultiPartDownloadType ? = null
42+ public var interceptors: MutableList <TransferInterceptor >? = null
3243
3344 internal fun build (): S3TransferManager =
3445 S3TransferManager (
3546 client = client?.withConfig { interceptors + = BusinessMetricInterceptor } ? : error(" client must be set" ),
36- partSize = partSize,
37- multipartUploadThreshold = multipartUploadThreshold,
38- multipartDownloadType = multipartDownloadType,
39- interceptors = interceptors
47+ partSize = partSize ? : 8_000_000L ,
48+ multipartUploadThreshold = multipartUploadThreshold ? : 16_000_000L ,
49+ multipartDownloadType = multipartDownloadType ? : Part ,
50+ interceptors = interceptors ? : mutableListOf (),
4051 )
4152 }
4253
43- public fun x (): String = " " // TODO
54+ private var context: TransferContext = TransferContext ()
55+
56+ // TODO: Try to find parts of the code you can commonize
57+ /* *
58+ * TODO
59+ */
60+ public suspend fun uploadFile (uploadFileRequest : UploadFileRequest ): Deferred <UploadFileResponse > = coroutineScope {
61+ async {
62+ val multiPartUpload = uploadFileRequest.contentLength >= multipartUploadThreshold
63+
64+ context.request = if (multiPartUpload) {
65+ uploadFileRequest.toCreateMultiPartUploadRequest()
66+ } else {
67+ uploadFileRequest.toPutObjectRequest()
68+ }
69+
70+ operationHook(TransferInitiated ) {
71+ context.transferredBytes = 0L
72+ context.transferableBytes = uploadFileRequest.contentLength
73+
74+ if (multiPartUpload) {
75+ context.response = client.createMultipartUpload(context.request as CreateMultipartUploadRequest )
76+ }
77+ }
78+
79+ operationHook(BytesTransferred ) {
80+ if (multiPartUpload) {
81+ // TODO: MPU logic
82+ // TODO: Update bytes transferred
83+ } else {
84+ context.response = client.putObject(context.request as PutObjectRequest )
85+ context.transferredBytes = context.transferableBytes
86+ }
87+ }
88+
89+ operationHook(TransferCompleted ) {
90+ if (multiPartUpload) {
91+ // TODO: MPU logic
92+ // TODO: Update bytes transferred?
93+ }
94+ }
95+
96+ UploadFileResponse .fromS3Response(context.response)
97+ }
98+ }
99+
100+ private suspend fun operationHook (hook : TransferHook , block : suspend () -> Any ) {
101+ interceptors.forEach { interceptor ->
102+ when (hook) {
103+ is TransferInitiated -> {
104+ interceptor.readBeforeTransferInitiated(context)
105+ context = interceptor.modifyBeforeTransferInitiated(context)
106+ block.invoke()
107+ interceptor.readAfterTransferInitiated(context)
108+ context = interceptor.modifyAfterTransferInitiated(context)
109+ }
110+ is BytesTransferred -> {
111+ interceptor.readBeforeBytesTransferred(context)
112+ context = interceptor.modifyBeforeBytesTransferred(context)
113+ block.invoke()
114+ interceptor.readAfterBytesTransferred(context)
115+ context = interceptor.modifyAfterBytesTransferred(context)
116+ }
117+ is FileTransferred -> {
118+ interceptor.readBeforeFileTransferred(context)
119+ context = interceptor.modifyBeforeFileTransferred(context)
120+ block.invoke()
121+ interceptor.readAfterFileTransferred(context)
122+ context = interceptor.modifyAfterFileTransferred(context)
123+ }
124+ is TransferCompleted -> {
125+ interceptor.readBeforeTransferCompleted(context)
126+ context = interceptor.modifyBeforeTransferCompleted(context)
127+ block.invoke()
128+ interceptor.readAfterTransferCompleted(context)
129+ context = interceptor.modifyAfterTransferCompleted(context)
130+ }
131+ else -> error(" TransferHook not implemented: ${hook::class .simpleName} " )
132+ }
133+ }
134+ }
44135}
0 commit comments