19
19
import static software .amazon .awssdk .services .s3 .crt .S3CrtSdkHttpExecutionAttribute .CRT_PROGRESS_LISTENER ;
20
20
import static software .amazon .awssdk .services .s3 .crt .S3CrtSdkHttpExecutionAttribute .METAREQUEST_PAUSE_OBSERVABLE ;
21
21
import static software .amazon .awssdk .services .s3 .internal .crt .S3InternalSdkHttpExecutionAttribute .CRT_PAUSE_RESUME_TOKEN ;
22
+ import static software .amazon .awssdk .services .s3 .multipart .S3MultipartExecutionAttribute .JAVA_PROGRESS_LISTENER ;
22
23
23
24
import java .util .concurrent .CompletableFuture ;
24
25
import java .util .function .Consumer ;
33
34
import software .amazon .awssdk .services .s3 .model .PutObjectResponse ;
34
35
import software .amazon .awssdk .transfer .s3 .S3TransferManager ;
35
36
import software .amazon .awssdk .transfer .s3 .internal .model .CrtFileUpload ;
37
+ import software .amazon .awssdk .transfer .s3 .internal .model .DefaultUpload ;
36
38
import software .amazon .awssdk .transfer .s3 .internal .progress .TransferProgressUpdater ;
37
39
import software .amazon .awssdk .transfer .s3 .model .CompletedFileUpload ;
40
+ import software .amazon .awssdk .transfer .s3 .model .CompletedUpload ;
38
41
import software .amazon .awssdk .transfer .s3 .model .FileUpload ;
39
42
import software .amazon .awssdk .transfer .s3 .model .ResumableFileUpload ;
43
+ import software .amazon .awssdk .transfer .s3 .model .Upload ;
40
44
import software .amazon .awssdk .transfer .s3 .model .UploadFileRequest ;
45
+ import software .amazon .awssdk .transfer .s3 .model .UploadRequest ;
41
46
import software .amazon .awssdk .utils .CompletableFutureUtils ;
42
47
import software .amazon .awssdk .utils .Validate ;
43
48
@@ -54,6 +59,51 @@ class CrtS3TransferManager extends GenericS3TransferManager {
54
59
this .s3AsyncClient = s3AsyncClient ;
55
60
}
56
61
62
+ @ Override
63
+ public final Upload upload (UploadRequest uploadRequest ) {
64
+ Validate .paramNotNull (uploadRequest , "uploadRequest" );
65
+
66
+ AsyncRequestBody requestBody = uploadRequest .requestBody ();
67
+
68
+ CompletableFuture <CompletedUpload > returnFuture = new CompletableFuture <>();
69
+
70
+ TransferProgressUpdater progressUpdater = new TransferProgressUpdater (uploadRequest ,
71
+ requestBody .contentLength ().orElse (null ));
72
+ progressUpdater .transferInitiated ();
73
+ // requestBody = progressUpdater.wrapRequestBody(requestBody);
74
+ progressUpdater .registerCompletion (returnFuture );
75
+
76
+ S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable ();
77
+
78
+ Consumer <SdkHttpExecutionAttributes .Builder > attachObservable =
79
+ b -> b .put (METAREQUEST_PAUSE_OBSERVABLE , observable )
80
+ .put (CRT_PROGRESS_LISTENER , progressUpdater .crtProgressListener ());
81
+
82
+ PutObjectRequest putObjectRequest = attachCrtSdkAttribute (uploadRequest .putObjectRequest (), attachObservable );
83
+
84
+ progressUpdater .transferInitiated ();
85
+ progressUpdater .registerCompletion (returnFuture );
86
+
87
+ try {
88
+ assertNotUnsupportedArn (uploadRequest .putObjectRequest ().bucket (), "upload" );
89
+
90
+ CompletableFuture <PutObjectResponse > future =
91
+ s3AsyncClient .putObject (putObjectRequest , requestBody );
92
+
93
+ // Forward upload cancellation to future
94
+ CompletableFutureUtils .forwardExceptionTo (returnFuture , future );
95
+
96
+ CompletableFutureUtils .forwardTransformedResultTo (future , returnFuture ,
97
+ r -> CompletedUpload .builder ()
98
+ .response (r )
99
+ .build ());
100
+ } catch (Throwable throwable ) {
101
+ returnFuture .completeExceptionally (throwable );
102
+ }
103
+
104
+ return new DefaultUpload (returnFuture , progressUpdater .progress ());
105
+ }
106
+
57
107
@ Override
58
108
public FileUpload uploadFile (UploadFileRequest uploadFileRequest ) {
59
109
Validate .paramNotNull (uploadFileRequest , "uploadFileRequest" );
0 commit comments