Skip to content

Commit 265976d

Browse files
authored
Fix Apache5 HTTP client retry failures with non-resettable streams (#6154)
* Fix architecture test failures for apache5.x * Checkstyle issues * Update to use PoolingHttpClientConnectionManager class reference that is implementation of HttpClientConnectionManager * Fix stream reset failure in RepeatableInputStreamRequestEntity by storing content reference to avoid multiple ContentStreamProvider.newStream() calls that cause IOException when retrying requests with non-resettable streams * writeTo_ConcurrentWrites_HandlesCorrectly no longer needed since even Apache 4.x doesnot suports this
1 parent 0bdf878 commit 265976d

File tree

3 files changed

+363
-159
lines changed

3 files changed

+363
-159
lines changed

http-clients/apache-client/src/test/java/software/amazon/awssdk/http/apache/internal/RepeatableInputStreamRequestEntityTest.java

Lines changed: 160 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.http.apache.internal;
1717

1818
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
19+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
1920
import static org.junit.jupiter.api.Assertions.assertEquals;
2021
import static org.junit.jupiter.api.Assertions.assertFalse;
2122
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -35,12 +36,7 @@
3536
import java.io.InputStream;
3637
import java.io.InterruptedIOException;
3738
import java.net.URI;
38-
import java.util.ArrayList;
39-
import java.util.Collections;
40-
import java.util.List;
4139
import java.util.Random;
42-
import java.util.concurrent.CountDownLatch;
43-
import java.util.concurrent.TimeUnit;
4440
import java.util.concurrent.atomic.AtomicInteger;
4541
import org.junit.jupiter.api.BeforeEach;
4642
import org.junit.jupiter.api.DisplayName;
@@ -343,7 +339,9 @@ public int read() throws IOException {
343339
return -1;
344340
}
345341
hasBeenRead = true;
346-
return data[position++] & 0xFF;
342+
int i = data[position] & 0xFF;
343+
position++;
344+
return i;
347345
}
348346

349347
@Override
@@ -670,51 +668,6 @@ void constructor_WithoutContentType_HandlesGracefully() {
670668
assertEquals(100L, entity.getContentLength());
671669
}
672670

673-
@Test
674-
@DisplayName("Entity should handle concurrent write attempts")
675-
void writeTo_ConcurrentWrites_HandlesCorrectly() throws Exception {
676-
// Given
677-
String content = "Concurrent test content";
678-
ContentStreamProvider provider = () -> new ByteArrayInputStream(content.getBytes());
679-
SdkHttpRequest httpRequest = httpRequestBuilder.build();
680-
HttpExecuteRequest request = HttpExecuteRequest.builder()
681-
.request(httpRequest)
682-
.contentStreamProvider(provider)
683-
.build();
684-
685-
entity = new RepeatableInputStreamRequestEntity(request);
686-
687-
// Simulate concurrent writes
688-
int threadCount = 5;
689-
CountDownLatch latch = new CountDownLatch(threadCount);
690-
List<ByteArrayOutputStream> outputs = Collections.synchronizedList(new ArrayList<>());
691-
List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
692-
693-
for (int i = 0; i < threadCount; i++) {
694-
new Thread(() -> {
695-
try {
696-
ByteArrayOutputStream output = new ByteArrayOutputStream();
697-
entity.writeTo(output);
698-
outputs.add(output);
699-
} catch (Exception e) {
700-
exceptions.add(e);
701-
} finally {
702-
latch.countDown();
703-
}
704-
}).start();
705-
}
706-
707-
latch.await(5, TimeUnit.SECONDS);
708-
709-
// At least one should succeed, others may fail due to stream state
710-
assertFalse(outputs.isEmpty(), "At least one write should succeed");
711-
for (ByteArrayOutputStream output : outputs) {
712-
if (output.size() > 0) {
713-
assertEquals(content, output.toString());
714-
}
715-
}
716-
}
717-
718671
@Test
719672
@DisplayName("Entity should handle interrupted IO operations")
720673
void writeTo_InterruptedStream_ThrowsIOException() throws IOException {
@@ -790,5 +743,161 @@ void multipleOperations_StatePreservation_WorksCorrectly() throws IOException {
790743
assertEquals(contentLength1, contentLength2);
791744
assertEquals(contentLength2, contentLength3);
792745
}
746+
747+
@Test
748+
@DisplayName("markSupported should be be called everytime")
749+
void markSupported_NotCachedDuringConstruction() {
750+
// Given
751+
AtomicInteger markSupportedCalls = new AtomicInteger(0);
752+
InputStream trackingStream = new ByteArrayInputStream("test".getBytes()) {
753+
@Override
754+
public boolean markSupported() {
755+
markSupportedCalls.incrementAndGet();
756+
return true;
757+
}
758+
};
759+
760+
entity = createEntity(trackingStream);
761+
assertEquals(0, markSupportedCalls.get());
762+
763+
// Multiple isRepeatable calls trigger new markSupported calls
764+
assertTrue(entity.isRepeatable());
765+
assertTrue(entity.isRepeatable());
766+
assertEquals(2, markSupportedCalls.get());
767+
}
768+
769+
@Test
770+
@DisplayName("ContentStreamProvider.newStream() should only be called once")
771+
void contentStreamProvider_NewStreamCalledOnce() {
772+
AtomicInteger newStreamCalls = new AtomicInteger(0);
773+
ContentStreamProvider provider = () -> {
774+
if (newStreamCalls.incrementAndGet() > 1) {
775+
throw new RuntimeException("Could not create new stream: Already created");
776+
}
777+
return new ByteArrayInputStream("test".getBytes());
778+
};
779+
780+
entity = createEntity(provider);
781+
782+
assertEquals(1, newStreamCalls.get());
783+
assertTrue(entity.isRepeatable());
784+
assertFalse(entity.isChunked());
785+
}
786+
787+
@Test
788+
@DisplayName("writeTo should use cached markSupported for reset decision")
789+
void writeTo_UsesCachedMarkSupported() throws IOException {
790+
// Given - Stream that changes markSupported behavior
791+
AtomicInteger markSupportedCalls = new AtomicInteger(0);
792+
ByteArrayInputStream baseStream = new ByteArrayInputStream("test".getBytes());
793+
InputStream stream = new InputStream() {
794+
@Override
795+
public int read() throws IOException {
796+
return baseStream.read();
797+
}
798+
799+
@Override
800+
public boolean markSupported() {
801+
return markSupportedCalls.incrementAndGet() == 1; // Only first call returns true
802+
}
803+
804+
@Override
805+
public synchronized void reset() throws IOException {
806+
baseStream.reset();
807+
}
808+
};
809+
810+
entity = createEntity(stream);
811+
812+
// Write twice
813+
ByteArrayOutputStream output1 = new ByteArrayOutputStream();
814+
entity.writeTo(output1);
815+
816+
ByteArrayOutputStream output2 = new ByteArrayOutputStream();
817+
entity.writeTo(output2);
818+
819+
// Then - Both writes succeed using cached markSupported value
820+
assertEquals("test", output1.toString());
821+
assertEquals("test", output2.toString());
822+
assertEquals(1, markSupportedCalls.get());
823+
}
824+
825+
@Test
826+
@DisplayName("Non-repeatable stream should not attempt reset")
827+
void nonRepeatableStream_NoResetAttempt() throws IOException {
828+
// Given
829+
AtomicInteger resetCalls = new AtomicInteger(0);
830+
InputStream nonRepeatableStream = new ByteArrayInputStream("test".getBytes()) {
831+
@Override
832+
public boolean markSupported() {
833+
return false;
834+
}
835+
836+
@Override
837+
public synchronized void reset() {
838+
resetCalls.incrementAndGet();
839+
throw new RuntimeException("Reset not supported");
840+
}
841+
};
842+
843+
entity = createEntity(nonRepeatableStream);
844+
assertFalse(entity.isRepeatable());
845+
entity.writeTo(new ByteArrayOutputStream());
846+
entity.writeTo(new ByteArrayOutputStream());
847+
assertEquals(0, resetCalls.get());
848+
}
849+
850+
@Test
851+
@DisplayName("Stream should not be read during construction")
852+
void constructor_DoesNotReadStream() {
853+
// Given
854+
InputStream nonReadableStream = new InputStream() {
855+
@Override
856+
public int read() throws IOException {
857+
throw new IOException("Stream should not be read during construction");
858+
}
859+
860+
@Override
861+
public boolean markSupported() {
862+
return true;
863+
}
864+
};
865+
assertDoesNotThrow(() -> entity = createEntity(nonReadableStream));
866+
assertTrue(entity.isRepeatable());
867+
}
868+
869+
@Test
870+
@DisplayName("getContent should reuse existing stream")
871+
void getContent_ReusesExistingStream() throws IOException {
872+
InputStream originalStream = new ByteArrayInputStream("content".getBytes());
873+
entity = createEntity(originalStream);
874+
InputStream content1 = entity.getContent();
875+
InputStream content2 = entity.getContent();
876+
assertSame(content1, content2);
877+
}
878+
879+
@Test
880+
@DisplayName("Empty stream should be repeatable")
881+
void emptyStream_IsRepeatable() {
882+
// Given - No content provider
883+
HttpExecuteRequest request = HttpExecuteRequest.builder()
884+
.request(httpRequestBuilder.build())
885+
.build();
886+
entity = new RepeatableInputStreamRequestEntity(request);
887+
assertTrue(entity.isRepeatable());
888+
}
889+
890+
// Helper methods
891+
private RepeatableInputStreamRequestEntity createEntity(InputStream stream) {
892+
return createEntity(() -> stream);
893+
}
894+
895+
private RepeatableInputStreamRequestEntity createEntity(ContentStreamProvider provider) {
896+
HttpExecuteRequest request = HttpExecuteRequest.builder()
897+
.request(httpRequestBuilder.build())
898+
.contentStreamProvider(provider)
899+
.build();
900+
return new RepeatableInputStreamRequestEntity(request);
901+
}
793902
}
794903

http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/RepeatableInputStreamRequestEntity.java

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,11 @@ public class RepeatableInputStreamRequestEntity extends HttpEntityWrapper {
4949
/**
5050
* True if the "Transfer-Encoding:chunked" header is present
5151
*/
52-
private boolean isChunked;
53-
52+
private final boolean isChunked;
5453
/**
55-
* The underlying InputStreamEntity being delegated to
54+
* The underlying reference of content
5655
*/
57-
private InputStreamEntity inputStreamRequestEntity;
58-
59-
/**
60-
* The InputStream containing the content to write out
61-
*/
62-
private InputStream content;
63-
56+
private final InputStream content;
6457
/**
6558
* Record the original exception if we do attempt a retry, so that if the
6659
* retry fails, we can report the original exception. Otherwise, we're most
@@ -70,18 +63,36 @@ public class RepeatableInputStreamRequestEntity extends HttpEntityWrapper {
7063
private IOException originalException;
7164

7265
/**
73-
* Creates a new RepeatableInputStreamRequestEntity using the information
74-
* from the specified request. If the input stream containing the request's
75-
* contents is repeatable, then this RequestEntity will report as being
76-
* repeatable.
77-
*
78-
* @param request The details of the request being written out (content type,
79-
* content length, and content).
66+
* Helper class to capture both the created entity and the original content stream reference.
67+
* <p>
68+
* We store the content stream reference to avoid calling {@code getContent()} on the wrapped
69+
* entity multiple times, which could potentially create new stream instances or perform
70+
* unnecessary operations. This ensures we consistently use the same stream instance for
71+
* {@code markSupported()} checks and {@code reset()} operations throughout the entity's lifecycle.
8072
*/
73+
74+
private static class EntityCreationResult {
75+
final InputStreamEntity entity;
76+
final InputStream content;
77+
78+
EntityCreationResult(InputStreamEntity entity, InputStream content) {
79+
this.entity = entity;
80+
this.content = content;
81+
}
82+
}
83+
8184
public RepeatableInputStreamRequestEntity(HttpExecuteRequest request) {
82-
super(createInputStreamEntity(request));
85+
this(createInputStreamEntityWithMetadata(request), request);
86+
}
87+
88+
private RepeatableInputStreamRequestEntity(EntityCreationResult result, HttpExecuteRequest request) {
89+
super(result.entity);
90+
this.content = result.content;
91+
this.isChunked = request.httpRequest().matchingHeaders(TRANSFER_ENCODING).contains(CHUNKED);
92+
}
8393

84-
isChunked = request.httpRequest().matchingHeaders(TRANSFER_ENCODING).contains(CHUNKED);
94+
private static EntityCreationResult createInputStreamEntityWithMetadata(HttpExecuteRequest request) {
95+
InputStream content = getContent(request.contentStreamProvider());
8596

8697
/*
8798
* If we don't specify a content length when we instantiate our
@@ -93,35 +104,14 @@ public RepeatableInputStreamRequestEntity(HttpExecuteRequest request) {
93104
.map(RepeatableInputStreamRequestEntity::parseContentLength)
94105
.orElse(-1L);
95106

96-
content = getContent(request.contentStreamProvider());
97-
98-
// Create InputStreamEntity with proper ContentType handling for HttpClient 5.x
99-
ContentType contentType = request.httpRequest().firstMatchingHeader("Content-Type")
100-
.map(RepeatableInputStreamRequestEntity::parseContentType)
101-
.orElse(null);
102-
103-
if (contentLength >= 0) {
104-
inputStreamRequestEntity = new InputStreamEntity(content, contentLength, contentType);
105-
} else {
106-
inputStreamRequestEntity = new InputStreamEntity(content, contentType);
107-
}
108-
}
109-
110-
private static InputStreamEntity createInputStreamEntity(HttpExecuteRequest request) {
111-
InputStream content = getContent(request.contentStreamProvider());
112-
113-
long contentLength = request.httpRequest().firstMatchingHeader("Content-Length")
114-
.map(RepeatableInputStreamRequestEntity::parseContentLength)
115-
.orElse(-1L);
116-
117107
ContentType contentType = request.httpRequest().firstMatchingHeader("Content-Type")
118108
.map(RepeatableInputStreamRequestEntity::parseContentType)
119109
.orElse(null);
120110

121-
if (contentLength >= 0) {
122-
return new InputStreamEntity(content, contentLength, contentType);
123-
}
124-
return new InputStreamEntity(content, contentType);
111+
InputStreamEntity entity = contentLength >= 0
112+
? new InputStreamEntity(content, contentLength, contentType)
113+
: new InputStreamEntity(content, contentType);
114+
return new EntityCreationResult(entity, content);
125115
}
126116

127117
private static long parseContentLength(String contentLength) {
@@ -164,13 +154,9 @@ public boolean isChunked() {
164154
*/
165155
@Override
166156
public boolean isRepeatable() {
167-
boolean markSupported = content.markSupported();
168-
boolean entityRepeatable = inputStreamRequestEntity.isRepeatable();
169-
boolean result = markSupported || entityRepeatable;
170-
return result;
157+
return content.markSupported() || super.isRepeatable();
171158
}
172159

173-
174160
/**
175161
* Resets the underlying InputStream if this isn't the first attempt to
176162
* write out the request, otherwise simply delegates to
@@ -189,7 +175,7 @@ public void writeTo(OutputStream output) throws IOException {
189175
}
190176

191177
firstAttempt = false;
192-
inputStreamRequestEntity.writeTo(output);
178+
super.writeTo(output);
193179
} catch (IOException ioe) {
194180
if (originalException == null) {
195181
originalException = ioe;
@@ -200,12 +186,8 @@ public void writeTo(OutputStream output) throws IOException {
200186

201187
@Override
202188
public void close() throws IOException {
203-
try {
204-
if (content != null) {
205-
content.close();
206-
}
207-
} finally {
208-
super.close();
209-
}
189+
// The InputStreamEntity handles closing the stream when it's closed
190+
// We don't need to close our reference separately to avoid double-closing
191+
super.close();
210192
}
211193
}

0 commit comments

Comments
 (0)