Skip to content

Commit 22b0a0b

Browse files
committed
Address comments pt2
1 parent 7f8803f commit 22b0a0b

File tree

2 files changed

+50
-13
lines changed

2 files changed

+50
-13
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,19 +324,19 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
324324

325325
@Override
326326
public void exceptionOccurred(Throwable error) {
327-
log.trace(() -> "calling exceptionOccurred on the upstream transformer");
328-
329327
if (partNumber == 1) {
328+
log.trace(() -> "calling exceptionOccurred on the upstream transformer");
330329
upstreamResponseTransformer.exceptionOccurred(error);
331330
}
332331

333-
// TODO - add comments explaining
332+
// Invoking publisherToUpstream.error() essentially fails the request immediately. We should only call this if
333+
// 1) The part number is greater than 1, since we want to retry errors on the first part OR 2) onStream() has
334+
// already been invoked and data has started to be written
334335
synchronized (cancelLock) {
335336
if (partNumber > 1 || onStreamCalled) {
336337
publisherToUpstream.error(error);
337338
}
338339
}
339-
340340
}
341341
}
342342

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
import static org.junit.jupiter.api.Assertions.assertNotNull;
3131
import static org.junit.jupiter.api.Assertions.assertThrows;
3232

33+
import com.github.tomakehurst.wiremock.http.Fault;
3334
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
3435
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
3536
import com.github.tomakehurst.wiremock.stubbing.Scenario;
3637
import java.net.URI;
38+
import java.nio.charset.StandardCharsets;
3739
import java.time.Duration;
3840
import java.util.ArrayList;
3941
import java.util.List;
@@ -92,10 +94,8 @@ public void setup(WireMockRuntimeInfo wm) {
9294
.build();
9395
}
9496

95-
// TODO - 1) update test names 2) add test for I/O error
96-
9797
@Test
98-
public void stub_200_only() {
98+
public void getObject_concurrentCallsReturn200_shouldSucceed() {
9999
List<CompletableFuture<ResponseBytes<GetObjectResponse>>> futures = new ArrayList<>();
100100

101101
int numRuns = 1000;
@@ -108,7 +108,7 @@ public void stub_200_only() {
108108
}
109109

110110
@Test
111-
public void stub_200s_one503_more200s() {
111+
public void getObject_single500WithinMany200s_shouldRetrySuccessfully() {
112112
List<CompletableFuture<ResponseBytes<GetObjectResponse>>> futures = new ArrayList<>();
113113

114114
int numRuns = 1000;
@@ -130,7 +130,7 @@ public void stub_200s_one503_more200s() {
130130
}
131131

132132
@Test
133-
public void stub_503_then_200_multipleTimes() {
133+
public void getObject_concurrent503s_shouldRetrySuccessfully() {
134134
List<CompletableFuture<ResponseBytes<GetObjectResponse>>> futures = new ArrayList<>();
135135

136136
int numRuns = 1000;
@@ -143,7 +143,7 @@ public void stub_503_then_200_multipleTimes() {
143143
}
144144

145145
@Test
146-
public void stub_503_only(WireMockRuntimeInfo wm) {
146+
public void getObject_503Response_shouldNotReuseInitialRequestId() {
147147
String firstRequestId = UUID.randomUUID().toString();
148148
String secondRequestId = UUID.randomUUID().toString();
149149

@@ -186,7 +186,7 @@ public void stub_503_only(WireMockRuntimeInfo wm) {
186186
}
187187

188188
@Test
189-
public void multipleParts_all_200() {
189+
public void multipartDownload_200Response_shouldSucceed() {
190190
int totalParts = 3;
191191
int partSize = 1024;
192192

@@ -237,7 +237,7 @@ public void multipleParts_all_200() {
237237
}
238238

239239
@Test
240-
public void multipleParts_503OnFirstPart_then_200s() {
240+
public void multipartDownload_503OnFirstPart_shouldRetrySuccessfully() {
241241
int totalParts = 3;
242242
int partSize = 1024;
243243

@@ -306,6 +306,43 @@ public void multipleParts_503OnFirstPart_then_200s() {
306306
verify(1, getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", BUCKET, KEY))));
307307
}
308308

309+
@Test
310+
public void getObject_iOError_shouldRetrySuccessfully() {
311+
String requestId = UUID.randomUUID().toString();
312+
313+
stubFor(any(anyUrl())
314+
.inScenario("io-error")
315+
.whenScenarioStateIs(Scenario.STARTED)
316+
.willReturn(aResponse()
317+
.withFault(Fault.CONNECTION_RESET_BY_PEER))
318+
.willSetStateTo("retry"));
319+
320+
stubFor(any(anyUrl())
321+
.inScenario("io-error")
322+
.whenScenarioStateIs("retry")
323+
.willReturn(aResponse()
324+
.withStatus(200)
325+
.withHeader("x-amz-request-id", requestId)
326+
.withBody("Hello World")));
327+
328+
ResponseBytes<GetObjectResponse> response = multipartClient.getObject(GetObjectRequest.builder()
329+
.bucket(BUCKET)
330+
.key(KEY)
331+
.build(),
332+
AsyncResponseTransformer.toBytes()).join();
333+
334+
assertArrayEquals("Hello World".getBytes(StandardCharsets.UTF_8), response.asByteArray());
335+
336+
verify(2, getRequestedFor(urlEqualTo("/" + BUCKET + "/" + KEY + "?partNumber=1")));
337+
338+
List<SdkHttpResponse> responses = capturingInterceptor.getResponses();
339+
String finalRequestId = responses.get(responses.size() - 1)
340+
.firstMatchingHeader("x-amz-request-id")
341+
.orElse(null);
342+
343+
assertEquals(requestId, finalRequestId);
344+
}
345+
309346
private CompletableFuture<ResponseBytes<GetObjectResponse>> mock200Response(S3AsyncClient s3Client, int runNumber) {
310347
String runId = runNumber + " success";
311348

@@ -331,7 +368,7 @@ private CompletableFuture<ResponseBytes<GetObjectResponse>> mockRetryableErrorTh
331368
.whenScenarioStateIs(Scenario.STARTED)
332369
.willReturn(aResponse()
333370
.withHeader("x-amz-request-id", String.valueOf(UUID.randomUUID()))
334-
.withStatus(503).withBody(ERROR_BODY)
371+
.withStatus(500).withBody(ERROR_BODY)
335372
)
336373
.willSetStateTo("SecondAttempt" + runId));
337374

0 commit comments

Comments
 (0)