Skip to content

Commit 49c7ef2

Browse files
committed
Update tests and refactor
1 parent 24aedc6 commit 49c7ef2

File tree

9 files changed

+254
-403
lines changed

9 files changed

+254
-403
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.ByteArrayOutputStream;
2121
import java.nio.ByteBuffer;
2222
import java.util.concurrent.CompletableFuture;
23-
import java.util.function.Consumer;
2423
import org.reactivestreams.Subscriber;
2524
import org.reactivestreams.Subscription;
2625
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -69,9 +68,11 @@ public void exceptionOccurred(Throwable throwable) {
6968

7069
@Override
7170
public SplitResult<ResponseT, ResponseBytes<ResponseT>> split(SplittingTransformerConfiguration splitConfig) {
71+
// TODO - splitConfig not used - support this or log warning/update javdocs
72+
7273
CompletableFuture<ResponseBytes<ResponseT>> future = new CompletableFuture<>();
7374
SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> transformer =
74-
new ByteArraySplittingTransformer(this, future);
75+
new ByteArraySplittingTransformer<>(this, future);
7576
return AsyncResponseTransformer.SplitResult.<ResponseT, ResponseBytes<ResponseT>>builder()
7677
.publisher(transformer)
7778
.resultFuture(future)
@@ -121,5 +122,4 @@ public void onComplete() {
121122
resultFuture.complete(baos.toByteArray());
122123
}
123124
}
124-
125125
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicReference;
2828
import org.reactivestreams.Subscriber;
2929
import org.reactivestreams.Subscription;
30+
import software.amazon.awssdk.annotations.SdkInternalApi;
3031
import software.amazon.awssdk.core.ResponseBytes;
3132
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3233
import software.amazon.awssdk.core.async.SdkPublisher;
@@ -36,13 +37,13 @@
3637
import software.amazon.awssdk.utils.async.DelegatingBufferingSubscriber;
3738
import software.amazon.awssdk.utils.async.SimplePublisher;
3839

40+
@SdkInternalApi
3941
public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> {
4042
private static final Logger log = Logger.loggerFor(ByteArraySplittingTransformer.class);
4143
private final AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer;
42-
private final SimplePublisher<AsyncResponseTransformer<ResponseT, ResponseT>> simplePublisher = new SimplePublisher<>();
4344
private final CompletableFuture<ResponseBytes<ResponseT>> resultFuture;
4445
private Subscriber<? super AsyncResponseTransformer<ResponseT, ResponseT>> downstreamSubscriber;
45-
private final AtomicInteger partNumber = new AtomicInteger(0);
46+
private final AtomicInteger onNextSignalsSent = new AtomicInteger(0);
4647
private final AtomicReference<ResponseT> responseT = new AtomicReference<>();
4748

4849
/**
@@ -77,7 +78,8 @@ public class ByteArraySplittingTransformer<ResponseT> implements SdkPublisher<As
7778

7879
private final Map<Integer, ByteBuffer> buffers;
7980

80-
public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> upstreamResponseTransformer,
81+
public ByteArraySplittingTransformer(AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
82+
upstreamResponseTransformer,
8183
CompletableFuture<ResponseBytes<ResponseT>> resultFuture) {
8284
this.upstreamResponseTransformer = upstreamResponseTransformer;
8385
this.resultFuture = resultFuture;
@@ -141,7 +143,7 @@ private boolean doEmit() {
141143
}
142144
if (outstandingDemand.get() > 0) {
143145
demand = outstandingDemand.decrementAndGet();
144-
downstreamSubscriber.onNext(new IndividualTransformer(partNumber.incrementAndGet()));
146+
downstreamSubscriber.onNext(new IndividualTransformer(onNextSignalsSent.incrementAndGet()));
145147
}
146148
}
147149
return false;
@@ -195,15 +197,15 @@ private void handleSubscriptionCancel() {
195197
}
196198
}
197199

198-
private class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
199-
private final int partNumber;
200-
private ByteArrayAsyncResponseTransformer<ResponseT> delegate = new ByteArrayAsyncResponseTransformer<>();
200+
private final class IndividualTransformer implements AsyncResponseTransformer<ResponseT, ResponseT> {
201+
private final int onNextCount;
202+
private final ByteArrayAsyncResponseTransformer<ResponseT> delegate = new ByteArrayAsyncResponseTransformer<>();
201203

202204
private CompletableFuture<ResponseT> future;
203-
private List<CompletableFuture<ResponseBytes<ResponseT>>> delegatePrepareFutures = new ArrayList<>();
205+
private final List<CompletableFuture<ResponseBytes<ResponseT>>> delegatePrepareFutures = new ArrayList<>();
204206

205-
private IndividualTransformer(int partNumber) {
206-
this.partNumber = partNumber;
207+
private IndividualTransformer(int onNextCount) {
208+
this.onNextCount = onNextCount;
207209
}
208210

209211
@Override
@@ -213,7 +215,7 @@ public CompletableFuture<ResponseT> prepare() {
213215
CompletableFutureUtils.forwardExceptionTo(prepare, future);
214216
delegatePrepareFutures.add(prepare);
215217
return prepare.thenApply(responseTResponseBytes -> {
216-
buffers.put(partNumber, responseTResponseBytes.asByteBuffer());
218+
buffers.put(onNextCount, responseTResponseBytes.asByteBuffer());
217219
return responseTResponseBytes.response();
218220
});
219221
}

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectToBytesWiremockTest.java

Lines changed: 23 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
import static org.junit.jupiter.api.Assertions.assertEquals;
3030
import static org.junit.jupiter.api.Assertions.assertNotEquals;
3131
import static org.junit.jupiter.api.Assertions.assertNotNull;
32-
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartDownloadTestUtils.internalErrorBody;
33+
import static software.amazon.awssdk.services.s3.internal.multipart.utils.MultipartDownloadTestUtils.slowdownErrorBody;
3334

3435
import com.github.tomakehurst.wiremock.http.Fault;
3536
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
3637
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
3738
import com.github.tomakehurst.wiremock.stubbing.Scenario;
3839
import java.net.URI;
39-
import java.nio.ByteBuffer;
4040
import java.nio.charset.StandardCharsets;
4141
import java.time.Duration;
4242
import java.util.ArrayList;
@@ -46,19 +46,15 @@
4646
import java.util.concurrent.CompletableFuture;
4747
import java.util.concurrent.CompletionException;
4848
import java.util.concurrent.TimeUnit;
49-
import java.util.concurrent.atomic.AtomicBoolean;
5049
import org.junit.jupiter.api.BeforeAll;
5150
import org.junit.jupiter.api.BeforeEach;
5251
import org.junit.jupiter.api.Test;
5352
import org.junit.jupiter.api.Timeout;
54-
import org.reactivestreams.Subscriber;
55-
import org.reactivestreams.Subscription;
5653
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
5754
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
5855
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy;
5956
import software.amazon.awssdk.core.ResponseBytes;
6057
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
61-
import software.amazon.awssdk.core.async.SdkPublisher;
6258
import software.amazon.awssdk.core.exception.SdkClientException;
6359
import software.amazon.awssdk.core.interceptor.Context;
6460
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
@@ -213,23 +209,6 @@ public void getObject_5xxErrorResponses_shouldNotReuseInitialRequestId() {
213209
verify(0, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))));
214210
}
215211

216-
@Test
217-
public void getObject_ioException_shouldRetryAndFail() {
218-
String firstRequestId = UUID.randomUUID().toString();
219-
String secondRequestId = UUID.randomUUID().toString();
220-
221-
stubIoError(1);
222-
assertThatThrownBy(() -> multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY),
223-
AsyncResponseTransformer.toBytes()).join())
224-
.isInstanceOf(CompletionException.class)
225-
.hasCauseInstanceOf(SdkClientException.class).hasMessageContaining("The connection was closed")
226-
.hasStackTraceContaining("Error encountered during GetObjectRequest");
227-
228-
verify(MAX_ATTEMPTS, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))));
229-
verify(0, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))));
230-
verify(0, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))));
231-
}
232-
233212

234213
@Test
235214
public void multipartDownload_200Response_shouldSucceed() {
@@ -254,16 +233,14 @@ public void multipartDownload_200Response_shouldSucceed() {
254233
@Test
255234
public void multipartDownload_secondPartNonRetryableError_shouldFail() {
256235
stub200SuccessPart1();
257-
stubError(2, errorBody(String.valueOf(500), "Internal Error"));
236+
stubError(2, internalErrorBody());
258237

259238
CompletableFuture<ResponseBytes<GetObjectResponse>> future =
260239
multipartClient.getObject(GetObjectRequest.builder().bucket(BUCKET).key(KEY).build(),
261240
AsyncResponseTransformer.toBytes());
262241

263-
assertThatThrownBy(() -> future.join()).hasCauseInstanceOf(S3Exception.class)
264-
.hasStackTraceContaining("Error encountered "
265-
+ "during "
266-
+ "GetObjectRequest");
242+
assertThatThrownBy(future::join).hasCauseInstanceOf(S3Exception.class)
243+
.hasMessageContaining("We encountered an internal error. Please try again. (Service: S3, Status Code: 500");
267244

268245
verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))));
269246
verify(MAX_ATTEMPTS, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))));
@@ -369,7 +346,22 @@ public void multipartDownload_503OnFirstPartAndSecondPart_shouldRetrySuccessfull
369346
}
370347

371348
@Test
372-
public void getObject_iOError_shouldRetrySuccessfully() {
349+
public void getObject_ioExceptionOnly_shouldExhaustRetriesAndFail() {
350+
stubIoError(1);
351+
stub200SuccessPart2();
352+
stub200SuccessPart3();
353+
assertThatThrownBy(() -> multipartClient.getObject(b -> b.bucket(BUCKET).key(KEY),
354+
AsyncResponseTransformer.toBytes()).join())
355+
.isInstanceOf(CompletionException.class)
356+
.hasCauseInstanceOf(SdkClientException.class);
357+
358+
verify(MAX_ATTEMPTS, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", BUCKET, KEY))));
359+
verify(0, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", BUCKET, KEY))));
360+
verify(0, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))));
361+
}
362+
363+
@Test
364+
public void getObject_iOErrorThen200Response_shouldRetrySuccessfully() {
373365
String requestId = UUID.randomUUID().toString();
374366

375367
stubFor(any(anyUrl())
@@ -405,32 +397,16 @@ public void getObject_iOError_shouldRetrySuccessfully() {
405397
assertEquals(requestId, finalRequestId);
406398
}
407399

408-
private String errorBody(String errorCode, String errorMessage) {
409-
return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
410-
+ "<Error>\n"
411-
+ " <Code>" + errorCode + "</Code>\n"
412-
+ " <Message>" + errorMessage + "</Message>\n"
413-
+ "</Error>";
414-
}
415-
416-
private String internalErrorBody() {
417-
return errorBody("InternalError", "We encountered an internal error. Please try again.");
418-
}
419-
420-
private String slowdownErrorBody() {
421-
return errorBody("SlowDown", "Please reduce your request rate.");
422-
}
423-
424400
private void stubError(int partNumber, String errorBody) {
425-
stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=" + partNumber, BUCKET, KEY)))
401+
stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=%d", BUCKET, KEY, partNumber)))
426402
.willReturn(aResponse()
427403
.withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID()))
428404
.withHeader("x-amz-mp-parts-count", String.valueOf(TOTAL_PARTS))
429405
.withStatus(500).withBody(errorBody)));
430406
}
431407

432408
private void stubIoError(int partNumber) {
433-
stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=" + partNumber, BUCKET, KEY)))
409+
stubFor(get(urlEqualTo(String.format("/%s/%s?partNumber=%d", BUCKET, KEY, partNumber)))
434410
.willReturn(aResponse()
435411
.withFault(Fault.CONNECTION_RESET_BY_PEER)));
436412
}
@@ -520,62 +496,4 @@ public void clear() {
520496
responses.clear();
521497
}
522498
}
523-
524-
/**
525-
* Custom AsyncResponseTransformer that simulates an error occurring after onStream() has been called
526-
*/
527-
private static final class StreamingErrorTransformer
528-
implements AsyncResponseTransformer<GetObjectResponse, ResponseBytes<GetObjectResponse>> {
529-
530-
private final CompletableFuture<ResponseBytes<GetObjectResponse>> future = new CompletableFuture<>();
531-
private final AtomicBoolean errorThrown = new AtomicBoolean();
532-
private final AtomicBoolean onStreamCalled = new AtomicBoolean();
533-
534-
@Override
535-
public CompletableFuture<ResponseBytes<GetObjectResponse>> prepare() {
536-
return future;
537-
}
538-
539-
@Override
540-
public void onResponse(GetObjectResponse response) {
541-
//
542-
}
543-
544-
@Override
545-
public void onStream(SdkPublisher<ByteBuffer> publisher) {
546-
onStreamCalled.set(true);
547-
publisher.subscribe(new Subscriber<ByteBuffer>() {
548-
private Subscription subscription;
549-
550-
@Override
551-
public void onSubscribe(Subscription s) {
552-
this.subscription = s;
553-
s.request(1);
554-
}
555-
556-
@Override
557-
public void onNext(ByteBuffer byteBuffer) {
558-
if (errorThrown.compareAndSet(false, true)) {
559-
future.completeExceptionally(new RuntimeException());
560-
subscription.cancel();
561-
}
562-
}
563-
564-
@Override
565-
public void onError(Throwable t) {
566-
future.completeExceptionally(t);
567-
}
568-
569-
@Override
570-
public void onComplete() {
571-
//
572-
}
573-
});
574-
}
575-
576-
@Override
577-
public void exceptionOccurred(Throwable throwable) {
578-
future.completeExceptionally(throwable);
579-
}
580-
}
581499
}

0 commit comments

Comments
 (0)