Skip to content

Commit 497cf29

Browse files
committed
netty: flush channel context when not reading
1 parent 5059700 commit 497cf29

File tree

2 files changed

+127
-75
lines changed

2 files changed

+127
-75
lines changed

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyContext.java

Lines changed: 67 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ public class NettyContext implements DefaultContext, ChannelFutureListener {
9393
private MediaType responseType;
9494
private Map<String, Object> attributes;
9595
private long contentLength = -1;
96-
private boolean needsFlush;
9796
private Map<String, String> cookies;
9897
private Map<String, String> responseCookies;
9998
private Boolean resetHeadersOnError;
@@ -105,14 +104,17 @@ public class NettyContext implements DefaultContext, ChannelFutureListener {
105104
private String scheme;
106105
private int port;
107106
private boolean filesCreated;
107+
private NettyHandler connection;
108108

109109
public NettyContext(
110+
NettyHandler connection,
110111
ChannelHandlerContext ctx,
111112
HttpRequest req,
112113
Router router,
113114
String path,
114115
int bufferSize,
115116
boolean http2) {
117+
this.connection = connection;
116118
this.path = path;
117119
this.ctx = ctx;
118120
this.req = req;
@@ -581,43 +583,34 @@ Context send(@NonNull ByteBuf data, CharSequence contentLength) {
581583
responseStarted = true;
582584
setHeaders.set(CONTENT_LENGTH, contentLength);
583585
var response = new DefaultFullHttpResponse(HTTP_1_1, status, data, setHeaders, NO_TRAILING);
584-
if (ctx.channel().eventLoop().inEventLoop()) {
585-
needsFlush = true;
586-
ctx.write(response, promise(this));
587-
} else {
588-
ctx.writeAndFlush(response, promise(this));
589-
}
586+
connection.writeHttpObject(response, promise(this));
590587
return this;
591588
} finally {
592589
requestComplete();
593590
}
594591
}
595592

596-
public void flush() {
597-
if (needsFlush) {
598-
needsFlush = false;
599-
ctx.flush();
600-
destroy(null);
601-
}
602-
}
603-
604593
@NonNull @Override
605594
public Context send(@NonNull ReadableByteChannel channel) {
606595
try {
607596
prepareChunked();
608-
var rsp = new DefaultHttpResponse(HTTP_1_1, status, setHeaders);
609597
int bufferSize = contentLength > 0 ? (int) contentLength : this.bufferSize;
610-
ctx.channel()
611-
.eventLoop()
612-
.execute(
613-
() -> {
614-
// Headers
615-
ctx.write(rsp, ctx.voidPromise());
616-
// Body
617-
ctx.write(new ChunkedNioStream(channel, bufferSize), ctx.voidPromise());
618-
// Finish
619-
ctx.writeAndFlush(EMPTY_LAST_CONTENT, promise(this));
620-
});
598+
connection.writeHttpChunk(
599+
new DefaultHttpResponse(HTTP_1_1, status, setHeaders),
600+
new ChunkedNioStream(channel, bufferSize),
601+
EMPTY_LAST_CONTENT,
602+
promise(this));
603+
// ctx.channel()
604+
// .eventLoop()
605+
// .execute(
606+
// () -> {
607+
// // Headers
608+
// ctx.write(rsp, ctx.voidPromise());
609+
// // Body
610+
// ctx.write(new ChunkedNioStream(channel, bufferSize), ctx.voidPromise());
611+
// // Finish
612+
// ctx.writeAndFlush(EMPTY_LAST_CONTENT, promise(this));
613+
// });
621614
return this;
622615
} finally {
623616
requestComplete();
@@ -634,21 +627,23 @@ public Context send(@NonNull InputStream in) {
634627
long len = responseLength();
635628
ByteRange range = ByteRange.parse(req.headers().get(RANGE), len).apply(this);
636629
prepareChunked();
637-
ChunkedStream chunkedStream = new ChunkedStream(range.apply(in), bufferSize);
638-
639-
var rsp = new DefaultHttpResponse(HTTP_1_1, status, setHeaders);
630+
connection.writeHttpChunk(
631+
new DefaultHttpResponse(HTTP_1_1, status, setHeaders),
632+
new ChunkedStream(range.apply(in), bufferSize),
633+
EMPTY_LAST_CONTENT,
634+
promise(this));
640635
responseStarted = true;
641-
ctx.channel()
642-
.eventLoop()
643-
.execute(
644-
() -> {
645-
// Headers
646-
ctx.write(rsp, ctx.voidPromise());
647-
// Body
648-
ctx.write(chunkedStream, ctx.voidPromise());
649-
// Finish
650-
ctx.writeAndFlush(EMPTY_LAST_CONTENT, promise(this));
651-
});
636+
// ctx.channel()
637+
// .eventLoop()
638+
// .execute(
639+
// () -> {
640+
// // Headers
641+
// ctx.write(rsp, ctx.voidPromise());
642+
// // Body
643+
// ctx.write(chunkedStream, ctx.voidPromise());
644+
// // Finish
645+
// ctx.writeAndFlush(EMPTY_LAST_CONTENT, promise(this));
646+
// });
652647
return this;
653648
} catch (Exception x) {
654649
throw SneakyThrows.propagate(x);
@@ -671,33 +666,39 @@ public Context send(@NonNull FileChannel file) {
671666
if (preferChunked()) {
672667
prepareChunked();
673668

674-
HttpChunkedInput chunkedInput =
669+
var chunkedInput =
675670
new HttpChunkedInput(
676671
new ChunkedNioFile(file, range.getStart(), range.getEnd(), bufferSize));
677672

678-
ctx.channel()
679-
.eventLoop()
680-
.execute(
681-
() -> {
682-
// Headers
683-
ctx.write(rsp, ctx.voidPromise());
684-
// Body
685-
ctx.writeAndFlush(chunkedInput, promise(this));
686-
});
673+
connection.writeHttpChunk(rsp, chunkedInput, promise(this));
674+
// ctx.channel()
675+
// .eventLoop()
676+
// .execute(
677+
// () -> {
678+
// // Headers
679+
// ctx.write(rsp, ctx.voidPromise());
680+
// // Body
681+
// ctx.writeAndFlush(chunkedInput, promise(this));
682+
// });
687683
} else {
688-
ctx.channel()
689-
.eventLoop()
690-
.execute(
691-
() -> {
692-
// Headers
693-
ctx.write(rsp, ctx.voidPromise());
694-
// Body
695-
ctx.write(
696-
new DefaultFileRegion(file, range.getStart(), range.getEnd()),
697-
ctx.voidPromise());
698-
// Finish
699-
ctx.writeAndFlush(EMPTY_LAST_CONTENT, promise(this));
700-
});
684+
connection.writeHttpChunk(
685+
rsp,
686+
new DefaultFileRegion(file, range.getStart(), range.getEnd()),
687+
EMPTY_LAST_CONTENT,
688+
promise(this));
689+
// ctx.channel()
690+
// .eventLoop()
691+
// .execute(
692+
// () -> {
693+
// // Headers
694+
// ctx.write(rsp, ctx.voidPromise());
695+
// // Body
696+
// ctx.write(
697+
// new DefaultFileRegion(file, range.getStart(), range.getEnd()),
698+
// ctx.voidPromise());
699+
// // Finish
700+
// ctx.writeAndFlush(EMPTY_LAST_CONTENT, promise(this));
701+
// });
701702
}
702703
} catch (IOException x) {
703704
throw SneakyThrows.propagate(x);
@@ -743,12 +744,7 @@ public Context send(@NonNull StatusCode statusCode) {
743744
var rsp =
744745
new DefaultFullHttpResponse(
745746
HTTP_1_1, status, Unpooled.EMPTY_BUFFER, setHeaders, NO_TRAILING);
746-
if (ctx.channel().eventLoop().inEventLoop()) {
747-
needsFlush = true;
748-
ctx.write(rsp, promise(this));
749-
} else {
750-
ctx.writeAndFlush(rsp, promise(this));
751-
}
747+
connection.writeHttpObject(rsp, promise(this));
752748
return this;
753749
} finally {
754750
requestComplete();
@@ -758,6 +754,7 @@ public Context send(@NonNull StatusCode statusCode) {
758754
void requestComplete() {
759755
fireCompleteEvent();
760756
ifSaveSession();
757+
destroy(null);
761758
}
762759

763760
@Override

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616

1717
import io.jooby.*;
1818
import io.jooby.netty.NettyServer;
19-
import io.netty.channel.ChannelHandlerContext;
20-
import io.netty.channel.ChannelInboundHandlerAdapter;
19+
import io.netty.channel.*;
2120
import io.netty.handler.codec.http.*;
2221
import io.netty.handler.codec.http.multipart.*;
2322
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
@@ -35,6 +34,9 @@ public class NettyHandler extends ChannelInboundHandlerAdapter {
3534
private long chunkSize;
3635
private final boolean http2;
3736
private NettyContext context;
37+
private boolean read;
38+
private boolean flush;
39+
private ChannelHandlerContext channelContext;
3840

3941
public NettyHandler(
4042
NettyDateService serverDate,
@@ -51,15 +53,22 @@ public NettyHandler(
5153
this.http2 = http2;
5254
}
5355

56+
@Override
57+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
58+
this.channelContext = ctx;
59+
super.handlerAdded(ctx);
60+
}
61+
5462
@Override
5563
public void channelRead(ChannelHandlerContext ctx, Object msg) {
5664
if (isHttpRequest(msg)) {
65+
this.read = true;
5766
var req = (HttpRequest) msg;
5867
var path = pathOnly(req.uri());
5968
var app = contextSelector.select(path);
6069
this.router = app.getRouter();
6170

62-
context = new NettyContext(ctx, req, app, path, bufferSize, http2);
71+
context = new NettyContext(this, ctx, req, app, path, bufferSize, http2);
6372

6473
if (defaultHeaders) {
6574
context.setHeaders.set(DATE, serverDate.date());
@@ -95,6 +104,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
95104
release(chunk);
96105
}
97106
} else if (isHttpContent(msg)) {
107+
this.read = true;
98108
var chunk = (HttpContent) msg;
99109
try {
100110
// when decoder == null, chunk is always a LastHttpContent.EMPTY, ignore it
@@ -118,17 +128,62 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
118128
}
119129
}
120130

131+
public void writeHttpObject(Object msg, ChannelPromise promise) {
132+
if (this.channelContext.executor().inEventLoop()) {
133+
if (this.read) {
134+
this.flush = true;
135+
this.channelContext.write(msg, promise);
136+
} else {
137+
this.channelContext.writeAndFlush(msg, promise);
138+
}
139+
} else {
140+
this.channelContext.executor().execute(() -> writeHttpObject(msg, promise));
141+
}
142+
}
143+
144+
public void writeHttpChunk(Object header, Object body, Object last, ChannelPromise promise) {
145+
if (this.channelContext.executor().inEventLoop()) {
146+
// Headers
147+
channelContext.write(header, channelContext.voidPromise());
148+
// Body
149+
channelContext.write(body, channelContext.voidPromise());
150+
// Finish
151+
channelContext.writeAndFlush(last, promise);
152+
} else {
153+
this.channelContext.executor().execute(() -> writeHttpChunk(header, body, last, promise));
154+
}
155+
}
156+
157+
public void writeHttpChunk(Object header, Object body, ChannelPromise promise) {
158+
if (this.channelContext.executor().inEventLoop()) {
159+
// Headers
160+
channelContext.write(header, channelContext.voidPromise());
161+
// Body + Last
162+
channelContext.writeAndFlush(body, promise);
163+
} else {
164+
this.channelContext.executor().execute(() -> writeHttpChunk(header, body, promise));
165+
}
166+
}
167+
121168
private void release(HttpContent ref) {
122169
if (ref.refCnt() > 0) {
123170
ref.release();
124171
}
125172
}
126173

127174
@Override
128-
public void channelReadComplete(ChannelHandlerContext ctx) {
175+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
176+
if (this.read) {
177+
this.read = false;
178+
if (this.flush) {
179+
this.flush = false;
180+
ctx.flush();
181+
}
182+
}
129183
if (context != null) {
130-
context.flush();
184+
// context.destroy(null);
131185
}
186+
super.channelReadComplete(ctx);
132187
}
133188

134189
@Override

0 commit comments

Comments
 (0)