diff --git a/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java b/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java index ffed975ab09..db3f7896940 100644 --- a/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java +++ b/frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java @@ -1,7 +1,6 @@ package hello; -import static hello.HttpResponses.makeJsonResponse; -import static hello.HttpResponses.makePlaintextResponse; +import static hello.HttpResponses.*; import static hello.JsonUtils.acquireJsonStreamFromEventLoop; import static hello.JsonUtils.releaseJsonStreamFromEventLoop; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; @@ -11,6 +10,7 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import com.jsoniter.output.JsonStream; @@ -19,11 +19,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.DefaultHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.*; import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; @@ -37,15 +33,19 @@ protected DateFormat initialValue() { } }; - protected volatile AsciiString date = new AsciiString(FORMAT.get().format(new Date())); + private HttpHeaders jsonHeaders = makeJsonHeaders(new AsciiString(FORMAT.get().format(new Date()))); + private HttpHeaders plaintextHeaders = makePlaintextHeaders(new AsciiString(FORMAT.get().format(new Date()))); + private ScheduledFuture refreshHeaders; public HelloServerHandler(ScheduledExecutorService service) { - service.scheduleWithFixedDelay(new Runnable() { + refreshHeaders = service.scheduleWithFixedDelay(new Runnable() { private final DateFormat format = FORMAT.get(); @Override public void run() { - date = new AsciiString(format.format(new Date())); + var date = new AsciiString(format.format(new Date())); + jsonHeaders = makeJsonHeaders(date); + plaintextHeaders = makePlaintextHeaders(date); } }, 1000, 1000, TimeUnit.MILLISECONDS); } @@ -80,13 +80,13 @@ private void process(ChannelHandlerContext ctx, HttpRequest request) throws Exce String uri = request.uri(); switch (uri) { case "/plaintext": - writePlainResponse(ctx, date); + writePlainResponse(ctx, plaintextHeaders); return; case "/json": // even for the virtual thread case we expect virtual threads to be executed inlined! var stream = acquireJsonStreamFromEventLoop(); try { - writeJsonResponse(ctx, stream, date); + writeJsonResponse(ctx, stream, jsonHeaders); } finally { releaseJsonStreamFromEventLoop(stream); } @@ -98,12 +98,12 @@ private void process(ChannelHandlerContext ctx, HttpRequest request) throws Exce ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } - protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) { - ctx.write(makePlaintextResponse(date), ctx.voidPromise()); + protected void writePlainResponse(ChannelHandlerContext ctx, HttpHeaders plaintextHeaders) { + ctx.write(makePlaintextResponse(plaintextHeaders), ctx.voidPromise()); } - protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) { - ctx.write(makeJsonResponse(stream, date), ctx.voidPromise()); + protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, HttpHeaders jsonHeaders) { + ctx.write(makeJsonResponse(stream, jsonHeaders), ctx.voidPromise()); } @Override @@ -115,4 +115,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + if (refreshHeaders != null) { + refreshHeaders.cancel(false); + refreshHeaders = null; + } + } } diff --git a/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java b/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java index 67692802a83..5fc354e61a3 100644 --- a/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java +++ b/frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java @@ -2,28 +2,45 @@ import java.util.concurrent.ScheduledExecutorService; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.DefaultHttpRequest; -import io.netty.handler.codec.http.HttpMessage; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequestDecoder; -import io.netty.handler.codec.http.HttpResponseEncoder; -import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.*; public class HelloServerInitializer extends ChannelInitializer { - protected final ScheduledExecutorService service; - - public HelloServerInitializer(ScheduledExecutorService service) { - this.service = service; + public HelloServerInitializer() { } @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("encoder", new HttpResponseEncoder() { + + private ByteBuf encodedHeaders; + private HttpHeaders lastSeenHeaders; + + @Override + protected void encodeHeaders(HttpHeaders headers, ByteBuf buf) { + if (lastSeenHeaders != headers) { + updateEncodedHttpHeaders(headers, buf); + } + var encodedHeaders = this.encodedHeaders; + buf.ensureWritable(encodedHeaders.readableBytes()); + encodedHeaders.getBytes(encodedHeaders.readerIndex(), buf, encodedHeaders.readableBytes()); + } + + private void updateEncodedHttpHeaders(HttpHeaders headers, ByteBuf buf) { + if (encodedHeaders == null) { + encodedHeaders = Unpooled.buffer(buf.writableBytes()); + } else { + encodedHeaders.clear().ensureWritable(buf.writableBytes()); + } + super.encodeHeaders(headers, encodedHeaders); + lastSeenHeaders = headers; + } + @Override public boolean acceptOutboundMessage(final Object msg) throws Exception { if (msg.getClass() == DefaultFullHttpResponse.class) { @@ -46,7 +63,7 @@ protected boolean isContentAlwaysEmpty(final HttpMessage msg) { return false; } }) - .addLast("handler", newHelloServerHandler(service)); + .addLast("handler", newHelloServerHandler(ch.eventLoop())); } protected HelloServerHandler newHelloServerHandler(ScheduledExecutorService service) { diff --git a/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java b/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java index cca8e555113..492a44121c9 100644 --- a/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java +++ b/frameworks/Java/netty/src/main/java/hello/HelloWebServer.java @@ -67,9 +67,9 @@ public void run() throws Exception { } var channelB = b.group(group).channel(serverChannelClass); if (EVENT_LOOP_CARRIER) { - channelB.childHandler(new HelloLoomServerInitializer((MultithreadVirtualEventExecutorGroup) group, group.next())); + channelB.childHandler(new HelloLoomServerInitializer((MultithreadVirtualEventExecutorGroup) group)); } else { - channelB.childHandler(new HelloServerInitializer(group.next())); + channelB.childHandler(new HelloServerInitializer()); } b.childOption(ChannelOption.SO_REUSEADDR, true); diff --git a/frameworks/Java/netty/src/main/java/hello/HttpResponses.java b/frameworks/Java/netty/src/main/java/hello/HttpResponses.java index 64b88ccf086..5e36c43a269 100644 --- a/frameworks/Java/netty/src/main/java/hello/HttpResponses.java +++ b/frameworks/Java/netty/src/main/java/hello/HttpResponses.java @@ -20,26 +20,43 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpHeadersFactory; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpHeadersFactory; import io.netty.util.AsciiString; public class HttpResponses { - public static FullHttpResponse makePlaintextResponse(AsciiString date) { - return makeResponse(Unpooled.wrappedBuffer(STATIC_PLAINTEXT), TEXT_PLAIN, PLAINTEXT_CLHEADER_VALUE, date); - } + private static final HttpHeadersFactory HEADERS_FACTORY = DefaultHttpHeadersFactory.headersFactory() + .withValidation(false); - public static FullHttpResponse makeJsonResponse(JsonStream stream, AsciiString date) { - return makeResponse(Unpooled.wrappedBuffer(serializeMsg(newMsg(), stream)), APPLICATION_JSON, JSON_CLHEADER_VALUE, date); + public static HttpHeaders makeJsonHeaders(AsciiString date) { + return HEADERS_FACTORY.newHeaders() + .set(CONTENT_TYPE, APPLICATION_JSON) + .set(SERVER, SERVER_NAME) + .set(DATE, date) + .set(CONTENT_LENGTH, JSON_CLHEADER_VALUE); } - private static FullHttpResponse makeResponse(ByteBuf buf, CharSequence contentType, CharSequence contentLength, AsciiString date) { - final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buf, false); - response.headers() - .set(CONTENT_TYPE, contentType) + + public static HttpHeaders makePlaintextHeaders(AsciiString date) { + return HEADERS_FACTORY.newHeaders() + .set(CONTENT_TYPE, TEXT_PLAIN) .set(SERVER, SERVER_NAME) .set(DATE, date) - .set(CONTENT_LENGTH, contentLength); - return response; + .set(CONTENT_LENGTH, PLAINTEXT_CLHEADER_VALUE); + } + + public static FullHttpResponse makePlaintextResponse(HttpHeaders plaintextHeaders) { + return makeResponse(Unpooled.wrappedBuffer(STATIC_PLAINTEXT), plaintextHeaders); + } + + public static FullHttpResponse makeJsonResponse(JsonStream stream, HttpHeaders jsonHeaders) { + return makeResponse(Unpooled.wrappedBuffer(serializeMsg(newMsg(), stream)), jsonHeaders); + } + + private static FullHttpResponse makeResponse(ByteBuf buf, HttpHeaders headers) { + return new DefaultFullHttpResponse(HTTP_1_1, OK, buf, headers, HttpHeaders.EMPTY_HEADERS); } } diff --git a/frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java b/frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java index afa5db2cac5..ab407b4ffc5 100644 --- a/frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java +++ b/frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java @@ -9,8 +9,7 @@ public class HelloLoomServerInitializer extends HelloServerInitializer { private final MultithreadVirtualEventExecutorGroup group; - public HelloLoomServerInitializer(MultithreadVirtualEventExecutorGroup group, ScheduledExecutorService service) { - super(service); + public HelloLoomServerInitializer(MultithreadVirtualEventExecutorGroup group) { this.group = group; } diff --git a/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java b/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java index 9b93088fe0f..9da17a24adb 100644 --- a/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java +++ b/frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java @@ -9,6 +9,7 @@ import hello.HttpResponses; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.util.AsciiString; public class VirtualThreadHelloServerHandler extends HelloServerHandler { @@ -22,16 +23,16 @@ public VirtualThreadHelloServerHandler(ScheduledExecutorService service, Multith } @Override - protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) { + protected void writePlainResponse(ChannelHandlerContext ctx, HttpHeaders plainTextHeaders) { group.eventLoopVirtualThreadFactory().newThread(() -> { - responses.add(HttpResponses.makePlaintextResponse(date)); + responses.add(HttpResponses.makePlaintextResponse(plainTextHeaders)); }).start(); } @Override - protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) { + protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, HttpHeaders jsonHeaders) { group.eventLoopVirtualThreadFactory().newThread(() -> { - responses.add(HttpResponses.makeJsonResponse(stream, date)); + responses.add(HttpResponses.makeJsonResponse(stream, jsonHeaders)); }).start(); }