Skip to content

Commit 382f69e

Browse files
committed
Fix race condition on Exception in Netty Connector
Signed-off-by: jansupol <[email protected]>
1 parent b128e9c commit 382f69e

File tree

3 files changed

+188
-4
lines changed

3 files changed

+188
-4
lines changed

connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/NettyConnector.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,14 @@ public void run() {
491491
contentLengthSet.countDown();
492492
}
493493

494-
} catch (IOException e) {
494+
} catch (Exception e) {
495+
if (entityWriter.getChunkedInput() != null) {
496+
try {
497+
entityWriter.getChunkedInput().close();
498+
} catch (Exception ex) {
499+
// Ignore ex in favor of e
500+
}
501+
}
495502
responseDone.completeExceptionally(e);
496503
}
497504
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* This Source Code may also be made available under the following Secondary
9+
* Licenses when the conditions for such availability set forth in the
10+
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11+
* version 2 with the GNU Classpath Exception, which is available at
12+
* https://www.gnu.org/software/classpath/license.html.
13+
*
14+
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15+
*/
16+
17+
package org.glassfish.jersey.netty.connector;
18+
19+
import io.netty.channel.Channel;
20+
import io.netty.handler.stream.ChunkedInput;
21+
import org.glassfish.jersey.client.ClientConfig;
22+
import org.glassfish.jersey.client.ClientProperties;
23+
import org.glassfish.jersey.client.ClientRequest;
24+
import org.glassfish.jersey.client.spi.Connector;
25+
import org.glassfish.jersey.client.spi.ConnectorProvider;
26+
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;
27+
import org.glassfish.jersey.server.ResourceConfig;
28+
import org.glassfish.jersey.test.JerseyTest;
29+
import org.junit.jupiter.api.Assertions;
30+
import org.junit.jupiter.api.Test;
31+
32+
import javax.ws.rs.ProcessingException;
33+
import javax.ws.rs.WebApplicationException;
34+
import javax.ws.rs.client.Client;
35+
import javax.ws.rs.client.ClientBuilder;
36+
import javax.ws.rs.client.Entity;
37+
import javax.ws.rs.core.Application;
38+
import javax.ws.rs.core.Configuration;
39+
import javax.ws.rs.core.MediaType;
40+
import javax.ws.rs.core.MultivaluedMap;
41+
import javax.ws.rs.core.Response;
42+
import javax.ws.rs.ext.MessageBodyWriter;
43+
import java.io.IOException;
44+
import java.io.OutputStream;
45+
import java.lang.annotation.Annotation;
46+
import java.lang.reflect.Type;
47+
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.atomic.AtomicReference;
49+
50+
/**
51+
* Bug 5837 reproducer
52+
*/
53+
public class ChunkedInputClosedOnErrorTest extends JerseyTest {
54+
55+
private static Client initClient(ConnectorProvider provider) {
56+
ClientConfig defaultConfig = new ClientConfig();
57+
defaultConfig.property(ClientProperties.CONNECT_TIMEOUT, 10 * 1000);
58+
defaultConfig.property(ClientProperties.READ_TIMEOUT, 10 * 1000);
59+
defaultConfig.connectorProvider(provider);
60+
Client client = ClientBuilder.newBuilder()
61+
.withConfig(defaultConfig)
62+
.build();
63+
return client;
64+
}
65+
66+
@Override
67+
protected Application configure() {
68+
return new ResourceConfig();
69+
}
70+
71+
@Test
72+
public void testChunkedInputNotStuckedTimes() throws InterruptedException {
73+
for (int i = 0; i != 10; i++) {
74+
boolean ret = testChunkedInputNotStucked();
75+
Assertions.assertTrue(ret, "JerseyChunkedInput was not closed on error");
76+
}
77+
}
78+
79+
public boolean testChunkedInputNotStucked() throws InterruptedException {
80+
final AtomicReference<NettyEntityWriter> writer = new AtomicReference<>();
81+
final CountDownLatch writerSetLatch = new CountDownLatch(1);
82+
final CountDownLatch flushLatch = new CountDownLatch(1);
83+
ConnectorProvider provider = new ConnectorProvider() {
84+
@Override
85+
public Connector getConnector(Client client, Configuration runtimeConfig) {
86+
return new NettyConnector(client) {
87+
@Override
88+
NettyEntityWriter nettyEntityWriter(ClientRequest clientRequest, Channel channel) {
89+
writer.set(super.nettyEntityWriter(clientRequest, channel));
90+
writerSetLatch.countDown();
91+
return new NettyEntityWriter() {
92+
private boolean slept = false;
93+
94+
@Override
95+
public void write(Object object) {
96+
writer.get().write(object);
97+
}
98+
99+
@Override
100+
public void writeAndFlush(Object object) {
101+
writer.get().writeAndFlush(object);
102+
}
103+
104+
@Override
105+
public void flush() throws IOException {
106+
writer.get().flush();
107+
flushLatch.countDown();
108+
}
109+
110+
@Override
111+
public ChunkedInput getChunkedInput() {
112+
for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
113+
// caught from catch block in executorService.execute(new Runnable() {
114+
// "sleep" to simulate race condition
115+
if (element.getClassName().contains("NettyConnector")
116+
&& element.getMethodName().equals("run")) {
117+
try {
118+
flushLatch.await();
119+
} catch (InterruptedException e) {
120+
throw new RuntimeException(e);
121+
}
122+
}
123+
}
124+
return writer.get().getChunkedInput();
125+
}
126+
127+
@Override
128+
public OutputStream getOutputStream() {
129+
return writer.get().getOutputStream();
130+
}
131+
132+
@Override
133+
public long getLength() {
134+
return writer.get().getLength();
135+
}
136+
137+
@Override
138+
public Type getType() {
139+
return writer.get().getType();
140+
}
141+
};
142+
}
143+
};
144+
}
145+
};
146+
147+
Client client = initClient(provider);
148+
try {
149+
Response r = client
150+
.register(new MultipartWriter())
151+
.target(target().getUri()).request()
152+
.post(Entity.entity(new MultipartWriter(), MediaType.MULTIPART_FORM_DATA_TYPE));
153+
} catch (ProcessingException expected) {
154+
155+
}
156+
writerSetLatch.await();
157+
try {
158+
return writer.get().getChunkedInput().isEndOfInput();
159+
} catch (Exception e) {
160+
throw new RuntimeException(e);
161+
}
162+
}
163+
164+
private static class MultipartWriter implements MessageBodyWriter<Object> {
165+
166+
@Override
167+
public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
168+
return mediaType.equals(MediaType.MULTIPART_FORM_DATA_TYPE);
169+
}
170+
171+
@Override
172+
public void writeTo(Object object, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType,
173+
MultivaluedMap<String, Object> httpHeaders, OutputStream entityStream) throws IOException,
174+
WebApplicationException {
175+
throw new IllegalArgumentException("TestException");
176+
}
177+
}
178+
179+
}

connectors/netty-connector/src/test/java/org/glassfish/jersey/netty/connector/EmptyHeaderTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,10 @@
3434
import javax.ws.rs.core.Response;
3535
import java.util.Collections;
3636
import java.util.concurrent.ExecutionException;
37-
import java.util.logging.Logger;
3837

38+
/* Bug 5836 reproducer */
3939
public class EmptyHeaderTest extends JerseyTest {
4040

41-
private static final Logger LOGGER = Logger.getLogger(EmptyHeaderTest.class.getName());
42-
4341
public static void main(String[] args) throws ExecutionException, InterruptedException {
4442
new EmptyHeaderTest().testEmptyHeaders();
4543
}

0 commit comments

Comments
 (0)