Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package hello;

import static hello.HttpResponses.makeJsonResponse;
import static hello.HttpResponses.makePlaintextResponse;
import static hello.HttpResponses.*;
import static hello.JsonUtils.acquireJsonStreamFromEventLoop;
import static hello.JsonUtils.releaseJsonStreamFromEventLoop;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
Expand All @@ -11,6 +10,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.jsoniter.output.JsonStream;
Expand All @@ -19,11 +19,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.*;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
Expand All @@ -37,15 +33,19 @@ protected DateFormat initialValue() {
}
};

protected volatile AsciiString date = new AsciiString(FORMAT.get().format(new Date()));
private HttpHeaders jsonHeaders = makeJsonHeaders(new AsciiString(FORMAT.get().format(new Date())));
private HttpHeaders plaintextHeaders = makePlaintextHeaders(new AsciiString(FORMAT.get().format(new Date())));
private ScheduledFuture<?> refreshHeaders;

public HelloServerHandler(ScheduledExecutorService service) {
service.scheduleWithFixedDelay(new Runnable() {
refreshHeaders = service.scheduleWithFixedDelay(new Runnable() {
private final DateFormat format = FORMAT.get();

@Override
public void run() {
date = new AsciiString(format.format(new Date()));
var date = new AsciiString(format.format(new Date()));
jsonHeaders = makeJsonHeaders(date);
plaintextHeaders = makePlaintextHeaders(date);
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -80,13 +80,13 @@ private void process(ChannelHandlerContext ctx, HttpRequest request) throws Exce
String uri = request.uri();
switch (uri) {
case "/plaintext":
writePlainResponse(ctx, date);
writePlainResponse(ctx, plaintextHeaders);
return;
case "/json":
// even for the virtual thread case we expect virtual threads to be executed inlined!
var stream = acquireJsonStreamFromEventLoop();
try {
writeJsonResponse(ctx, stream, date);
writeJsonResponse(ctx, stream, jsonHeaders);
} finally {
releaseJsonStreamFromEventLoop(stream);
}
Expand All @@ -98,12 +98,12 @@ private void process(ChannelHandlerContext ctx, HttpRequest request) throws Exce
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) {
ctx.write(makePlaintextResponse(date), ctx.voidPromise());
protected void writePlainResponse(ChannelHandlerContext ctx, HttpHeaders plaintextHeaders) {
ctx.write(makePlaintextResponse(plaintextHeaders), ctx.voidPromise());
}

protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) {
ctx.write(makeJsonResponse(stream, date), ctx.voidPromise());
protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, HttpHeaders jsonHeaders) {
ctx.write(makeJsonResponse(stream, jsonHeaders), ctx.voidPromise());
}

@Override
Expand All @@ -115,4 +115,13 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (refreshHeaders != null) {
refreshHeaders.cancel(false);
refreshHeaders = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,45 @@

import java.util.concurrent.ScheduledExecutorService;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.*;

public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {

protected final ScheduledExecutorService service;

public HelloServerInitializer(ScheduledExecutorService service) {
this.service = service;
public HelloServerInitializer() {
}

@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("encoder", new HttpResponseEncoder() {

private ByteBuf encodedHeaders;
private HttpHeaders lastSeenHeaders;

@Override
protected void encodeHeaders(HttpHeaders headers, ByteBuf buf) {
if (lastSeenHeaders != headers) {
updateEncodedHttpHeaders(headers, buf);
}
var encodedHeaders = this.encodedHeaders;
buf.ensureWritable(encodedHeaders.readableBytes());
encodedHeaders.getBytes(encodedHeaders.readerIndex(), buf, encodedHeaders.readableBytes());
}

private void updateEncodedHttpHeaders(HttpHeaders headers, ByteBuf buf) {
if (encodedHeaders == null) {
encodedHeaders = Unpooled.buffer(buf.writableBytes());
} else {
encodedHeaders.clear().ensureWritable(buf.writableBytes());
}
super.encodeHeaders(headers, encodedHeaders);
lastSeenHeaders = headers;
}

@Override
public boolean acceptOutboundMessage(final Object msg) throws Exception {
if (msg.getClass() == DefaultFullHttpResponse.class) {
Expand All @@ -46,7 +63,7 @@ protected boolean isContentAlwaysEmpty(final HttpMessage msg) {
return false;
}
})
.addLast("handler", newHelloServerHandler(service));
.addLast("handler", newHelloServerHandler(ch.eventLoop()));
}

protected HelloServerHandler newHelloServerHandler(ScheduledExecutorService service) {
Expand Down
4 changes: 2 additions & 2 deletions frameworks/Java/netty/src/main/java/hello/HelloWebServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public void run() throws Exception {
}
var channelB = b.group(group).channel(serverChannelClass);
if (EVENT_LOOP_CARRIER) {
channelB.childHandler(new HelloLoomServerInitializer((MultithreadVirtualEventExecutorGroup) group, group.next()));
channelB.childHandler(new HelloLoomServerInitializer((MultithreadVirtualEventExecutorGroup) group));
} else {
channelB.childHandler(new HelloServerInitializer(group.next()));
channelB.childHandler(new HelloServerInitializer());
}
b.childOption(ChannelOption.SO_REUSEADDR, true);

Expand Down
39 changes: 28 additions & 11 deletions frameworks/Java/netty/src/main/java/hello/HttpResponses.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,43 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeadersFactory;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeadersFactory;
import io.netty.util.AsciiString;

public class HttpResponses {

public static FullHttpResponse makePlaintextResponse(AsciiString date) {
return makeResponse(Unpooled.wrappedBuffer(STATIC_PLAINTEXT), TEXT_PLAIN, PLAINTEXT_CLHEADER_VALUE, date);
}
private static final HttpHeadersFactory HEADERS_FACTORY = DefaultHttpHeadersFactory.headersFactory()
.withValidation(false);

public static FullHttpResponse makeJsonResponse(JsonStream stream, AsciiString date) {
return makeResponse(Unpooled.wrappedBuffer(serializeMsg(newMsg(), stream)), APPLICATION_JSON, JSON_CLHEADER_VALUE, date);
public static HttpHeaders makeJsonHeaders(AsciiString date) {
return HEADERS_FACTORY.newHeaders()
.set(CONTENT_TYPE, APPLICATION_JSON)
.set(SERVER, SERVER_NAME)
.set(DATE, date)
.set(CONTENT_LENGTH, JSON_CLHEADER_VALUE);
}

private static FullHttpResponse makeResponse(ByteBuf buf, CharSequence contentType, CharSequence contentLength, AsciiString date) {
final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buf, false);
response.headers()
.set(CONTENT_TYPE, contentType)

public static HttpHeaders makePlaintextHeaders(AsciiString date) {
return HEADERS_FACTORY.newHeaders()
.set(CONTENT_TYPE, TEXT_PLAIN)
.set(SERVER, SERVER_NAME)
.set(DATE, date)
.set(CONTENT_LENGTH, contentLength);
return response;
.set(CONTENT_LENGTH, PLAINTEXT_CLHEADER_VALUE);
}

public static FullHttpResponse makePlaintextResponse(HttpHeaders plaintextHeaders) {
return makeResponse(Unpooled.wrappedBuffer(STATIC_PLAINTEXT), plaintextHeaders);
}

public static FullHttpResponse makeJsonResponse(JsonStream stream, HttpHeaders jsonHeaders) {
return makeResponse(Unpooled.wrappedBuffer(serializeMsg(newMsg(), stream)), jsonHeaders);
}

private static FullHttpResponse makeResponse(ByteBuf buf, HttpHeaders headers) {
return new DefaultFullHttpResponse(HTTP_1_1, OK, buf, headers, HttpHeaders.EMPTY_HEADERS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ public class HelloLoomServerInitializer extends HelloServerInitializer {

private final MultithreadVirtualEventExecutorGroup group;

public HelloLoomServerInitializer(MultithreadVirtualEventExecutorGroup group, ScheduledExecutorService service) {
super(service);
public HelloLoomServerInitializer(MultithreadVirtualEventExecutorGroup group) {
this.group = group;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import hello.HttpResponses;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AsciiString;

public class VirtualThreadHelloServerHandler extends HelloServerHandler {
Expand All @@ -22,16 +23,16 @@ public VirtualThreadHelloServerHandler(ScheduledExecutorService service, Multith
}

@Override
protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) {
protected void writePlainResponse(ChannelHandlerContext ctx, HttpHeaders plainTextHeaders) {
group.eventLoopVirtualThreadFactory().newThread(() -> {
responses.add(HttpResponses.makePlaintextResponse(date));
responses.add(HttpResponses.makePlaintextResponse(plainTextHeaders));
}).start();
}

@Override
protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) {
protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, HttpHeaders jsonHeaders) {
group.eventLoopVirtualThreadFactory().newThread(() -> {
responses.add(HttpResponses.makeJsonResponse(stream, date));
responses.add(HttpResponses.makeJsonResponse(stream, jsonHeaders));
}).start();
}

Expand Down
Loading