32
32
import software .amazon .awssdk .annotations .SdkInternalApi ;
33
33
import software .amazon .awssdk .core .async .AsyncRequestBody ;
34
34
import software .amazon .awssdk .core .async .listener .PublisherListener ;
35
+ import software .amazon .awssdk .core .exception .SdkClientException ;
35
36
import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadResponse ;
36
37
import software .amazon .awssdk .services .s3 .model .CompletedPart ;
37
38
import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
@@ -54,10 +55,10 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
54
55
private final AtomicBoolean failureActionInitiated = new AtomicBoolean (false );
55
56
private final AtomicInteger partNumber = new AtomicInteger (1 );
56
57
private final MultipartUploadHelper multipartUploadHelper ;
57
- private final long contentLength ;
58
+ private final long totalSize ;
58
59
private final long partSize ;
59
- private final int partCount ;
60
- private final int numExistingParts ;
60
+ private final int expectedNumParts ;
61
+ private final int existingNumParts ;
61
62
private final String uploadId ;
62
63
private final Collection <CompletableFuture <CompletedPart >> futures = new ConcurrentLinkedQueue <>();
63
64
private final PutObjectRequest putObjectRequest ;
@@ -77,25 +78,21 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
77
78
KnownContentLengthAsyncRequestBodySubscriber (MpuRequestContext mpuRequestContext ,
78
79
CompletableFuture <PutObjectResponse > returnFuture ,
79
80
MultipartUploadHelper multipartUploadHelper ) {
80
- this .contentLength = mpuRequestContext .contentLength ();
81
+ this .totalSize = mpuRequestContext .contentLength ();
81
82
this .partSize = mpuRequestContext .partSize ();
82
- this .partCount = determinePartCount ( contentLength , partSize );
83
+ this .expectedNumParts = mpuRequestContext . expectedNumParts ( );
83
84
this .putObjectRequest = mpuRequestContext .request ().left ();
84
85
this .returnFuture = returnFuture ;
85
86
this .uploadId = mpuRequestContext .uploadId ();
86
87
this .existingParts = mpuRequestContext .existingParts () == null ? new HashMap <>() : mpuRequestContext .existingParts ();
87
- this .numExistingParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
88
- this .completedParts = new AtomicReferenceArray <>(partCount );
88
+ this .existingNumParts = NumericUtils .saturatedCast (mpuRequestContext .numPartsCompleted ());
89
+ this .completedParts = new AtomicReferenceArray <>(expectedNumParts );
89
90
this .multipartUploadHelper = multipartUploadHelper ;
90
91
this .progressListener = putObjectRequest .overrideConfiguration ().map (c -> c .executionAttributes ()
91
92
.getAttribute (JAVA_PROGRESS_LISTENER ))
92
93
.orElseGet (PublisherListener ::noOp );
93
94
}
94
95
95
- private int determinePartCount (long contentLength , long partSize ) {
96
- return (int ) Math .ceil (contentLength / (double ) partSize );
97
- }
98
-
99
96
public S3ResumeToken pause () {
100
97
isPaused = true ;
101
98
@@ -119,8 +116,8 @@ public S3ResumeToken pause() {
119
116
return S3ResumeToken .builder ()
120
117
.uploadId (uploadId )
121
118
.partSize (partSize )
122
- .totalNumParts ((long ) partCount )
123
- .numPartsCompleted (numPartsCompleted + numExistingParts )
119
+ .totalNumParts ((long ) expectedNumParts )
120
+ .numPartsCompleted (numPartsCompleted + existingNumParts )
124
121
.build ();
125
122
}
126
123
@@ -145,21 +142,23 @@ public void onSubscribe(Subscription s) {
145
142
146
143
@ Override
147
144
public void onNext (AsyncRequestBody asyncRequestBody ) {
148
- if (isPaused ) {
145
+ if (isPaused || isDone ) {
149
146
return ;
150
147
}
151
148
152
- if ( existingParts . containsKey ( partNumber .get ())) {
153
- partNumber . getAndIncrement ();
149
+ int currentPartNum = partNumber .getAndIncrement ();
150
+ if ( existingParts . containsKey ( currentPartNum )) {
154
151
asyncRequestBody .subscribe (new CancelledSubscriber <>());
155
152
subscription .request (1 );
156
153
asyncRequestBody .contentLength ().ifPresent (progressListener ::subscriberOnNext );
157
154
return ;
158
155
}
159
156
157
+ validatePart (asyncRequestBody , currentPartNum );
158
+
160
159
asyncRequestBodyInFlight .incrementAndGet ();
161
160
UploadPartRequest uploadRequest = SdkPojoConversionUtils .toUploadPartRequest (putObjectRequest ,
162
- partNumber . getAndIncrement () ,
161
+ currentPartNum ,
163
162
uploadId );
164
163
165
164
Consumer <CompletedPart > completedPartConsumer = completedPart -> completedParts .set (completedPart .partNumber () - 1 ,
@@ -179,6 +178,49 @@ public void onNext(AsyncRequestBody asyncRequestBody) {
179
178
subscription .request (1 );
180
179
}
181
180
181
+ private void validatePart (AsyncRequestBody asyncRequestBody , int currentPartNum ) {
182
+ if (!asyncRequestBody .contentLength ().isPresent ()) {
183
+ SdkClientException e = SdkClientException .create ("Content length must be present on the AsyncRequestBody" );
184
+ multipartUploadHelper .failRequestsElegantly (futures , e , uploadId , returnFuture , putObjectRequest );
185
+ return ;
186
+ }
187
+
188
+ Long currentPartSize = asyncRequestBody .contentLength ().get ();
189
+ if (currentPartNum > expectedNumParts ) {
190
+ SdkClientException exception = SdkClientException .create (String .format ("The number of parts divided is "
191
+ + "not equal to the expected number of "
192
+ + "parts. Expected: %d, Actual: %d" ,
193
+ expectedNumParts , currentPartNum ));
194
+ multipartUploadHelper .failRequestsElegantly (futures , exception , uploadId , returnFuture , putObjectRequest );
195
+ return ;
196
+ }
197
+
198
+ if (currentPartNum == expectedNumParts ) {
199
+ validateLastPartSize (currentPartSize );
200
+ return ;
201
+ }
202
+
203
+ if (currentPartSize != partSize ) {
204
+ SdkClientException e = SdkClientException .create (String .format ("Content length must be equal to the "
205
+ + "part size. Expected: %d, Actual: %d" ,
206
+ partSize ,
207
+ currentPartSize ));
208
+ multipartUploadHelper .failRequestsElegantly (futures , e , uploadId , returnFuture , putObjectRequest );
209
+ }
210
+ }
211
+
212
+ private void validateLastPartSize (Long currentPartSize ) {
213
+ long remainder = totalSize % partSize ;
214
+ long expectedLastPartSize = remainder == 0 ? partSize : remainder ;
215
+ if (currentPartSize != expectedLastPartSize ) {
216
+ SdkClientException exception =
217
+ SdkClientException .create ("Content length of the last part must be equal to the "
218
+ + "expected last part size. Expected: " + expectedLastPartSize
219
+ + ", Actual: " + currentPartSize );
220
+ multipartUploadHelper .failRequestsElegantly (futures , exception , uploadId , returnFuture , putObjectRequest );
221
+ }
222
+ }
223
+
182
224
private boolean shouldFailRequest () {
183
225
return failureActionInitiated .compareAndSet (false , true ) && !isPaused ;
184
226
}
@@ -187,6 +229,7 @@ private boolean shouldFailRequest() {
187
229
public void onError (Throwable t ) {
188
230
log .debug (() -> "Received onError " , t );
189
231
if (failureActionInitiated .compareAndSet (false , true )) {
232
+ isDone = true ;
190
233
multipartUploadHelper .failRequestsElegantly (futures , t , uploadId , returnFuture , putObjectRequest );
191
234
}
192
235
}
@@ -203,6 +246,7 @@ public void onComplete() {
203
246
private void completeMultipartUploadIfFinished (int requestsInFlight ) {
204
247
if (isDone && requestsInFlight == 0 && completedMultipartInitiated .compareAndSet (false , true )) {
205
248
CompletedPart [] parts ;
249
+
206
250
if (existingParts .isEmpty ()) {
207
251
parts =
208
252
IntStream .range (0 , completedParts .length ())
@@ -213,14 +257,14 @@ private void completeMultipartUploadIfFinished(int requestsInFlight) {
213
257
parts = mergeCompletedParts ();
214
258
}
215
259
completeMpuFuture = multipartUploadHelper .completeMultipartUpload (returnFuture , uploadId , parts , putObjectRequest ,
216
- contentLength );
260
+ totalSize );
217
261
}
218
262
}
219
263
220
264
private CompletedPart [] mergeCompletedParts () {
221
- CompletedPart [] merged = new CompletedPart [partCount ];
265
+ CompletedPart [] merged = new CompletedPart [expectedNumParts ];
222
266
int currPart = 1 ;
223
- while (currPart < partCount + 1 ) {
267
+ while (currPart < expectedNumParts + 1 ) {
224
268
CompletedPart completedPart = existingParts .containsKey (currPart ) ? existingParts .get (currPart ) :
225
269
completedParts .get (currPart - 1 );
226
270
merged [currPart - 1 ] = completedPart ;
0 commit comments