Skip to content

Commit 7f9e77f

Browse files
committed
Fix stream reset failure in RepeatableInputStreamRequestEntity by storing content reference to avoid multiple ContentStreamProvider.newStream() calls that cause IOException when retrying requests with non-resettable streams
1 parent 89debeb commit 7f9e77f

File tree

3 files changed

+360
-58
lines changed

3 files changed

+360
-58
lines changed

http-clients/apache-client/src/test/java/software/amazon/awssdk/http/apache/internal/RepeatableInputStreamRequestEntityTest.java

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.http.apache.internal;
1717

1818
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
19+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
1920
import static org.junit.jupiter.api.Assertions.assertEquals;
2021
import static org.junit.jupiter.api.Assertions.assertFalse;
2122
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -790,5 +791,161 @@ void multipleOperations_StatePreservation_WorksCorrectly() throws IOException {
790791
assertEquals(contentLength1, contentLength2);
791792
assertEquals(contentLength2, contentLength3);
792793
}
794+
795+
@Test
796+
@DisplayName("markSupported should be be called everytime")
797+
void markSupported_NotCachedDuringConstruction() {
798+
// Given
799+
AtomicInteger markSupportedCalls = new AtomicInteger(0);
800+
InputStream trackingStream = new ByteArrayInputStream("test".getBytes()) {
801+
@Override
802+
public boolean markSupported() {
803+
markSupportedCalls.incrementAndGet();
804+
return true;
805+
}
806+
};
807+
808+
entity = createEntity(trackingStream);
809+
assertEquals(0, markSupportedCalls.get());
810+
811+
// Multiple isRepeatable calls trigger new markSupported calls
812+
assertTrue(entity.isRepeatable());
813+
assertTrue(entity.isRepeatable());
814+
assertEquals(2, markSupportedCalls.get());
815+
}
816+
817+
@Test
818+
@DisplayName("ContentStreamProvider.newStream() should only be called once")
819+
void contentStreamProvider_NewStreamCalledOnce() {
820+
AtomicInteger newStreamCalls = new AtomicInteger(0);
821+
ContentStreamProvider provider = () -> {
822+
if (newStreamCalls.incrementAndGet() > 1) {
823+
throw new RuntimeException("Could not create new stream: Already created");
824+
}
825+
return new ByteArrayInputStream("test".getBytes());
826+
};
827+
828+
entity = createEntity(provider);
829+
830+
assertEquals(1, newStreamCalls.get());
831+
assertTrue(entity.isRepeatable());
832+
assertFalse(entity.isChunked());
833+
}
834+
835+
@Test
836+
@DisplayName("writeTo should use cached markSupported for reset decision")
837+
void writeTo_UsesCachedMarkSupported() throws IOException {
838+
// Given - Stream that changes markSupported behavior
839+
AtomicInteger markSupportedCalls = new AtomicInteger(0);
840+
ByteArrayInputStream baseStream = new ByteArrayInputStream("test".getBytes());
841+
InputStream stream = new InputStream() {
842+
@Override
843+
public int read() throws IOException {
844+
return baseStream.read();
845+
}
846+
847+
@Override
848+
public boolean markSupported() {
849+
return markSupportedCalls.incrementAndGet() == 1; // Only first call returns true
850+
}
851+
852+
@Override
853+
public synchronized void reset() throws IOException {
854+
baseStream.reset();
855+
}
856+
};
857+
858+
entity = createEntity(stream);
859+
860+
// Write twice
861+
ByteArrayOutputStream output1 = new ByteArrayOutputStream();
862+
entity.writeTo(output1);
863+
864+
ByteArrayOutputStream output2 = new ByteArrayOutputStream();
865+
entity.writeTo(output2);
866+
867+
// Then - Both writes succeed using cached markSupported value
868+
assertEquals("test", output1.toString());
869+
assertEquals("test", output2.toString());
870+
assertEquals(1, markSupportedCalls.get());
871+
}
872+
873+
@Test
874+
@DisplayName("Non-repeatable stream should not attempt reset")
875+
void nonRepeatableStream_NoResetAttempt() throws IOException {
876+
// Given
877+
AtomicInteger resetCalls = new AtomicInteger(0);
878+
InputStream nonRepeatableStream = new ByteArrayInputStream("test".getBytes()) {
879+
@Override
880+
public boolean markSupported() {
881+
return false;
882+
}
883+
884+
@Override
885+
public synchronized void reset() {
886+
resetCalls.incrementAndGet();
887+
throw new RuntimeException("Reset not supported");
888+
}
889+
};
890+
891+
entity = createEntity(nonRepeatableStream);
892+
assertFalse(entity.isRepeatable());
893+
entity.writeTo(new ByteArrayOutputStream());
894+
entity.writeTo(new ByteArrayOutputStream());
895+
assertEquals(0, resetCalls.get());
896+
}
897+
898+
@Test
899+
@DisplayName("Stream should not be read during construction")
900+
void constructor_DoesNotReadStream() {
901+
// Given
902+
InputStream nonReadableStream = new InputStream() {
903+
@Override
904+
public int read() throws IOException {
905+
throw new IOException("Stream should not be read during construction");
906+
}
907+
908+
@Override
909+
public boolean markSupported() {
910+
return true;
911+
}
912+
};
913+
assertDoesNotThrow(() -> entity = createEntity(nonReadableStream));
914+
assertTrue(entity.isRepeatable());
915+
}
916+
917+
@Test
918+
@DisplayName("getContent should reuse existing stream")
919+
void getContent_ReusesExistingStream() throws IOException {
920+
InputStream originalStream = new ByteArrayInputStream("content".getBytes());
921+
entity = createEntity(originalStream);
922+
InputStream content1 = entity.getContent();
923+
InputStream content2 = entity.getContent();
924+
assertSame(content1, content2);
925+
}
926+
927+
@Test
928+
@DisplayName("Empty stream should be repeatable")
929+
void emptyStream_IsRepeatable() {
930+
// Given - No content provider
931+
HttpExecuteRequest request = HttpExecuteRequest.builder()
932+
.request(httpRequestBuilder.build())
933+
.build();
934+
entity = new RepeatableInputStreamRequestEntity(request);
935+
assertTrue(entity.isRepeatable());
936+
}
937+
938+
// Helper methods
939+
private RepeatableInputStreamRequestEntity createEntity(InputStream stream) {
940+
return createEntity(() -> stream);
941+
}
942+
943+
private RepeatableInputStreamRequestEntity createEntity(ContentStreamProvider provider) {
944+
HttpExecuteRequest request = HttpExecuteRequest.builder()
945+
.request(httpRequestBuilder.build())
946+
.contentStreamProvider(provider)
947+
.build();
948+
return new RepeatableInputStreamRequestEntity(request);
949+
}
793950
}
794951

http-clients/apache5-client/src/main/java/software/amazon/awssdk/http/apache5/internal/RepeatableInputStreamRequestEntity.java

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,11 @@ public class RepeatableInputStreamRequestEntity extends HttpEntityWrapper {
4949
/**
5050
* True if the "Transfer-Encoding:chunked" header is present
5151
*/
52-
private boolean isChunked;
53-
52+
private final boolean isChunked;
5453
/**
55-
* The underlying InputStreamEntity being delegated to
54+
* The underlying reference of content
5655
*/
57-
private InputStreamEntity inputStreamRequestEntity;
58-
59-
/**
60-
* The InputStream containing the content to write out
61-
*/
62-
private InputStream content;
63-
56+
private final InputStream content;
6457
/**
6558
* Record the original exception if we do attempt a retry, so that if the
6659
* retry fails, we can report the original exception. Otherwise, we're most
@@ -70,18 +63,36 @@ public class RepeatableInputStreamRequestEntity extends HttpEntityWrapper {
7063
private IOException originalException;
7164

7265
/**
73-
* Creates a new RepeatableInputStreamRequestEntity using the information
74-
* from the specified request. If the input stream containing the request's
75-
* contents is repeatable, then this RequestEntity will report as being
76-
* repeatable.
77-
*
78-
* @param request The details of the request being written out (content type,
79-
* content length, and content).
66+
* Helper class to capture both the created entity and the original content stream reference.
67+
* <p>
68+
* We store the content stream reference to avoid calling {@code getContent()} on the wrapped
69+
* entity multiple times, which could potentially create new stream instances or perform
70+
* unnecessary operations. This ensures we consistently use the same stream instance for
71+
* {@code markSupported()} checks and {@code reset()} operations throughout the entity's lifecycle.
8072
*/
73+
74+
private static class EntityCreationResult {
75+
final InputStreamEntity entity;
76+
final InputStream content;
77+
78+
EntityCreationResult(InputStreamEntity entity, InputStream content) {
79+
this.entity = entity;
80+
this.content = content;
81+
}
82+
}
83+
8184
public RepeatableInputStreamRequestEntity(HttpExecuteRequest request) {
82-
super(createInputStreamEntity(request));
85+
this(createInputStreamEntityWithMetadata(request), request);
86+
}
87+
88+
private RepeatableInputStreamRequestEntity(EntityCreationResult result, HttpExecuteRequest request) {
89+
super(result.entity);
90+
this.content = result.content;
91+
this.isChunked = request.httpRequest().matchingHeaders(TRANSFER_ENCODING).contains(CHUNKED);
92+
}
8393

84-
isChunked = request.httpRequest().matchingHeaders(TRANSFER_ENCODING).contains(CHUNKED);
94+
private static EntityCreationResult createInputStreamEntityWithMetadata(HttpExecuteRequest request) {
95+
InputStream content = getContent(request.contentStreamProvider());
8596

8697
/*
8798
* If we don't specify a content length when we instantiate our
@@ -93,35 +104,14 @@ public RepeatableInputStreamRequestEntity(HttpExecuteRequest request) {
93104
.map(RepeatableInputStreamRequestEntity::parseContentLength)
94105
.orElse(-1L);
95106

96-
content = getContent(request.contentStreamProvider());
97-
98-
// Create InputStreamEntity with proper ContentType handling for HttpClient 5.x
99-
ContentType contentType = request.httpRequest().firstMatchingHeader("Content-Type")
100-
.map(RepeatableInputStreamRequestEntity::parseContentType)
101-
.orElse(null);
102-
103-
if (contentLength >= 0) {
104-
inputStreamRequestEntity = new InputStreamEntity(content, contentLength, contentType);
105-
} else {
106-
inputStreamRequestEntity = new InputStreamEntity(content, contentType);
107-
}
108-
}
109-
110-
private static InputStreamEntity createInputStreamEntity(HttpExecuteRequest request) {
111-
InputStream content = getContent(request.contentStreamProvider());
112-
113-
long contentLength = request.httpRequest().firstMatchingHeader("Content-Length")
114-
.map(RepeatableInputStreamRequestEntity::parseContentLength)
115-
.orElse(-1L);
116-
117107
ContentType contentType = request.httpRequest().firstMatchingHeader("Content-Type")
118108
.map(RepeatableInputStreamRequestEntity::parseContentType)
119109
.orElse(null);
120110

121-
if (contentLength >= 0) {
122-
return new InputStreamEntity(content, contentLength, contentType);
123-
}
124-
return new InputStreamEntity(content, contentType);
111+
InputStreamEntity entity = contentLength >= 0
112+
? new InputStreamEntity(content, contentLength, contentType)
113+
: new InputStreamEntity(content, contentType);
114+
return new EntityCreationResult(entity, content);
125115
}
126116

127117
private static long parseContentLength(String contentLength) {
@@ -164,13 +154,9 @@ public boolean isChunked() {
164154
*/
165155
@Override
166156
public boolean isRepeatable() {
167-
boolean markSupported = content.markSupported();
168-
boolean entityRepeatable = inputStreamRequestEntity.isRepeatable();
169-
boolean result = markSupported || entityRepeatable;
170-
return result;
157+
return content.markSupported() || super.isRepeatable();
171158
}
172159

173-
174160
/**
175161
* Resets the underlying InputStream if this isn't the first attempt to
176162
* write out the request, otherwise simply delegates to
@@ -189,7 +175,7 @@ public void writeTo(OutputStream output) throws IOException {
189175
}
190176

191177
firstAttempt = false;
192-
inputStreamRequestEntity.writeTo(output);
178+
super.writeTo(output);
193179
} catch (IOException ioe) {
194180
if (originalException == null) {
195181
originalException = ioe;
@@ -200,12 +186,8 @@ public void writeTo(OutputStream output) throws IOException {
200186

201187
@Override
202188
public void close() throws IOException {
203-
try {
204-
if (content != null) {
205-
content.close();
206-
}
207-
} finally {
208-
super.close();
209-
}
189+
// The InputStreamEntity handles closing the stream when it's closed
190+
// We don't need to close our reference separately to avoid double-closing
191+
super.close();
210192
}
211193
}

0 commit comments

Comments
 (0)