Skip to content

Commit 450368b

Browse files
committed
change validation in futurn complete
1 parent fd674d0 commit 450368b

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloaderSubscriber.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ public class MultipartDownloaderSubscriber implements Subscriber<AsyncResponseTr
6161
*/
6262
private final AtomicInteger completedParts;
6363

64+
/**
65+
* The total number of getObject calls made. This tracks how many times we've actually called getObject.
66+
*/
67+
private final AtomicInteger getObjectCallCount;
68+
6469
/**
6570
* The subscription received from the publisher this subscriber subscribes to.
6671
*/
@@ -95,6 +100,7 @@ public MultipartDownloaderSubscriber(S3AsyncClient s3, GetObjectRequest getObjec
95100
this.s3 = s3;
96101
this.getObjectRequest = getObjectRequest;
97102
this.completedParts = new AtomicInteger(completedParts);
103+
this.getObjectCallCount = new AtomicInteger(completedParts);
98104
}
99105

100106
@Override
@@ -120,7 +126,6 @@ public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse
120126
synchronized (lock) {
121127
if (totalParts != null && nextPartToGet > totalParts) {
122128
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
123-
validatePartsCount(currentPart);
124129
subscription.cancel();
125130
return;
126131
}
@@ -129,6 +134,7 @@ public void onNext(AsyncResponseTransformer<GetObjectResponse, GetObjectResponse
129134
GetObjectRequest actualRequest = nextRequest(nextPartToGet);
130135
log.debug(() -> "Sending GetObjectRequest for next part with partNumber=" + nextPartToGet);
131136
CompletableFuture<GetObjectResponse> getObjectFuture = s3.getObject(actualRequest, asyncResponseTransformer);
137+
getObjectCallCount.incrementAndGet();
132138
getObjectFutures.add(getObjectFuture);
133139
getObjectFuture.whenComplete((response, error) -> {
134140
if (error != null) {
@@ -169,7 +175,6 @@ private void requestMoreIfNeeded(GetObjectResponse response) {
169175
if (totalParts != null && totalParts > 1 && totalComplete < totalParts) {
170176
subscription.request(1);
171177
} else {
172-
validatePartsCount(totalComplete);
173178
log.debug(() -> String.format("Completing multipart download after a total of %d parts downloaded.", totalParts));
174179
subscription.cancel();
175180
}
@@ -187,6 +192,7 @@ public void onError(Throwable t) {
187192

188193
@Override
189194
public void onComplete() {
195+
validatePartsCount();
190196
future.complete(null);
191197
}
192198

@@ -203,10 +209,11 @@ private GetObjectRequest nextRequest(int nextPartToGet) {
203209
});
204210
}
205211

206-
private void validatePartsCount(int currentGetCount) {
207-
if (totalParts != null && currentGetCount != totalParts) {
212+
private void validatePartsCount() {
213+
int actualGetCount = getObjectCallCount.get();
214+
if (totalParts != null && actualGetCount != totalParts) {
208215
String errorMessage = String.format("PartsCount validation failed. Expected %d, downloaded %d parts.", totalParts,
209-
currentGetCount);
216+
actualGetCount);
210217
subscription.cancel();
211218
SdkClientException exception = SdkClientException.create(errorMessage);
212219
onError(exception);

0 commit comments

Comments
 (0)