Skip to content

Commit cba6791

Browse files
veithengregturn
authored andcommitted
SWS-707 - Enable streaming incoming responses
Original pull request: #85
1 parent 7d267af commit cba6791

File tree

3 files changed

+117
-13
lines changed

3 files changed

+117
-13
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
<aspectj.version>1.8.12</aspectj.version>
8989
<axiom.version>1.2.20</axiom.version>
9090
<commons-httpclient.version>3.1</commons-httpclient.version>
91+
<commons-io.version>2.5</commons-io.version>
9192
<commons-logging.version>1.2</commons-logging.version>
9293
<dom4j.version>1.6.1</dom4j.version>
9394
<easymock.version>3.1</easymock.version>
@@ -123,6 +124,11 @@
123124
<artifactId>commons-logging</artifactId>
124125
<version>${commons-logging.version}</version>
125126
</dependency>
127+
<dependency>
128+
<groupId>commons-io</groupId>
129+
<artifactId>commons-io</artifactId>
130+
<version>${commons-io.version}</version>
131+
</dependency>
126132
<dependency>
127133
<groupId>org.springframework</groupId>
128134
<artifactId>spring-core</artifactId>

spring-ws-core/src/main/java/org/springframework/ws/transport/http/AbstractHttpSenderConnection.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616

1717
package org.springframework.ws.transport.http;
1818

19-
import java.io.ByteArrayInputStream;
2019
import java.io.IOException;
2120
import java.io.InputStream;
21+
import java.io.PushbackInputStream;
2222
import java.util.Iterator;
2323
import java.util.zip.GZIPInputStream;
2424
import javax.xml.namespace.QName;
2525

26-
import org.springframework.util.FileCopyUtils;
2726
import org.springframework.util.StringUtils;
2827
import org.springframework.ws.transport.AbstractSenderConnection;
2928
import org.springframework.ws.transport.FaultAwareWebServiceConnection;
@@ -33,13 +32,21 @@
3332
* Abstract base class for {@link WebServiceConnection} implementations that send request over HTTP.
3433
*
3534
* @author Arjen Poutsma
35+
* @author Andreas Veithen
3636
* @since 1.0.0
3737
*/
3838
public abstract class AbstractHttpSenderConnection extends AbstractSenderConnection
3939
implements FaultAwareWebServiceConnection {
4040

41-
/** Buffer used for reading the response, when the content length is invalid. */
42-
private byte[] responseBuffer;
41+
/**
42+
* Cached result of {@link #hasResponse}.
43+
*/
44+
private Boolean hasResponse;
45+
46+
/**
47+
* The raw response input stream to use instead of calling {@link #getRawResponseInputStream()}.
48+
*/
49+
private PushbackInputStream rawResponseInputStream;
4350

4451
@Override
4552
public final boolean hasError() throws IOException {
@@ -69,23 +76,29 @@ protected final boolean hasResponse() throws IOException {
6976
HttpTransportConstants.STATUS_NO_CONTENT == responseCode) {
7077
return false;
7178
}
79+
if (hasResponse != null) {
80+
return hasResponse;
81+
}
7282
long contentLength = getResponseContentLength();
7383
if (contentLength < 0) {
74-
if (responseBuffer == null) {
75-
responseBuffer = FileCopyUtils.copyToByteArray(getRawResponseInputStream());
84+
rawResponseInputStream = new PushbackInputStream(getRawResponseInputStream());
85+
int b = rawResponseInputStream.read();
86+
if (b == -1) {
87+
hasResponse = Boolean.FALSE;
88+
} else {
89+
hasResponse = Boolean.TRUE;
90+
rawResponseInputStream.unread(b);
7691
}
77-
contentLength = responseBuffer.length;
92+
} else {
93+
hasResponse = contentLength > 0;
7894
}
79-
return contentLength > 0;
95+
return hasResponse;
8096
}
8197

8298
@Override
8399
protected final InputStream getResponseInputStream() throws IOException {
84-
InputStream inputStream;
85-
if (responseBuffer != null) {
86-
inputStream = new ByteArrayInputStream(responseBuffer);
87-
}
88-
else {
100+
InputStream inputStream = rawResponseInputStream;
101+
if (inputStream == null) {
89102
inputStream = getRawResponseInputStream();
90103
}
91104
return isGzipResponse() ? new GZIPInputStream(inputStream) : inputStream;
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.ws.transport.http;
18+
19+
import java.io.ByteArrayInputStream;
20+
import java.io.InputStream;
21+
import java.util.Collections;
22+
import java.util.Random;
23+
24+
import org.apache.commons.io.IOUtils;
25+
import org.apache.commons.io.input.CountingInputStream;
26+
import org.easymock.Capture;
27+
import org.junit.Test;
28+
29+
import org.springframework.ws.WebServiceMessage;
30+
import org.springframework.ws.WebServiceMessageFactory;
31+
32+
import static org.easymock.EasyMock.*;
33+
import static org.junit.Assert.*;
34+
35+
/**
36+
* @author Andreas Veithen
37+
*/
38+
public class AbstractHttpSenderConnectionTest {
39+
40+
/**
41+
* Tests that {@link AbstractHttpSenderConnection} doesn't consume the response stream before
42+
* passing it to the message factory. This is a regression test for SWS-707.
43+
*
44+
* @param chunking
45+
* Specifies whether the test should simulate a response with chunking enabled.
46+
* @throws Exception
47+
*/
48+
private void testSupportsStreaming(boolean chunking) throws Exception {
49+
byte[] content = new byte[16*1024];
50+
new Random().nextBytes(content);
51+
CountingInputStream rawInputStream = new CountingInputStream(new ByteArrayInputStream(content));
52+
53+
AbstractHttpSenderConnection connection = createNiceMock(AbstractHttpSenderConnection.class);
54+
expect(connection.getResponseCode()).andReturn(200);
55+
// Simulate response with chunking enabled
56+
expect(connection.getResponseContentLength()).andReturn(chunking ? -1L : content.length);
57+
expect(connection.getRawResponseInputStream()).andReturn(rawInputStream);
58+
expect(connection.getResponseHeaders(anyObject())).andReturn(Collections.emptyIterator());
59+
60+
// Create a mock message factory to capture the InputStream passed to it
61+
WebServiceMessageFactory messageFactory = createNiceMock(WebServiceMessageFactory.class);
62+
WebServiceMessage message = createNiceMock(WebServiceMessage.class);
63+
Capture<InputStream> inputStreamCapture = new Capture<>();
64+
expect(messageFactory.createWebServiceMessage(capture(inputStreamCapture))).andReturn(message);
65+
66+
replay(connection, messageFactory, message);
67+
68+
connection.receive(messageFactory);
69+
70+
assertTrue("The raw input stream has been completely consumed",
71+
rawInputStream.getCount() < content.length);
72+
assertArrayEquals("Unexpected content received by the message factory",
73+
content, IOUtils.toByteArray(inputStreamCapture.getValue()));
74+
}
75+
76+
@Test
77+
public void testSupportsStreamingWithChunkingEnabled() throws Exception {
78+
testSupportsStreaming(true);
79+
}
80+
81+
@Test
82+
public void testSupportsStreamingWithChunkingDisabled() throws Exception {
83+
testSupportsStreaming(false);
84+
}
85+
}

0 commit comments

Comments
 (0)