Skip to content

Commit 0878fdb

Browse files
committed
netty: cleanup response listeners
- redo how we handle file cleanup (download or uploaded) - remove redundant checks
1 parent 5d84a42 commit 0878fdb

File tree

4 files changed

+85
-98
lines changed

4 files changed

+85
-98
lines changed

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyContext.java

Lines changed: 79 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import static io.netty.handler.codec.http.HttpHeaderNames.SET_COOKIE;
1515
import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
1616
import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
17-
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
1817
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
1918
import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;
2019
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -36,12 +35,13 @@
3635

3736
import javax.net.ssl.SSLPeerUnverifiedException;
3837

38+
import org.slf4j.Logger;
39+
3940
import com.typesafe.config.Config;
4041
import edu.umd.cs.findbugs.annotations.NonNull;
4142
import edu.umd.cs.findbugs.annotations.Nullable;
4243
import io.jooby.*;
4344
import io.jooby.Cookie;
44-
import io.jooby.FileUpload;
4545
import io.jooby.output.Output;
4646
import io.jooby.value.Value;
4747
import io.netty.buffer.ByteBuf;
@@ -66,22 +66,51 @@
6666
import io.netty.handler.timeout.IdleStateHandler;
6767
import io.netty.util.IllegalReferenceCountException;
6868

69-
public class NettyContext implements DefaultContext, ChannelFutureListener {
69+
public class NettyContext implements DefaultContext {
70+
71+
private static class DestroyDecoder implements ChannelFutureListener {
72+
73+
private final Logger log;
7074

71-
private interface DeleteFileTask {
72-
void delete() throws IOException;
75+
private InterfaceHttpPostRequestDecoder decoder;
7376

74-
static DeleteFileTask of(FileUpload file) {
75-
return file::close;
77+
public DestroyDecoder(Logger log, InterfaceHttpPostRequestDecoder decoder) {
78+
this.log = log;
79+
this.decoder = decoder;
7680
}
7781

78-
static DeleteFileTask of(FileDownload file) {
79-
return () -> {
80-
var path = file.getFile();
81-
if (path != null) {
82-
Files.delete(path);
83-
}
84-
};
82+
@Override
83+
public void operationComplete(ChannelFuture future) {
84+
try {
85+
decoder.destroy();
86+
} catch (IllegalReferenceCountException ex) {
87+
log.trace("decoder was released already", ex);
88+
} catch (Exception ex) {
89+
log.debug("body decoder destroy resulted in exception", ex);
90+
}
91+
decoder = null;
92+
}
93+
}
94+
95+
private static class DeleteFileTask implements ChannelFutureListener {
96+
97+
private final Logger log;
98+
private final String filePath;
99+
private final SneakyThrows.Runnable deleteTask;
100+
101+
public DeleteFileTask(Logger log, String filePath, SneakyThrows.Runnable deleteTask) {
102+
this.log = log;
103+
this.filePath = filePath;
104+
this.deleteTask = deleteTask;
105+
}
106+
107+
@Override
108+
public void operationComplete(ChannelFuture future) {
109+
try {
110+
deleteTask.run();
111+
} catch (Exception ex) {
112+
log.debug("deletion of {} resulted in exception", filePath, ex);
113+
}
85114
}
86115
}
87116

@@ -102,7 +131,6 @@ static DeleteFileTask of(FileDownload file) {
102131
private boolean responseStarted;
103132
private QueryString query;
104133
private Formdata formdata;
105-
private List<DeleteFileTask> files;
106134
private Value headers;
107135
private Map<String, String> pathMap = Collections.EMPTY_MAP;
108136
private MediaType responseType;
@@ -118,8 +146,8 @@ static DeleteFileTask of(FileDownload file) {
118146
private String host;
119147
private String scheme;
120148
private int port;
121-
private boolean filesCreated;
122149
NettyHandler connection;
150+
private ChannelPromise responsePromise;
123151

124152
public NettyContext(
125153
NettyHandler connection,
@@ -598,7 +626,7 @@ Context send(@NonNull ByteBuf data, CharSequence contentLength) {
598626
responseStarted = true;
599627
setHeaders.set(CONTENT_LENGTH, contentLength);
600628
var response = new DefaultFullHttpResponse(HTTP_1_1, status, data, setHeaders, NO_TRAILING);
601-
connection.writeMessage(response, promise(this));
629+
connection.writeMessage(response, promise());
602630
return this;
603631
} finally {
604632
requestComplete();
@@ -614,7 +642,7 @@ public Context send(@NonNull ReadableByteChannel channel) {
614642
new DefaultHttpResponse(HTTP_1_1, status, setHeaders),
615643
new ChunkedNioStream(channel, bufferSize),
616644
EMPTY_LAST_CONTENT,
617-
promise(this));
645+
promise());
618646
return this;
619647
} finally {
620648
requestComplete();
@@ -624,7 +652,9 @@ public Context send(@NonNull ReadableByteChannel channel) {
624652
@Override
625653
public @NonNull Context send(@NonNull FileDownload file) {
626654
if (file.deleteOnComplete()) {
627-
register(DeleteFileTask.of(file));
655+
register(
656+
new DeleteFileTask(
657+
router.getLog(), file.getFile().toString(), () -> Files.delete(file.getFile())));
628658
}
629659
return DefaultContext.super.send(file);
630660
}
@@ -643,7 +673,7 @@ public Context send(@NonNull InputStream in) {
643673
new DefaultHttpResponse(HTTP_1_1, status, setHeaders),
644674
new ChunkedStream(range.apply(in), bufferSize),
645675
EMPTY_LAST_CONTENT,
646-
promise(this));
676+
promise());
647677
responseStarted = true;
648678
return this;
649679
} catch (Exception x) {
@@ -671,13 +701,13 @@ public Context send(@NonNull FileChannel file) {
671701
new HttpChunkedInput(
672702
new ChunkedNioFile(file, range.getStart(), range.getEnd(), bufferSize));
673703

674-
connection.writeChunks(rsp, chunkedInput, promise(this));
704+
connection.writeChunks(rsp, chunkedInput, promise());
675705
} else {
676706
connection.writeChunks(
677707
rsp,
678708
new DefaultFileRegion(file, range.getStart(), range.getEnd()),
679709
EMPTY_LAST_CONTENT,
680-
promise(this));
710+
promise());
681711
}
682712
} catch (IOException x) {
683713
throw SneakyThrows.propagate(x);
@@ -723,7 +753,7 @@ public Context send(@NonNull StatusCode statusCode) {
723753
var rsp =
724754
new DefaultFullHttpResponse(
725755
HTTP_1_1, status, Unpooled.EMPTY_BUFFER, setHeaders, NO_TRAILING);
726-
connection.writeMessage(rsp, promise(this));
756+
connection.writeMessage(rsp, promise());
727757
return this;
728758
} finally {
729759
requestComplete();
@@ -733,18 +763,6 @@ public Context send(@NonNull StatusCode statusCode) {
733763
void requestComplete() {
734764
fireCompleteEvent();
735765
ifSaveSession();
736-
destroy(null);
737-
}
738-
739-
@Override
740-
public void operationComplete(ChannelFuture future) {
741-
try {
742-
destroy(future.cause());
743-
} finally {
744-
if (!isKeepAlive(req)) {
745-
future.channel().close();
746-
}
747-
}
748766
}
749767

750768
private void fireCompleteEvent() {
@@ -765,56 +783,27 @@ private Session getSession() {
765783
return attributes == null ? null : (Session) attributes.get(Session.NAME);
766784
}
767785

768-
private ChannelPromise promise(ChannelFutureListener listener) {
769-
if (pendingTasks()) {
770-
return ctx.newPromise().addListener(listener);
771-
}
772-
return ctx.voidPromise();
786+
public ChannelPromise promise() {
787+
return responsePromise == null ? ctx.voidPromise() : responsePromise;
773788
}
774789

775-
private boolean pendingTasks() {
776-
return getSession() != null || filesCreated || decoder != null || listeners != null;
790+
public void setDecoder(InterfaceHttpPostRequestDecoder decoder) {
791+
this.decoder = decoder;
792+
responsePromise = getOrCreateResponsePromise();
793+
responsePromise.addListener(new DestroyDecoder(router.getLog(), decoder));
777794
}
778795

779-
void destroy(Throwable cause) {
780-
if (cause != null) {
781-
if (Server.connectionLost(cause)) {
782-
router
783-
.getLog()
784-
.debug(
785-
"exception found while sending response {} {}",
786-
getMethod(),
787-
getRequestPath(),
788-
cause);
789-
} else {
790-
router
791-
.getLog()
792-
.error(
793-
"exception found while sending response {} {}",
794-
getMethod(),
795-
getRequestPath(),
796-
cause);
797-
}
798-
}
799-
if (files != null) {
800-
for (var task : files) {
801-
try {
802-
task.delete();
803-
} catch (Exception x) {
804-
router.getLog().debug("file destroy resulted in exception", x);
805-
}
806-
}
807-
files = null;
808-
}
809-
if (decoder != null) {
810-
try {
811-
decoder.destroy();
812-
} catch (IllegalReferenceCountException ex) {
813-
router.getLog().trace("decoder was released already", ex);
814-
} catch (Exception ex) {
815-
router.getLog().debug("body decoder destroy resulted in exception", ex);
816-
}
817-
decoder = null;
796+
void log(Throwable cause) {
797+
if (Server.connectionLost(cause)) {
798+
router
799+
.getLog()
800+
.debug(
801+
"exception found while sending response {} {}", getMethod(), getRequestPath(), cause);
802+
} else {
803+
router
804+
.getLog()
805+
.error(
806+
"exception found while sending response {} {}", getMethod(), getRequestPath(), cause);
818807
}
819808
}
820809

@@ -825,11 +814,15 @@ private NettyOutputStream newOutputStream() {
825814
}
826815

827816
private void register(DeleteFileTask deleteFileTask) {
828-
filesCreated = true;
829-
if (this.files == null) {
830-
this.files = new ArrayList<>();
817+
responsePromise = getOrCreateResponsePromise();
818+
responsePromise.addListener(deleteFileTask);
819+
}
820+
821+
private ChannelPromise getOrCreateResponsePromise() {
822+
if (responsePromise == null) {
823+
responsePromise = ctx.newPromise();
831824
}
832-
this.files.add(deleteFileTask);
825+
return responsePromise;
833826
}
834827

835828
private void decodeForm(Formdata form) {
@@ -844,7 +837,7 @@ private void decodeForm(Formdata form) {
844837
var fileUpoad =
845838
new NettyFileUpload(
846839
router.getTmpdir(), (io.netty.handler.codec.http.multipart.FileUpload) next);
847-
register(DeleteFileTask.of(fileUpoad));
840+
register(new DeleteFileTask(router.getLog(), fileUpoad.getFileName(), fileUpoad::close));
848841
form.put(next.getName(), fileUpoad);
849842
} else {
850843
form.put(next.getName(), next.getString(UTF_8));

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
8484
if (contentLength > 0 || isTransferEncodingChunked(req)) {
8585
context.httpDataFactory = new DefaultHttpDataFactory(bufferSize);
8686
context.httpDataFactory.setBaseDir(app.getTmpdir().toString());
87-
context.decoder = newDecoder(req, context.httpDataFactory);
87+
context.setDecoder(newDecoder(req, context.httpDataFactory));
8888
} else {
8989
// no body, move on
9090
router.match(context).execute(context);

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyOutputStream.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import java.util.concurrent.atomic.AtomicBoolean;
1111

1212
import io.netty.buffer.ByteBuf;
13-
import io.netty.channel.ChannelFuture;
14-
import io.netty.channel.ChannelFutureListener;
1513
import io.netty.channel.ChannelHandlerContext;
1614
import io.netty.handler.codec.http.DefaultHttpContent;
1715
import io.netty.handler.codec.http.HttpResponse;
@@ -20,7 +18,6 @@
2018
public class NettyOutputStream extends OutputStream {
2119
private final ByteBuf buffer;
2220
private final NettyContext ctx;
23-
private final ChannelFutureListener closeListener;
2421
private HttpResponse headers;
2522
private final AtomicBoolean closed = new AtomicBoolean(false);
2623

@@ -29,7 +26,6 @@ public NettyOutputStream(
2926
this.ctx = ctx;
3027
this.buffer = context.alloc().heapBuffer(bufferSize, bufferSize);
3128
this.headers = headers;
32-
this.closeListener = ctx;
3329
}
3430

3531
@Override
@@ -75,7 +71,7 @@ private void flush(boolean close) {
7571
writeHeaders(context);
7672
context.write(new DefaultHttpContent(chunk), context.voidPromise());
7773
if (close) {
78-
context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(closeListener);
74+
context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.promise());
7975
}
8076
});
8177
if (!close) {
@@ -86,10 +82,8 @@ private void flush(boolean close) {
8682
ctx.connection.write(
8783
context -> {
8884
writeHeaders(context);
89-
ChannelFuture future = context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
90-
if (close) {
91-
future.addListener(closeListener);
92-
}
85+
var promise = close ? ctx.promise() : context.voidPromise();
86+
context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, promise);
9387
});
9488
}
9589
}

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettySender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public Sender write(@NonNull Output output, @NonNull Callback callback) {
4444

4545
@Override
4646
public void close() {
47-
context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ctx);
47+
context.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.promise());
4848
ctx.requestComplete();
4949
}
5050

@@ -58,7 +58,7 @@ private static ChannelFutureListener newChannelFutureListener(
5858
try {
5959
callback.onComplete(ctx, cause);
6060
} finally {
61-
ctx.destroy(cause);
61+
ctx.log(cause);
6262
}
6363
}
6464
};

0 commit comments

Comments
 (0)