15
15
16
16
package software .amazon .awssdk .services .s3 .internal .multipart ;
17
17
18
+ import static software .amazon .awssdk .services .s3 .internal .multipart .MultipartUploadHelper .contentLengthMismatchForPart ;
19
+ import static software .amazon .awssdk .services .s3 .internal .multipart .MultipartUploadHelper .partNumMismatch ;
18
20
import static software .amazon .awssdk .services .s3 .multipart .S3MultipartExecutionAttribute .JAVA_PROGRESS_LISTENER ;
19
21
20
22
import java .util .Collection ;
21
23
import java .util .HashMap ;
22
24
import java .util .Map ;
25
+ import java .util .Optional ;
23
26
import java .util .concurrent .CompletableFuture ;
24
27
import java .util .concurrent .ConcurrentLinkedQueue ;
25
28
import java .util .concurrent .atomic .AtomicBoolean ;
32
35
import software .amazon .awssdk .annotations .SdkInternalApi ;
33
36
import software .amazon .awssdk .core .async .AsyncRequestBody ;
34
37
import software .amazon .awssdk .core .async .listener .PublisherListener ;
38
+ import software .amazon .awssdk .core .exception .SdkClientException ;
35
39
import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadResponse ;
36
40
import software .amazon .awssdk .services .s3 .model .CompletedPart ;
37
41
import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
@@ -54,10 +58,10 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
54
58
private final AtomicBoolean failureActionInitiated = new AtomicBoolean (false );
55
59
private final AtomicInteger partNumber = new AtomicInteger (1 );
56
60
private final MultipartUploadHelper multipartUploadHelper ;
57
- private final long contentLength ;
61
+ private final long totalSize ;
58
62
private final long partSize ;
59
- private final int partCount ;
60
- private final int numExistingParts ;
63
+ private final int expectedNumParts ;
64
+ private final int existingNumParts ;
61
65
private final String uploadId ;
62
66
private final Collection <CompletableFuture <CompletedPart >> futures = new ConcurrentLinkedQueue <>();
63
67
private final PutObjectRequest putObjectRequest ;
@@ -77,25 +81,21 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
77
81
KnownContentLengthAsyncRequestBodySubscriber (MpuRequestContext mpuRequestContext ,
78
82
CompletableFuture <PutObjectResponse > returnFuture ,
79
83
MultipartUploadHelper multipartUploadHelper ) {
80
- this .contentLength = mpuRequestContext .contentLength ();
84
+ this .totalSize = mpuRequestContext .contentLength ();
81
85
this .partSize = mpuRequestContext .partSize ();
82
- this .partCount = determinePartCount ( contentLength , partSize );
86
+ this .expectedNumParts = mpuRequestContext . expectedNumParts ( );
83
87
this .putObjectRequest = mpuRequestContext .request ().left ();
84
88
this .returnFuture = returnFuture ;
85
89
this .uploadId = mpuRequestContext .uploadId ();
86
90
this .existingParts = mpuRequestContext .existingParts () == null ? new HashMap <>() : mpuRequestContext .existingParts ();
87
- this .numExistingParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
88
- this .completedParts = new AtomicReferenceArray <>(partCount );
91
+ this .existingNumParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
92
+ this .completedParts = new AtomicReferenceArray <>(expectedNumParts );
89
93
this .multipartUploadHelper = multipartUploadHelper ;
90
94
this .progressListener = putObjectRequest .overrideConfiguration ().map (c -> c .executionAttributes ()
91
95
.getAttribute (JAVA_PROGRESS_LISTENER ))
92
96
.orElseGet (PublisherListener ::noOp );
93
97
}
94
98
95
- private int determinePartCount (long contentLength , long partSize ) {
96
- return (int ) Math .ceil (contentLength / (double ) partSize );
97
- }
98
-
99
99
public S3ResumeToken pause () {
100
100
isPaused = true ;
101
101
@@ -119,8 +119,8 @@ public S3ResumeToken pause() {
119
119
return S3ResumeToken .builder ()
120
120
.uploadId (uploadId )
121
121
.partSize (partSize )
122
- .totalNumParts ((long ) partCount )
123
- .numPartsCompleted (numPartsCompleted + numExistingParts )
122
+ .totalNumParts ((long ) expectedNumParts )
123
+ .numPartsCompleted (numPartsCompleted + existingNumParts )
124
124
.build ();
125
125
}
126
126
@@ -145,21 +145,32 @@ public void onSubscribe(Subscription s) {
145
145
146
146
@ Override
147
147
public void onNext (AsyncRequestBody asyncRequestBody ) {
148
- if (isPaused ) {
148
+ if (isPaused || isDone ) {
149
149
return ;
150
150
}
151
151
152
- if ( existingParts . containsKey ( partNumber .get ())) {
153
- partNumber . getAndIncrement ();
152
+ int currentPartNum = partNumber .getAndIncrement ();
153
+ if ( existingParts . containsKey ( currentPartNum )) {
154
154
asyncRequestBody .subscribe (new CancelledSubscriber <>());
155
155
subscription .request (1 );
156
156
asyncRequestBody .contentLength ().ifPresent (progressListener ::subscriberOnNext );
157
157
return ;
158
158
}
159
159
160
+ Optional <SdkClientException > sdkClientException = validatePart (asyncRequestBody , currentPartNum );
161
+ if (sdkClientException .isPresent ()) {
162
+ multipartUploadHelper .failRequestsElegantly (futures ,
163
+ sdkClientException .get (),
164
+ uploadId ,
165
+ returnFuture ,
166
+ putObjectRequest );
167
+ subscription .cancel ();
168
+ return ;
169
+ }
170
+
160
171
asyncRequestBodyInFlight .incrementAndGet ();
161
172
UploadPartRequest uploadRequest = SdkPojoConversionUtils .toUploadPartRequest (putObjectRequest ,
162
- partNumber . getAndIncrement () ,
173
+ currentPartNum ,
163
174
uploadId );
164
175
165
176
Consumer <CompletedPart > completedPartConsumer = completedPart -> completedParts .set (completedPart .partNumber () - 1 ,
@@ -179,6 +190,39 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
179
190
subscription .request (1 );
180
191
}
181
192
193
+ private Optional <SdkClientException > validatePart (AsyncRequestBody asyncRequestBody , int currentPartNum ) {
194
+ if (!asyncRequestBody .contentLength ().isPresent ()) {
195
+ return Optional .of (MultipartUploadHelper .contentLengthMissingForPart (currentPartNum ));
196
+ }
197
+
198
+ Long currentPartSize = asyncRequestBody .contentLength ().get ();
199
+
200
+ if (currentPartNum > expectedNumParts ) {
201
+ return Optional .of (partNumMismatch (expectedNumParts , currentPartNum ));
202
+ }
203
+
204
+ if (currentPartNum == expectedNumParts ) {
205
+ return validateLastPartSize (currentPartSize );
206
+ }
207
+
208
+ if (currentPartSize != partSize ) {
209
+ return Optional .of (contentLengthMismatchForPart (partSize , currentPartSize ));
210
+ }
211
+ return Optional .empty ();
212
+ }
213
+
214
+ private Optional <SdkClientException > validateLastPartSize (Long currentPartSize ) {
215
+ long remainder = totalSize % partSize ;
216
+ long expectedLastPartSize = remainder == 0 ? partSize : remainder ;
217
+ if (currentPartSize != expectedLastPartSize ) {
218
+ return Optional .of (
219
+ SdkClientException .create ("Content length of the last part must be equal to the "
220
+ + "expected last part size. Expected: " + expectedLastPartSize
221
+ + ", Actual: " + currentPartSize ));
222
+ }
223
+ return Optional .empty ();
224
+ }
225
+
182
226
private boolean shouldFailRequest () {
183
227
return failureActionInitiated .compareAndSet (false , true ) && !isPaused ;
184
228
}
@@ -187,6 +231,7 @@ private boolean shouldFailRequest() {
187
231
public void onError (Throwable t ) {
188
232
log .debug (() -> "Received onError " , t );
189
233
if (failureActionInitiated .compareAndSet (false , true )) {
234
+ isDone = true ;
190
235
multipartUploadHelper .failRequestsElegantly (futures , t , uploadId , returnFuture , putObjectRequest );
191
236
}
192
237
}
@@ -203,6 +248,7 @@ public void onComplete() {
203
248
private void completeMultipartUploadIfFinished (int requestsInFlight ) {
204
249
if (isDone && requestsInFlight == 0 && completedMultipartInitiated .compareAndSet (false , true )) {
205
250
CompletedPart [] parts ;
251
+
206
252
if (existingParts .isEmpty ()) {
207
253
parts =
208
254
IntStream .range (0 , completedParts .length ())
@@ -212,15 +258,23 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
212
258
// List of CompletedParts needs to be in ascending order
213
259
parts = mergeCompletedParts ();
214
260
}
261
+
262
+ int actualNumParts = partNumber .get () - 1 ;
263
+ if (actualNumParts != expectedNumParts ) {
264
+ SdkClientException exception = partNumMismatch (expectedNumParts , actualNumParts );
265
+ multipartUploadHelper .failRequestsElegantly (futures , exception , uploadId , returnFuture , putObjectRequest );
266
+ return ;
267
+ }
268
+
215
269
completeMpuFuture = multipartUploadHelper .completeMultipartUpload (returnFuture , uploadId , parts , putObjectRequest ,
216
- contentLength );
270
+ totalSize );
217
271
}
218
272
}
219
273
220
274
private CompletedPart [] mergeCompletedParts () {
221
- CompletedPart [] merged = new CompletedPart [partCount ];
275
+ CompletedPart [] merged = new CompletedPart [expectedNumParts ];
222
276
int currPart = 1 ;
223
- while (currPart < partCount + 1 ) {
277
+ while (currPart < expectedNumParts + 1 ) {
224
278
CompletedPart completedPart = existingParts .containsKey (currPart ) ? existingParts .get (currPart ) :
225
279
completedParts .get (currPart - 1 );
226
280
merged [currPart - 1 ] = completedPart ;
0 commit comments