Skip to content

Commit 2edf756

Browse files
jansupolsenivam
authored andcommitted
Added test for JerseyChunkedInputStreamClose
Signed-off-by: jansupol <[email protected]>
1 parent 8185a47 commit 2edf756

File tree

4 files changed

+342
-3
lines changed

4 files changed

+342
-3
lines changed

connectors/netty-connector/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,25 @@
8181
</plugins>
8282
</build>
8383

84+
<profiles>
85+
<profile>
86+
<id>InaccessibleObjectException</id>
87+
<activation><jdk>[12,)</jdk></activation>
88+
<build>
89+
<plugins>
90+
<plugin>
91+
<groupId>org.apache.maven.plugins</groupId>
92+
<artifactId>maven-surefire-plugin</artifactId>
93+
<configuration>
94+
<argLine>
95+
--add-opens java.base/java.lang=ALL-UNNAMED
96+
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
97+
</argLine>
98+
</configuration>
99+
</plugin>
100+
</plugins>
101+
</build>
102+
</profile>
103+
</profiles>
104+
84105
</project>

connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
435435
};
436436
ch.closeFuture().addListener(closeListener);
437437

438-
final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch);
438+
final NettyEntityWriter entityWriter = nettyEntityWriter(jerseyRequest, ch);
439439
switch (entityWriter.getType()) {
440440
case CHUNKED:
441441
HttpUtil.setTransferEncodingChunked(nettyRequest, true);
@@ -523,6 +523,10 @@ public void run() {
523523
}
524524
}
525525

526+
/* package */ NettyEntityWriter nettyEntityWriter(ClientRequest clientRequest, Channel channel) {
527+
return NettyEntityWriter.getInstance(clientRequest, channel);
528+
}
529+
526530
private SSLContext getSslContext(Client client, ClientRequest request) {
527531
Supplier<SSLContext> supplier = request.resolveProperty(ClientProperties.SSL_CONTEXT_SUPPLIER, Supplier.class);
528532
return supplier == null ? client.getSslContext() : supplier.get();

connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,15 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
101101

102102
@Override
103103
public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
104+
try {
105+
return readChunk0(allocator);
106+
} catch (Exception e) {
107+
closeOnThrowable();
108+
throw e;
109+
}
110+
}
104111

112+
private ByteBuf readChunk0(ByteBufAllocator allocator) throws Exception {
105113
if (!open) {
106114
return null;
107115
}
@@ -143,6 +151,14 @@ public long progress() {
143151
return offset;
144152
}
145153

154+
private void closeOnThrowable() {
155+
try {
156+
close();
157+
} catch (Throwable t) {
158+
// do not throw other throwable
159+
}
160+
}
161+
146162
@Override
147163
public void close() throws IOException {
148164

@@ -208,12 +224,12 @@ private void write(Provider<ByteBuffer> bufferSupplier) throws IOException {
208224
try {
209225
boolean queued = queue.offer(bufferSupplier.get(), WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
210226
if (!queued) {
211-
close();
227+
closeOnThrowable();
212228
throw new IOException("Buffer overflow.");
213229
}
214230

215231
} catch (InterruptedException e) {
216-
close();
232+
closeOnThrowable();
217233
throw new IOException(e);
218234
}
219235
}
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
/*
2+
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* This Source Code may also be made available under the following Secondary
9+
* Licenses when the conditions for such availability set forth in the
10+
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11+
* version 2 with the GNU Classpath Exception, which is available at
12+
* https://www.gnu.org/software/classpath/license.html.
13+
*
14+
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15+
*/
16+
17+
package org.glassfish.jersey.netty.connector;
18+
19+
import io.netty.channel.Channel;
20+
import org.glassfish.jersey.client.ClientConfig;
21+
import org.glassfish.jersey.client.ClientProperties;
22+
import org.glassfish.jersey.client.ClientRequest;
23+
import org.glassfish.jersey.client.spi.Connector;
24+
import org.glassfish.jersey.client.spi.ConnectorProvider;
25+
import org.glassfish.jersey.netty.connector.internal.JerseyChunkedInput;
26+
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;
27+
import org.glassfish.jersey.server.ResourceConfig;
28+
import org.glassfish.jersey.test.JerseyTest;
29+
import org.junit.jupiter.api.Assertions;
30+
import org.junit.jupiter.api.Test;
31+
32+
import javax.ws.rs.POST;
33+
import javax.ws.rs.Path;
34+
import javax.ws.rs.client.Client;
35+
import javax.ws.rs.client.ClientBuilder;
36+
import javax.ws.rs.client.Entity;
37+
import javax.ws.rs.client.Invocation;
38+
import javax.ws.rs.client.WebTarget;
39+
import javax.ws.rs.core.Application;
40+
import javax.ws.rs.core.Configuration;
41+
import javax.ws.rs.core.MediaType;
42+
import javax.ws.rs.core.MultivaluedHashMap;
43+
import javax.ws.rs.core.Response;
44+
import java.lang.reflect.Field;
45+
import java.lang.reflect.Method;
46+
import java.lang.reflect.Modifier;
47+
import java.lang.reflect.Proxy;
48+
import java.nio.ByteBuffer;
49+
import java.util.Arrays;
50+
import java.util.NoSuchElementException;
51+
import java.util.Objects;
52+
import java.util.concurrent.CompletableFuture;
53+
import java.util.concurrent.CompletionStage;
54+
import java.util.concurrent.ExecutionException;
55+
import java.util.concurrent.LinkedBlockingDeque;
56+
import java.util.concurrent.TimeUnit;
57+
import java.util.concurrent.atomic.AtomicInteger;
58+
import java.util.concurrent.atomic.AtomicReference;
59+
import java.util.logging.Level;
60+
import java.util.logging.Logger;
61+
62+
public class ChunkedInputWriteErrorSimulationTest extends JerseyTest {
63+
private static final String EXCEPTION_MSG = "BOGUS BUFFER OVERFLOW";
64+
private static final AtomicReference<Throwable> caught = new AtomicReference<>(null);
65+
66+
public static class ClientThread extends Thread {
67+
68+
public static AtomicInteger count = new AtomicInteger();
69+
public static String url;
70+
public static int nLoops;
71+
72+
private static Client client;
73+
74+
public static void main(DequeOffer offer, String[] args) throws InterruptedException {
75+
url = args[0];
76+
int nThreads = Integer.parseInt(args[1]);
77+
nLoops = Integer.parseInt(args[2]);
78+
initClient(offer);
79+
Thread[] threads = new Thread[nThreads];
80+
for (int i = 0; i < nThreads; i++) {
81+
threads[i] = new ClientThread();
82+
threads[i].start();
83+
}
84+
85+
for (int i = 0; i < nThreads; i++) {
86+
threads[i].join();
87+
}
88+
// System.out.println("Processed calls: " + count);
89+
}
90+
91+
private static void initClient(DequeOffer offer) {
92+
ClientConfig defaultConfig = new ClientConfig();
93+
defaultConfig.property(ClientProperties.CONNECT_TIMEOUT, 10 * 1000);
94+
defaultConfig.property(ClientProperties.READ_TIMEOUT, 10 * 1000);
95+
defaultConfig.connectorProvider(getJerseyChunkedInputModifiedNettyConnector(offer));
96+
client = ClientBuilder.newBuilder()
97+
.withConfig(defaultConfig)
98+
.build();
99+
}
100+
101+
public void doCall() {
102+
CompletableFuture<Response> cf = invokeResponse().toCompletableFuture()
103+
.whenComplete((rsp, t) -> {
104+
if (t != null) {
105+
// System.out.println(Thread.currentThread() + " async complete. Caught exception " + t);
106+
// t.printStackTrace();
107+
while (t.getCause() != null) {
108+
t = t.getCause();
109+
}
110+
caught.set(t);
111+
}
112+
})
113+
.handle((rsp, t) -> {
114+
if (rsp != null) {
115+
rsp.readEntity(String.class);
116+
} else {
117+
System.out.println(Thread.currentThread().getName() + " response is null");
118+
}
119+
return rsp;
120+
}).exceptionally(t -> {
121+
System.out.println("async complete. completed exceptionally " + t);
122+
throw new RuntimeException(t);
123+
});
124+
125+
try {
126+
cf.get();
127+
System.out.println("Done call " + count.incrementAndGet());
128+
} catch (InterruptedException | ExecutionException ex) {
129+
Logger.getLogger(ClientThread.class.getName()).log(Level.SEVERE, null, ex);
130+
}
131+
}
132+
133+
private static CompletionStage<Response> invokeResponse() {
134+
WebTarget target = client.target(url);
135+
MultivaluedHashMap hdrs = new MultivaluedHashMap<>();
136+
StringBuilder sb = new StringBuilder("{");
137+
for (int i = 0; i < 10000; i++) {
138+
sb.append("\"fname\":\"foo\", \"lname\":\"bar\"");
139+
}
140+
sb.append("}");
141+
String jsonPayload = sb.toString();
142+
Invocation.Builder builder = ((WebTarget) target).request().headers(hdrs);
143+
return builder.rx().method("POST", Entity.entity(jsonPayload, MediaType.APPLICATION_JSON_TYPE));
144+
}
145+
146+
@Override
147+
public void run() {
148+
for (int i = 0; i < nLoops; i++) {
149+
try {
150+
doCall();
151+
} catch (Throwable t) {
152+
throw new RuntimeException(t);
153+
}
154+
}
155+
}
156+
}
157+
158+
@Path("/console")
159+
public static class HangingEndpoint {
160+
@Path("/login")
161+
@POST
162+
public String post(String entity) {
163+
return "Welcome";
164+
}
165+
}
166+
167+
@Override
168+
protected Application configure() {
169+
return new ResourceConfig(HangingEndpoint.class);
170+
}
171+
172+
@Test
173+
public void testNoHangOnOfferInterrupt() throws InterruptedException {
174+
String path = getBaseUri() + "console/login";
175+
ClientThread.main(new InterruptedExceptionOffer(), new String[] {path, "5", "10"});
176+
Assertions.assertTrue(caught.get().getMessage().contains(EXCEPTION_MSG));
177+
}
178+
179+
@Test
180+
public void testNoHangOnPollInterrupt() throws InterruptedException {
181+
String path = getBaseUri() + "console/login";
182+
ClientThread.main(new DequePoll(), new String[] {path, "5", "10"});
183+
Assertions.assertNotNull(caught.get());
184+
}
185+
186+
@Test
187+
public void testNoHangOnOfferNoData() throws InterruptedException {
188+
String path = getBaseUri() + "console/login";
189+
ClientThread.main(new ReturnFalseOffer(), new String[] {path, "5", "10"});
190+
Assertions.assertTrue(caught.get().getMessage().contains("Buffer overflow")); //JerseyChunkedInput
191+
Thread.sleep(1_000L); // Sleep for the server to finish
192+
}
193+
194+
private interface DequeOffer {
195+
public boolean offer(ByteBuffer e, long timeout, TimeUnit unit) throws InterruptedException;
196+
}
197+
198+
private static class InterruptedExceptionOffer implements DequeOffer {
199+
private AtomicInteger ai = new AtomicInteger(0);
200+
201+
@Override
202+
public boolean offer(ByteBuffer e, long timeout, TimeUnit unit) throws InterruptedException {
203+
if ((ai.getAndIncrement() % 10) == 0) {
204+
throw new InterruptedException(EXCEPTION_MSG);
205+
}
206+
return true;
207+
}
208+
}
209+
210+
private static class ReturnFalseOffer implements DequeOffer {
211+
private AtomicInteger ai = new AtomicInteger(0);
212+
@Override
213+
public boolean offer(ByteBuffer e, long timeout, TimeUnit unit) throws InterruptedException {
214+
return !((ai.getAndIncrement() % 10) == 1);
215+
}
216+
}
217+
218+
private static class DequePoll extends InterruptedExceptionOffer {
219+
}
220+
221+
222+
private static ConnectorProvider getJerseyChunkedInputModifiedNettyConnector(DequeOffer offer) {
223+
return new ConnectorProvider() {
224+
@Override
225+
public Connector getConnector(Client client, Configuration runtimeConfig) {
226+
return new NettyConnector(client) {
227+
NettyEntityWriter nettyEntityWriter(ClientRequest clientRequest, Channel channel) {
228+
NettyEntityWriter wrapped = NettyEntityWriter.getInstance(clientRequest, channel);
229+
230+
JerseyChunkedInput chunkedInput = (JerseyChunkedInput) wrapped.getChunkedInput();
231+
try {
232+
Field field = JerseyChunkedInput.class.getDeclaredField("queue");
233+
field.setAccessible(true);
234+
235+
removeFinal(field);
236+
237+
field.set(chunkedInput, new LinkedBlockingDeque<ByteBuffer>() {
238+
@Override
239+
public boolean offer(ByteBuffer e, long timeout, TimeUnit unit) throws InterruptedException {
240+
if (!DequePoll.class.isInstance(offer) && !offer.offer(e, timeout, unit)) {
241+
return false;
242+
}
243+
return super.offer(e, timeout, unit);
244+
}
245+
246+
@Override
247+
public ByteBuffer poll(long timeout, TimeUnit unit) throws InterruptedException {
248+
if (DequePoll.class.isInstance(offer)) {
249+
offer.offer(null, timeout, unit);
250+
}
251+
return super.poll(timeout, unit);
252+
}
253+
});
254+
255+
} catch (Exception e) {
256+
throw new RuntimeException(e);
257+
}
258+
259+
NettyEntityWriter proxy = (NettyEntityWriter) Proxy.newProxyInstance(
260+
ConnectorProvider.class.getClassLoader(), new Class[]{NettyEntityWriter.class},
261+
(proxy1, method, args) -> {
262+
if (method.getName().equals("readChunk")) {
263+
try {
264+
return method.invoke(wrapped, args);
265+
} catch (RuntimeException e) {
266+
// consume
267+
}
268+
}
269+
return method.invoke(wrapped, args);
270+
});
271+
return proxy;
272+
}
273+
};
274+
}
275+
};
276+
}
277+
278+
public static void removeFinal(Field field) throws RuntimeException {
279+
try {
280+
Method[] classMethods = Class.class.getDeclaredMethods();
281+
Method declaredFieldMethod = Arrays
282+
.stream(classMethods).filter(x -> Objects.equals(x.getName(), "getDeclaredFields0"))
283+
.findAny().orElseThrow(() -> new NoSuchElementException("No value present"));
284+
declaredFieldMethod.setAccessible(true);
285+
Field[] declaredFieldsOfField = (Field[]) declaredFieldMethod.invoke(Field.class, false);
286+
Field modifiersField = Arrays
287+
.stream(declaredFieldsOfField).filter(x -> Objects.equals(x.getName(), "modifiers"))
288+
.findAny().orElseThrow(() -> new NoSuchElementException("No value present"));
289+
modifiersField.setAccessible(true);
290+
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
291+
} catch (RuntimeException re) {
292+
throw re;
293+
} catch (Exception e) {
294+
throw new RuntimeException(e);
295+
}
296+
}
297+
298+
}

0 commit comments

Comments
 (0)