Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit 05108f4

Browse files
Marek PotočiarGerrit Code Review
authored andcommitted
Merge "JERSEY-2705: Input stream entity interruption in chunked mode issue."
2 parents cf3c6a5 + b1e4873 commit 05108f4

File tree

1 file changed

+295
-0
lines changed

1 file changed

+295
-0
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*
2+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3+
*
4+
* Copyright (c) 2014 Oracle and/or its affiliates. All rights reserved.
5+
*
6+
* The contents of this file are subject to the terms of either the GNU
7+
* General Public License Version 2 only ("GPL") or the Common Development
8+
* and Distribution License("CDDL") (collectively, the "License"). You
9+
* may not use this file except in compliance with the License. You can
10+
* obtain a copy of the License at
11+
* http://glassfish.java.net/public/CDDL+GPL_1_1.html
12+
* or packager/legal/LICENSE.txt. See the License for the specific
13+
* language governing permissions and limitations under the License.
14+
*
15+
* When distributing the software, include this License Header Notice in each
16+
* file and include the License file at packager/legal/LICENSE.txt.
17+
*
18+
* GPL Classpath Exception:
19+
* Oracle designates this particular file as subject to the "Classpath"
20+
* exception as provided by Oracle in the GPL Version 2 section of the License
21+
* file that accompanied this code.
22+
*
23+
* Modifications:
24+
* If applicable, add the following below the License Header, with the fields
25+
* enclosed by brackets [] replaced by your own identifying information:
26+
* "Portions Copyright [year] [name of copyright owner]"
27+
*
28+
* Contributor(s):
29+
* If you wish your version of this file to be governed by only the CDDL or
30+
* only the GPL Version 2, indicate your decision by adding "[Contributor]
31+
* elects to include this software in this distribution under the [CDDL or GPL
32+
* Version 2] license." If you don't indicate a single choice of license, a
33+
* recipient has the option to distribute your version of this file under
34+
* either the CDDL, the GPL Version 2 or to extend the choice of license to
35+
* its licensees as provided above. However, if you add GPL Version 2 code
36+
* and therefore, elected the GPL Version 2 license, then the option applies
37+
* only if the new code is made subject to such option by the copyright
38+
* holder.
39+
*/
40+
package org.glassfish.jersey.tests.e2e.client;
41+
42+
import java.io.IOException;
43+
import java.io.InputStream;
44+
import java.io.OutputStream;
45+
import java.net.HttpURLConnection;
46+
import java.net.URL;
47+
import java.util.Arrays;
48+
import java.util.concurrent.ConcurrentHashMap;
49+
import java.util.concurrent.ConcurrentMap;
50+
import java.util.concurrent.ExecutionException;
51+
import java.util.concurrent.TimeUnit;
52+
import java.util.concurrent.TimeoutException;
53+
import java.util.concurrent.atomic.AtomicInteger;
54+
import java.util.logging.Level;
55+
import java.util.logging.Logger;
56+
57+
import javax.ws.rs.GET;
58+
import javax.ws.rs.InternalServerErrorException;
59+
import javax.ws.rs.POST;
60+
import javax.ws.rs.Path;
61+
import javax.ws.rs.ProcessingException;
62+
import javax.ws.rs.QueryParam;
63+
import javax.ws.rs.client.Entity;
64+
import javax.ws.rs.core.Application;
65+
import javax.ws.rs.core.MediaType;
66+
import javax.ws.rs.core.Response;
67+
import javax.ws.rs.core.UriBuilder;
68+
69+
import org.glassfish.jersey.client.ClientConfig;
70+
import org.glassfish.jersey.client.ClientProperties;
71+
import org.glassfish.jersey.client.RequestEntityProcessing;
72+
import org.glassfish.jersey.message.internal.ReaderWriter;
73+
import org.glassfish.jersey.server.ResourceConfig;
74+
import org.glassfish.jersey.test.JerseyTest;
75+
76+
import org.junit.Assert;
77+
import org.junit.Ignore;
78+
import org.junit.Test;
79+
import static org.junit.Assert.assertEquals;
80+
import static org.junit.Assert.assertFalse;
81+
import static org.junit.Assert.assertTrue;
82+
83+
import com.google.common.util.concurrent.SettableFuture;
84+
85+
/**
86+
* Reproducer for JERSEY-2705. Client side entity InputStream exception
87+
* in chunked mode should not lead to the same behavior on the server side,
88+
* as if no exception occurred at all.
89+
*
90+
* @author Jakub Podlesak (jakub.podlesak at oracle.com)
91+
* @author Marek Potociar (marek.potociar at oracle.com)
92+
*/
93+
public class ChunkedInputStreamClosedPrematurelyTest extends JerseyTest {
94+
private static final Logger LOGGER = Logger.getLogger(ChunkedInputStreamClosedPrematurelyTest.class.getName());
95+
private static final Exception NO_EXCEPTION = new Exception("No exception.");
96+
97+
private static final AtomicInteger NEXT_REQ_ID = new AtomicInteger(0);
98+
private static final String REQ_ID_PARAM_NAME = "test-req-id";
99+
private static final int BYTES_TO_SEND = 1024 * 1024 + 13;
100+
101+
@Path("/test")
102+
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "JavaDoc"})
103+
public static class TestResource {
104+
private final static ConcurrentMap<String, SettableFuture<Exception>> REQUEST_MAP = new ConcurrentHashMap<>();
105+
106+
@QueryParam(REQ_ID_PARAM_NAME)
107+
private String reqId;
108+
109+
@POST
110+
public String post(InputStream is) {
111+
final byte[] buffer = new byte[4096];
112+
int readTotal = 0;
113+
114+
Exception thrown = NO_EXCEPTION;
115+
try {
116+
int read;
117+
while ((read = is.read(buffer)) > -1) {
118+
readTotal += read;
119+
}
120+
} catch (Exception ex) {
121+
thrown = ex;
122+
}
123+
124+
if (!getFutureFor(reqId).set(thrown)) {
125+
LOGGER.log(Level.WARNING,
126+
"Unable to set stream processing exception into the settable future instance for request id " + reqId,
127+
thrown);
128+
}
129+
130+
return Integer.toString(readTotal);
131+
}
132+
133+
@Path("/requestWasMade")
134+
@GET
135+
public Boolean getRequestWasMade() {
136+
// add a new future for the request if not there yet to avoid race conditions with POST processing
137+
final SettableFuture<Exception> esf = getFutureFor(reqId);
138+
try {
139+
// wait for up to three second for a request to be made;
140+
// there is always a value, if set...
141+
return esf.get(3, TimeUnit.SECONDS) != null;
142+
} catch (InterruptedException | TimeoutException | ExecutionException e) {
143+
throw new InternalServerErrorException("Post request processing has timed out for request id " + reqId, e);
144+
}
145+
}
146+
147+
@Path("/requestCausedException")
148+
@GET
149+
public Boolean getRequestCausedException() {
150+
final SettableFuture<Exception> esf = getFutureFor(reqId);
151+
try {
152+
return esf.get(3, TimeUnit.SECONDS) != NO_EXCEPTION;
153+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
154+
throw new InternalServerErrorException("Post request processing has timed out for request id " + reqId, e);
155+
}
156+
}
157+
158+
private SettableFuture<Exception> getFutureFor(String key) {
159+
final SettableFuture<Exception> esf = SettableFuture.create();
160+
final SettableFuture<Exception> oldEsf = REQUEST_MAP.putIfAbsent(key, esf);
161+
return (oldEsf != null) ? oldEsf : esf;
162+
}
163+
}
164+
165+
@Override
166+
protected Application configure() {
167+
return new ResourceConfig(TestResource.class);
168+
}
169+
170+
@Override
171+
protected void configureClient(ClientConfig config) {
172+
config.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED);
173+
config.property(ClientProperties.CHUNKED_ENCODING_SIZE, 7);
174+
}
175+
176+
/**
177+
* A sanity test to check the normal use case is working as expected.
178+
*/
179+
@Test
180+
public void testUninterrupted() {
181+
final String testReqId = nextRequestId("testUninterrupted");
182+
183+
Response testResponse = target("test").queryParam(REQ_ID_PARAM_NAME, testReqId)
184+
.request().post(Entity.entity("0123456789ABCDEF", MediaType.APPLICATION_OCTET_STREAM));
185+
assertEquals("Unexpected response status code.", 200, testResponse.getStatus());
186+
assertEquals("Unexpected response entity.", "16", testResponse.readEntity(String.class));
187+
188+
assertTrue("POST request " + testReqId + " has not reached the server.",
189+
target("test").path("requestWasMade").queryParam(REQ_ID_PARAM_NAME, testReqId)
190+
.request().get(Boolean.class));
191+
assertFalse("POST request " + testReqId + " has caused an unexpected exception on the server.",
192+
target("test").path("requestCausedException").queryParam(REQ_ID_PARAM_NAME, testReqId)
193+
.request().get(Boolean.class));
194+
}
195+
196+
/**
197+
* This test simulates how Jersey Client should behave after JERSEY-2705 gets fixed.
198+
*
199+
* @throws Exception in case the test fails to execute.
200+
*/
201+
@Test
202+
public void testInterruptedJerseyHttpUrlConnection() throws Exception {
203+
204+
final String testReqId = nextRequestId("testInterruptedJerseyHttpUrlConnection");
205+
206+
URL postUrl = UriBuilder.fromUri(getBaseUri()).path("test").queryParam(REQ_ID_PARAM_NAME, testReqId).build().toURL();
207+
final HttpURLConnection connection = (HttpURLConnection) postUrl.openConnection();
208+
209+
try {
210+
connection.setRequestMethod("POST");
211+
connection.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM);
212+
connection.setDoOutput(true);
213+
connection.setChunkedStreamingMode(1024);
214+
OutputStream entityStream = connection.getOutputStream();
215+
ReaderWriter.writeTo(new ExceptionThrowingInputStream(BYTES_TO_SEND), entityStream);
216+
Assert.fail("Expected ProcessingException has not been thrown.");
217+
} catch (IOException expected) {
218+
// so far so good
219+
} finally {
220+
connection.disconnect();
221+
}
222+
// we should make it to the server, but there the exceptional behaviour should get noticed
223+
assertTrue("POST request " + testReqId + " has not reached the server.",
224+
target("test").path("requestWasMade").queryParam(REQ_ID_PARAM_NAME, testReqId).request().get(Boolean.class));
225+
assertTrue("POST request " + testReqId + " did not cause an expected exception on the server.",
226+
target("test").path("requestCausedException").queryParam(REQ_ID_PARAM_NAME, testReqId)
227+
.request().get(Boolean.class));
228+
}
229+
230+
/**
231+
* This test reproduces the Jersey Client behavior reported in JERSEY-2705.
232+
*/
233+
@Ignore
234+
@Test
235+
public void testInterruptedJerseyClient() {
236+
final String testReqId = nextRequestId("testInterruptedJerseyClient");
237+
238+
try {
239+
target("test").queryParam(REQ_ID_PARAM_NAME, testReqId).request()
240+
.post(Entity.entity(new ExceptionThrowingInputStream(BYTES_TO_SEND), MediaType.APPLICATION_OCTET_STREAM));
241+
Assert.fail("Expected ProcessingException has not been thrown.");
242+
} catch (ProcessingException expected) {
243+
// so far so good
244+
}
245+
// we should make it to the server, but there the exceptional behaviour should get noticed
246+
assertTrue("POST request " + testReqId + " has not reached the server.",
247+
target("test").path("requestWasMade").queryParam(REQ_ID_PARAM_NAME, testReqId).request().get(Boolean.class));
248+
assertTrue("POST request " + testReqId + " did not cause an expected exception on the server.",
249+
target("test").path("requestCausedException").queryParam(REQ_ID_PARAM_NAME, testReqId)
250+
.request().get(Boolean.class));
251+
}
252+
253+
private static String nextRequestId(String testMethodName) {
254+
return String.format(testMethodName + "-%03d", NEXT_REQ_ID.getAndIncrement());
255+
}
256+
257+
/**
258+
* InputStream implementation that allows "reading" as many bytes as specified by threshold constructor parameter.
259+
* Throws an IOException if read operation is attempted after the threshold is exceeded.
260+
*/
261+
private class ExceptionThrowingInputStream extends InputStream {
262+
263+
private final int threshold;
264+
private int offset = 0;
265+
266+
/**
267+
* Get me a new stream that throws exception.
268+
*
269+
* @param threshold this number of bytes will be read all right
270+
*/
271+
public ExceptionThrowingInputStream(int threshold) {
272+
this.threshold = threshold;
273+
}
274+
275+
@Override
276+
public int read() throws IOException {
277+
if (offset++ < threshold) {
278+
return 'A';
279+
} else {
280+
throw new IOException("stream closed");
281+
}
282+
}
283+
284+
@Override
285+
public int read(byte[] b, int off, int len) throws IOException {
286+
offset += len;
287+
if (offset < threshold) {
288+
Arrays.fill(b, off, off + len, (byte) 'A');
289+
return len;
290+
} else {
291+
throw new IOException("Stream closed");
292+
}
293+
}
294+
}
295+
}

0 commit comments

Comments
 (0)