Skip to content

Commit 4d19a3e

Browse files
committed
netty: refactor outputstream
- it groups write calls - #3778
1 parent 497cf29 commit 4d19a3e

File tree

5 files changed

+64
-100
lines changed

5 files changed

+64
-100
lines changed

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyContext.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public class NettyContext implements DefaultContext, ChannelFutureListener {
104104
private String scheme;
105105
private int port;
106106
private boolean filesCreated;
107-
private NettyHandler connection;
107+
NettyHandler connection;
108108

109109
public NettyContext(
110110
NettyHandler connection,
@@ -530,7 +530,7 @@ public PrintWriter responseWriter(MediaType type) {
530530
public Sender responseSender() {
531531
prepareChunked();
532532
ctx.write(new DefaultHttpResponse(HTTP_1_1, status, setHeaders));
533-
return new NettySender(this, ctx);
533+
return new NettySender(this);
534534
}
535535

536536
@NonNull @Override
@@ -583,7 +583,7 @@ Context send(@NonNull ByteBuf data, CharSequence contentLength) {
583583
responseStarted = true;
584584
setHeaders.set(CONTENT_LENGTH, contentLength);
585585
var response = new DefaultFullHttpResponse(HTTP_1_1, status, data, setHeaders, NO_TRAILING);
586-
connection.writeHttpObject(response, promise(this));
586+
connection.writeMessage(response, promise(this));
587587
return this;
588588
} finally {
589589
requestComplete();
@@ -595,7 +595,7 @@ public Context send(@NonNull ReadableByteChannel channel) {
595595
try {
596596
prepareChunked();
597597
int bufferSize = contentLength > 0 ? (int) contentLength : this.bufferSize;
598-
connection.writeHttpChunk(
598+
connection.writeChunks(
599599
new DefaultHttpResponse(HTTP_1_1, status, setHeaders),
600600
new ChunkedNioStream(channel, bufferSize),
601601
EMPTY_LAST_CONTENT,
@@ -627,7 +627,7 @@ public Context send(@NonNull InputStream in) {
627627
long len = responseLength();
628628
ByteRange range = ByteRange.parse(req.headers().get(RANGE), len).apply(this);
629629
prepareChunked();
630-
connection.writeHttpChunk(
630+
connection.writeChunks(
631631
new DefaultHttpResponse(HTTP_1_1, status, setHeaders),
632632
new ChunkedStream(range.apply(in), bufferSize),
633633
EMPTY_LAST_CONTENT,
@@ -670,7 +670,7 @@ public Context send(@NonNull FileChannel file) {
670670
new HttpChunkedInput(
671671
new ChunkedNioFile(file, range.getStart(), range.getEnd(), bufferSize));
672672

673-
connection.writeHttpChunk(rsp, chunkedInput, promise(this));
673+
connection.writeChunks(rsp, chunkedInput, promise(this));
674674
// ctx.channel()
675675
// .eventLoop()
676676
// .execute(
@@ -681,7 +681,7 @@ public Context send(@NonNull FileChannel file) {
681681
// ctx.writeAndFlush(chunkedInput, promise(this));
682682
// });
683683
} else {
684-
connection.writeHttpChunk(
684+
connection.writeChunks(
685685
rsp,
686686
new DefaultFileRegion(file, range.getStart(), range.getEnd()),
687687
EMPTY_LAST_CONTENT,
@@ -744,7 +744,7 @@ public Context send(@NonNull StatusCode statusCode) {
744744
var rsp =
745745
new DefaultFullHttpResponse(
746746
HTTP_1_1, status, Unpooled.EMPTY_BUFFER, setHeaders, NO_TRAILING);
747-
connection.writeHttpObject(rsp, promise(this));
747+
connection.writeMessage(rsp, promise(this));
748748
return this;
749749
} finally {
750750
requestComplete();

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class NettyHandler extends ChannelInboundHandlerAdapter {
3636
private NettyContext context;
3737
private boolean read;
3838
private boolean flush;
39-
private ChannelHandlerContext channelContext;
39+
ChannelHandlerContext channelContext;
4040

4141
public NettyHandler(
4242
NettyDateService serverDate,
@@ -128,7 +128,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
128128
}
129129
}
130130

131-
public void writeHttpObject(Object msg, ChannelPromise promise) {
131+
public void writeMessage(Object msg, ChannelPromise promise) {
132132
if (this.channelContext.executor().inEventLoop()) {
133133
if (this.read) {
134134
this.flush = true;
@@ -137,31 +137,40 @@ public void writeHttpObject(Object msg, ChannelPromise promise) {
137137
this.channelContext.writeAndFlush(msg, promise);
138138
}
139139
} else {
140-
this.channelContext.executor().execute(() -> writeHttpObject(msg, promise));
140+
this.channelContext.executor().execute(() -> writeMessage(msg, promise));
141141
}
142142
}
143143

144-
public void writeHttpChunk(Object header, Object body, Object last, ChannelPromise promise) {
144+
public void writeChunks(Object header, Object body, Object last, ChannelPromise promise) {
145145
if (this.channelContext.executor().inEventLoop()) {
146146
// Headers
147-
channelContext.write(header, channelContext.voidPromise());
147+
var voidPromise = channelContext.voidPromise();
148+
channelContext.write(header, voidPromise);
148149
// Body
149-
channelContext.write(body, channelContext.voidPromise());
150+
channelContext.write(body, voidPromise);
150151
// Finish
151152
channelContext.writeAndFlush(last, promise);
152153
} else {
153-
this.channelContext.executor().execute(() -> writeHttpChunk(header, body, last, promise));
154+
this.channelContext.executor().execute(() -> writeChunks(header, body, last, promise));
155+
}
156+
}
157+
158+
public void write(SneakyThrows.Consumer<ChannelHandlerContext> action) {
159+
if (this.channelContext.executor().inEventLoop()) {
160+
action.accept(this.channelContext);
161+
} else {
162+
this.channelContext.executor().execute(() -> write(action));
154163
}
155164
}
156165

157-
public void writeHttpChunk(Object header, Object body, ChannelPromise promise) {
166+
public void writeChunks(Object header, Object body, ChannelPromise promise) {
158167
if (this.channelContext.executor().inEventLoop()) {
159168
// Headers
160169
channelContext.write(header, channelContext.voidPromise());
161170
// Body + Last
162171
channelContext.writeAndFlush(body, promise);
163172
} else {
164-
this.channelContext.executor().execute(() -> writeHttpChunk(header, body, promise));
173+
this.channelContext.executor().execute(() -> writeChunks(header, body, promise));
165174
}
166175
}
167176

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyOutputStream.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
public class NettyOutputStream extends OutputStream {
2121
private final ByteBuf buffer;
2222
private final NettyContext ctx;
23-
private final ChannelHandlerContext context;
2423
private final ChannelFutureListener closeListener;
2524
private HttpResponse headers;
2625
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -29,7 +28,6 @@ public NettyOutputStream(
2928
NettyContext ctx, ChannelHandlerContext context, int bufferSize, HttpResponse headers) {
3029
this.ctx = ctx;
3130
this.buffer = context.alloc().heapBuffer(bufferSize, bufferSize);
32-
this.context = context;
3331
this.headers = headers;
3432
this.closeListener = ctx;
3533
}
@@ -41,12 +39,6 @@ public void write(int b) {
4139

4240
@Override
4341
public void write(byte[] src, int off, int len) {
44-
write(src, off, len, null);
45-
}
46-
47-
public void write(byte[] src, int off, int len, ChannelFutureListener callback) {
48-
writeHeaders();
49-
5042
int dataLengthLeftToWrite = len;
5143
int dataToWriteOffset = off;
5244
int spaceLeftInCurrentChunk;
@@ -55,57 +47,65 @@ public void write(byte[] src, int off, int len, ChannelFutureListener callback)
5547
buffer.writeBytes(src, dataToWriteOffset, spaceLeftInCurrentChunk);
5648
dataToWriteOffset = dataToWriteOffset + spaceLeftInCurrentChunk;
5749
dataLengthLeftToWrite = dataLengthLeftToWrite - spaceLeftInCurrentChunk;
58-
flush(callback, null);
50+
flush(false);
5951
}
6052
if (dataLengthLeftToWrite > 0) {
6153
buffer.writeBytes(src, dataToWriteOffset, dataLengthLeftToWrite);
6254
}
6355
}
6456

65-
private void writeHeaders() {
66-
if (headers != null) {
67-
context.write(headers, context.voidPromise());
68-
headers = null;
69-
}
70-
}
71-
7257
@Override
7358
public void flush() throws IOException {
74-
flush(null, null);
59+
flush(false);
7560
}
7661

77-
private void flush(ChannelFutureListener callback, ChannelFutureListener listener) {
62+
private void flush(boolean close) {
7863
int chunkSize = buffer.readableBytes();
7964
if (chunkSize > 0) {
80-
if (listener != null) {
81-
if (callback == null) {
82-
context.write(new DefaultHttpContent(buffer.copy()), context.voidPromise());
83-
} else {
84-
context.write(new DefaultHttpContent(buffer.copy())).addListener(callback);
85-
}
86-
context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(listener);
87-
buffer.release();
65+
ByteBuf chunk;
66+
if (close) {
67+
// don't copy on close
68+
chunk = buffer;
8869
} else {
89-
if (callback == null) {
90-
context.write(new DefaultHttpContent(buffer.copy()), context.voidPromise());
91-
} else {
92-
context.write(new DefaultHttpContent(buffer.copy())).addListener(callback);
93-
}
70+
// make a copy on flush
71+
chunk = buffer.copy();
72+
}
73+
ctx.connection.write(
74+
context -> {
75+
writeHeaders(context);
76+
context.write(new DefaultHttpContent(chunk), context.voidPromise());
77+
if (close) {
78+
context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(closeListener);
79+
}
80+
});
81+
if (!close) {
82+
// reset
9483
buffer.clear();
9584
}
9685
} else {
97-
ChannelFuture future = context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
98-
if (listener != null) {
99-
future.addListener(listener);
100-
}
86+
ctx.connection.write(
87+
context -> {
88+
writeHeaders(context);
89+
ChannelFuture future = context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
90+
if (close) {
91+
future.addListener(closeListener);
92+
}
93+
});
94+
}
95+
}
96+
97+
private void writeHeaders(ChannelHandlerContext context) {
98+
if (headers != null) {
99+
context.write(headers, context.voidPromise());
100+
headers = null;
101101
}
102102
}
103103

104104
@Override
105105
public void close() {
106106
if (closed.compareAndSet(false, true)) {
107107
try {
108-
flush(null, closeListener);
108+
flush(true);
109109
} finally {
110110
ctx.requestComplete();
111111
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettySender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ public class NettySender implements Sender {
2121
private final NettyContext ctx;
2222
private final ChannelHandlerContext context;
2323

24-
public NettySender(NettyContext ctx, ChannelHandlerContext context) {
24+
public NettySender(NettyContext ctx) {
2525
this.ctx = ctx;
26-
this.context = context;
26+
this.context = ctx.ctx;
2727
}
2828

2929
@Override

modules/jooby-netty/src/test/java/io/jooby/internal/netty/Issue3554.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

0 commit comments

Comments
 (0)