Skip to content

Commit 141377b

Browse files
committed
- netty: code cleaup + minor fixes
1 parent ec8a153 commit 141377b

File tree

6 files changed

+122
-35
lines changed

6 files changed

+122
-35
lines changed

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyContext.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -422,15 +422,18 @@ public Context upgrade(WebSocket.Initializer handler) {
422422
? conf.getBytes("websocket.maxSize").intValue()
423423
: WebSocket.MAX_BUFFER_SIZE;
424424
String webSocketURL = getProtocol() + "://" + req.headers().get(HttpHeaderNames.HOST) + path;
425+
425426
var config =
426427
WebSocketDecoderConfig.newBuilder()
427428
.allowExtensions(true)
428429
.allowMaskMismatch(false)
429430
.withUTF8Validator(false)
430431
.maxFramePayloadLength(maxSize)
431432
.build();
433+
432434
webSocket = new NettyWebSocket(this);
433435
handler.init(Context.readOnly(this), webSocket);
436+
434437
var webSocketRequest =
435438
new DefaultFullHttpRequest(
436439
HTTP_1_1,
@@ -439,21 +442,48 @@ public Context upgrade(WebSocket.Initializer handler) {
439442
Unpooled.EMPTY_BUFFER,
440443
req.headers(),
441444
EmptyHttpHeaders.INSTANCE);
445+
442446
var codec = ctx.pipeline().get(NettyServerCodec.class);
443447
codec.webSocketHandshake(ctx);
448+
444449
WebSocketServerHandshakerFactory factory =
445450
new WebSocketServerHandshakerFactory(webSocketURL, null, config);
446451
WebSocketServerHandshaker handshaker = factory.newHandshaker(webSocketRequest);
447-
handshaker.handshake(ctx.channel(), webSocketRequest);
448-
webSocket.fireConnect();
449-
long timeout =
450-
conf.hasPath("websocket.idleTimeout")
451-
? conf.getDuration("websocket.idleTimeout", MILLISECONDS)
452-
: MINUTES.toMillis(5);
453-
if (timeout > 0) {
454-
IdleStateHandler idle = new IdleStateHandler(timeout, 0, 0, MILLISECONDS);
455-
ctx.pipeline().addBefore("handler", "idle", idle);
452+
453+
if (handshaker == null) {
454+
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
455+
return this;
456456
}
457+
458+
handshaker
459+
.handshake(ctx.channel(), webSocketRequest)
460+
.addListener(
461+
future -> {
462+
if (future.isSuccess()) {
463+
// Only notify the application once the TCP write completes successfully
464+
webSocket.fireConnect();
465+
466+
// FIX 3: Safely apply the WebSocket-specific idle timeout
467+
long timeout =
468+
conf.hasPath("websocket.idleTimeout")
469+
? conf.getDuration("websocket.idleTimeout", MILLISECONDS)
470+
: MINUTES.toMillis(5);
471+
472+
if (timeout > 0) {
473+
IdleStateHandler idle = new IdleStateHandler(timeout, 0, 0, MILLISECONDS);
474+
// If the global server timeout is already there, replace it. Otherwise, add it.
475+
if (ctx.pipeline().get(IdleStateHandler.class) != null) {
476+
ctx.pipeline().replace(IdleStateHandler.class, "idle", idle);
477+
} else {
478+
ctx.pipeline().addBefore("handler", "idle", idle);
479+
}
480+
}
481+
} else {
482+
// Handshake failed (e.g., client hung up during negotiation)
483+
ctx.fireExceptionCaught(future.cause());
484+
}
485+
});
486+
457487
} catch (Throwable x) {
458488
sendError(x);
459489
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcExchange.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
*/
66
package io.jooby.internal.netty;
77

8+
import java.net.URLEncoder;
89
import java.nio.ByteBuffer;
10+
import java.nio.charset.StandardCharsets;
911
import java.util.HashMap;
1012
import java.util.Map;
13+
import java.util.concurrent.atomic.AtomicBoolean;
1114
import java.util.function.Consumer;
1215

1316
import io.jooby.rpc.grpc.GrpcExchange;
@@ -17,7 +20,6 @@
1720
import io.netty.handler.codec.http.DefaultHttpContent;
1821
import io.netty.handler.codec.http.DefaultHttpResponse;
1922
import io.netty.handler.codec.http.DefaultLastHttpContent;
20-
import io.netty.handler.codec.http.HttpContent;
2123
import io.netty.handler.codec.http.HttpHeaderNames;
2224
import io.netty.handler.codec.http.HttpRequest;
2325
import io.netty.handler.codec.http.HttpResponse;
@@ -29,7 +31,8 @@ public class NettyGrpcExchange implements GrpcExchange {
2931

3032
private final ChannelHandlerContext ctx;
3133
private final HttpRequest request;
32-
private boolean headersSent = false;
34+
35+
private final AtomicBoolean headersSent = new AtomicBoolean(false);
3336

3437
public NettyGrpcExchange(ChannelHandlerContext ctx, HttpRequest request) {
3538
this.ctx = ctx;
@@ -58,12 +61,11 @@ public Map<String, String> getHeaders() {
5861
}
5962

6063
private void sendHeadersIfNecessary() {
61-
if (!headersSent) {
64+
if (headersSent.compareAndSet(false, true)) {
6265
// Send the initial HTTP/2 HEADERS frame (Status 200)
6366
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
6467
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc");
6568
ctx.write(response);
66-
headersSent = true;
6769
}
6870
}
6971

@@ -72,7 +74,7 @@ public void send(ByteBuffer payload, Consumer<Throwable> callback) {
7274
sendHeadersIfNecessary();
7375

7476
// Wrap the NIO ByteBuffer in a Netty ByteBuf without copying
75-
HttpContent chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(payload));
77+
var chunk = new DefaultHttpContent(Unpooled.wrappedBuffer(payload));
7678

7779
// Write and flush, then map Netty's Future to your single-lambda callback
7880
ctx.writeAndFlush(chunk)
@@ -88,28 +90,41 @@ public void send(ByteBuffer payload, Consumer<Throwable> callback) {
8890

8991
@Override
9092
public void close(int statusCode, String description) {
91-
if (headersSent) {
92-
// Trailers-Appended: Send the final HTTP/2 HEADERS frame with END_STREAM flag
93+
var encodedDescription = encodeGrpcMessage(description);
94+
95+
// If headers were already sent, we just need to send the final trailers
96+
if (headersSent.get()) {
9397
LastHttpContent lastContent = new DefaultLastHttpContent();
9498
lastContent.trailingHeaders().set("grpc-status", String.valueOf(statusCode));
95-
if (description != null) {
96-
lastContent.trailingHeaders().set("grpc-message", description);
99+
100+
if (encodedDescription != null) {
101+
lastContent.trailingHeaders().set("grpc-message", encodedDescription);
97102
}
98-
// writeAndFlush the LastHttpContent, then close the Netty stream channel
103+
99104
ctx.writeAndFlush(lastContent).addListener(ChannelFutureListener.CLOSE);
105+
100106
} else {
101-
// Trailers-Only: No body was sent, so standard headers become the trailers
102-
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
107+
// Trailers-Only fast path: Headers and trailers combined
108+
var response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
103109
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/grpc");
104110
response.headers().set("grpc-status", String.valueOf(statusCode));
105-
if (description != null) {
106-
response.headers().set("grpc-message", description);
111+
112+
if (encodedDescription != null) {
113+
response.headers().set("grpc-message", encodedDescription);
107114
}
108-
ctx.write(response);
109115

110-
// Close out the stream with an empty DATA frame possessing the END_STREAM flag
116+
ctx.write(response);
111117
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
112118
.addListener(ChannelFutureListener.CLOSE);
113119
}
114120
}
121+
122+
/** gRPC specification requires the grpc-message trailer to be percent-encoded. */
123+
private static String encodeGrpcMessage(String description) {
124+
if (description == null) {
125+
return null;
126+
}
127+
// URLEncoder uses '+' for spaces, but gRPC strictly expects '%20'
128+
return URLEncoder.encode(description, StandardCharsets.UTF_8).replace("+", "%20");
129+
}
115130
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
*/
66
package io.jooby.internal.netty;
77

8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
811
import io.jooby.rpc.grpc.GrpcProcessor;
912
import io.netty.channel.ChannelFutureListener;
1013
import io.netty.channel.ChannelHandlerContext;
@@ -14,6 +17,8 @@
1417

1518
public class NettyGrpcHandler extends ChannelInboundHandlerAdapter {
1619

20+
private static final Logger log = LoggerFactory.getLogger(NettyGrpcHandler.class);
21+
1722
private final GrpcProcessor processor;
1823
private final boolean isHttp2;
1924

@@ -39,7 +44,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
3944
if (processor.isGrpcMethod(path)
4045
&& contentType != null
4146
&& contentType.startsWith("application/grpc")) {
42-
isGrpc = true;
4347

4448
if (!isHttp2) {
4549
// gRPC requires HTTP/2. Reject HTTP/1.1 calls immediately.
@@ -53,6 +57,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5357
return;
5458
}
5559

60+
// This prevents leaking state on rejected HTTP/1.1 connections.
61+
isGrpc = true;
62+
5663
// We will implement NettyGrpcExchange in the next step
5764
var exchange = new NettyGrpcExchange(ctx, req);
5865
var subscriber = processor.process(exchange);
@@ -92,6 +99,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
9299

93100
@Override
94101
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
102+
log.debug("gRPC stream exception caught", cause);
95103
if (isGrpc) {
96104
ctx.close();
97105
} else {

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyGrpcInputBridge.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,27 @@ public void cancel() {
5050
ctx.close(); // Abort the connection
5151
}
5252

53-
/** Called by the NettyGrpcHandler when a new chunk arrives from the network. */
5453
public void onChunk(HttpContent chunk) {
5554
try {
5655
ByteBuf content = chunk.content();
5756
if (content.isReadable()) {
58-
// Convert Netty ByteBuf to standard Java ByteBuffer
59-
ByteBuffer buffer = content.nioBuffer();
6057

61-
// Pass to the gRPC deframer
62-
subscriber.onNext(buffer);
58+
// Copy the bytes. content.nioBuffer() shares memory with the ByteBuf,
59+
// which gets released in the finally block of NettyGrpcHandler.
60+
// We must copy it to an unmanaged heap buffer to prevent use-after-free corruption.
61+
byte[] bytes = new byte[content.readableBytes()];
62+
content.readBytes(bytes);
63+
ByteBuffer buffer = ByteBuffer.wrap(bytes);
6364

65+
// If onNext synchronously calls request(1), that request() will see demand transition
66+
// from 0 to 1 and safely trigger ctx.read().
6467
long currentDemand = demand.decrementAndGet();
68+
69+
// Pass the isolated buffer to the gRPC deframer
70+
subscriber.onNext(buffer);
71+
6572
if (currentDemand > 0) {
66-
// Still have demand, ask Netty for the next chunk
73+
// Still have demand from previous requests, ask Netty for the next chunk
6774
ctx.read();
6875
}
6976
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.handler.codec.http.multipart.*;
2222
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
2323
import io.netty.handler.timeout.IdleStateEvent;
24+
import io.netty.util.ReferenceCountUtil;
2425

2526
public class NettyHandler extends ChannelInboundHandlerAdapter {
2627
private final Logger log = LoggerFactory.getLogger(NettyServer.class);
@@ -88,6 +89,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
8889
if (req.getClass() == DefaultFullHttpRequest.class) {
8990
// HTTP2 aggregates all into a full http request.
9091
if (((DefaultFullHttpRequest) req).content().readableBytes() > maxRequestSize) {
92+
release(req);
9193
router.match(context).execute(context, Route.REQUEST_ENTITY_TOO_LARGE);
9294
return;
9395
}
@@ -127,6 +129,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
127129
} else if (isWebSocketFrame(msg)) {
128130
if (context.webSocket != null) {
129131
context.webSocket.handleFrame((WebSocketFrame) msg);
132+
} else {
133+
release(msg);
130134
}
131135
}
132136
}
@@ -189,9 +193,9 @@ public void writeChunks(Object header, Object body, ChannelPromise promise) {
189193
}
190194
}
191195

192-
private void release(HttpContent ref) {
193-
if (ref.refCnt() > 0) {
194-
ref.release();
196+
private void release(Object ref) {
197+
if (ReferenceCountUtil.refCnt(ref) > 0) {
198+
ReferenceCountUtil.release(ref);
195199
}
196200
}
197201

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyPipeline.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,29 @@ private void setupHttp11Upgrade(ChannelPipeline pipeline) {
112112
protocol -> "h2c".equals(protocol.toString()) ? createH2CUpgradeCodec() : null,
113113
(int) maxRequestSize));
114114

115+
pipeline.addLast(
116+
"h2upgrade-cleaner",
117+
new ChannelInboundHandlerAdapter() {
118+
@Override
119+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
120+
if (evt instanceof HttpServerUpgradeHandler.UpgradeEvent) {
121+
ChannelPipeline p = ctx.pipeline();
122+
123+
// Strip out HTTP/1.1 handlers that are no longer needed on the parent channel.
124+
// The Http2StreamInitializer will attach new ones for the multiplexed child channels.
125+
if (p.context("grpc") != null) p.remove("grpc");
126+
if (p.context("handler") != null) p.remove("handler");
127+
if (p.context("expect-continue") != null) p.remove("expect-continue");
128+
if (p.context("compressor") != null) p.remove("compressor");
129+
if (p.context("ws-compressor") != null) p.remove("ws-compressor");
130+
131+
// Remove this cleaner itself so it doesn't linger
132+
p.remove(this);
133+
}
134+
super.userEventTriggered(ctx, evt);
135+
}
136+
});
137+
115138
addCommonHandlers(pipeline);
116139

117140
// Inject gRPC handler (isHttp2 = false to trigger 426 Upgrade Required)

0 commit comments

Comments
 (0)