Skip to content

Commit b321c72

Browse files
committed
netty/jetty: HTTP Outputstream.close method must close once and ignore any other call
- fix #3554
1 parent 99ac09c commit b321c72

File tree

6 files changed

+162
-7
lines changed

6 files changed

+162
-7
lines changed

modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyOutputStream.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
package io.jooby.internal.jetty;
77

88
import java.io.OutputStream;
9+
import java.util.concurrent.atomic.AtomicBoolean;
910

1011
import io.jooby.SneakyThrows;
1112

1213
public class JettyOutputStream extends OutputStream {
1314

14-
private JettyContext jetty;
15-
private OutputStream out;
15+
private final JettyContext jetty;
16+
private final OutputStream out;
17+
private AtomicBoolean closed = new AtomicBoolean(false);
1618

1719
public JettyOutputStream(OutputStream out, JettyContext jetty) {
1820
this.out = out;
@@ -41,7 +43,9 @@ public void flush() {
4143

4244
@Override
4345
public void close() {
44-
block(out::close, true);
46+
if (closed.compareAndSet(false, true)) {
47+
block(out::close, true);
48+
}
4549
}
4650

4751
private void block(SneakyThrows.Runnable task, boolean complete) {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.internal.jetty;
7+
8+
import static org.mockito.Mockito.*;
9+
10+
import java.io.IOException;
11+
import java.io.OutputStream;
12+
13+
import org.junit.jupiter.api.Test;
14+
15+
public class Issue3554 {
16+
17+
@Test
18+
public void shouldCloseOutputStreamOnce() throws IOException {
19+
var out = mock(OutputStream.class);
20+
var ctx = mock(JettyContext.class);
21+
22+
var jettyOutputStream = new JettyOutputStream(out, ctx);
23+
jettyOutputStream.close();
24+
jettyOutputStream.close();
25+
26+
verify(out, times(1)).close();
27+
}
28+
}

modules/jooby-netty/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@
7070
<classifier>runtime</classifier>
7171
<scope>test</scope>
7272
</dependency>
73+
74+
<dependency>
75+
<groupId>org.mockito</groupId>
76+
<artifactId>mockito-core</artifactId>
77+
<scope>test</scope>
78+
</dependency>
79+
<dependency>
80+
<groupId>org.slf4j</groupId>
81+
<artifactId>slf4j-simple</artifactId>
82+
<version>${slf4j.version}</version>
83+
<scope>test</scope>
84+
</dependency>
7385
</dependencies>
7486

7587
<profiles>

modules/jooby-netty/src/main/java/io/jooby/internal/netty/NettyOutputStream.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.io.IOException;
99
import java.io.OutputStream;
10+
import java.util.concurrent.atomic.AtomicBoolean;
1011

1112
import io.netty.buffer.ByteBuf;
1213
import io.netty.channel.ChannelFuture;
@@ -22,6 +23,7 @@ public class NettyOutputStream extends OutputStream {
2223
private final ChannelHandlerContext context;
2324
private final ChannelFutureListener closeListener;
2425
private HttpResponse headers;
26+
private AtomicBoolean closed = new AtomicBoolean(false);
2527

2628
public NettyOutputStream(
2729
NettyContext ctx, ChannelHandlerContext context, int bufferSize, HttpResponse headers) {
@@ -106,10 +108,12 @@ private void flush(ChannelFutureListener callback, ChannelFutureListener listene
106108

107109
@Override
108110
public void close() {
109-
try {
110-
flush(null, closeListener);
111-
} finally {
112-
ctx.requestComplete();
111+
if (closed.compareAndSet(false, true)) {
112+
try {
113+
flush(null, closeListener);
114+
} finally {
115+
ctx.requestComplete();
116+
}
113117
}
114118
}
115119
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.internal.netty;
7+
8+
import static org.mockito.Mockito.*;
9+
10+
import java.io.IOException;
11+
12+
import org.junit.jupiter.api.Test;
13+
14+
import io.netty.buffer.ByteBuf;
15+
import io.netty.buffer.ByteBufAllocator;
16+
import io.netty.channel.ChannelFuture;
17+
import io.netty.channel.ChannelHandlerContext;
18+
import io.netty.handler.codec.http.HttpResponse;
19+
import io.netty.handler.codec.http.LastHttpContent;
20+
21+
public class Issue3554 {
22+
23+
@Test
24+
public void shouldCloseOutputStreamOnce() throws IOException {
25+
var ctx = mock(NettyContext.class);
26+
var headers = mock(HttpResponse.class);
27+
28+
var buffer = mock(ByteBuf.class);
29+
when(buffer.readableBytes()).thenReturn(0);
30+
31+
var bufferAllocator = mock(ByteBufAllocator.class);
32+
when(bufferAllocator.buffer(0, 1024)).thenReturn(buffer);
33+
34+
var future = mock(ChannelFuture.class);
35+
var channelContext = mock(ChannelHandlerContext.class);
36+
when(channelContext.alloc()).thenReturn(bufferAllocator);
37+
when(channelContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)).thenReturn(future);
38+
39+
var httpOutputStream = new NettyOutputStream(ctx, channelContext, 1024, headers);
40+
httpOutputStream.close();
41+
httpOutputStream.close();
42+
43+
verify(channelContext, times(1)).writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
44+
}
45+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.i3554;
7+
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
10+
import java.io.BufferedOutputStream;
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
16+
import io.jooby.ExecutionMode;
17+
import io.jooby.ReactiveSupport;
18+
import io.jooby.ServerOptions;
19+
import io.jooby.junit.ServerTest;
20+
import io.jooby.junit.ServerTestRunner;
21+
22+
public class Issue3554 {
23+
@ServerTest(executionMode = ExecutionMode.EVENT_LOOP)
24+
public void shouldNotThrowErrorOnCompletableWithSideEffect(ServerTestRunner runner) {
25+
runner
26+
.define(
27+
app -> {
28+
ExecutorService threadPool = Executors.newSingleThreadExecutor();
29+
30+
var serverOptions = new ServerOptions().setPort(9000).setDefaultHeaders(false);
31+
app.setServerOptions(serverOptions);
32+
33+
app.use(ReactiveSupport.concurrent());
34+
35+
app.onStop(threadPool::shutdown);
36+
37+
app.get(
38+
"/3554",
39+
ctx ->
40+
CompletableFuture.supplyAsync(
41+
() -> {
42+
try (var os = ctx.responseStream();
43+
var bos = new BufferedOutputStream(os)) {
44+
bos.write("test".getBytes(StandardCharsets.UTF_8));
45+
} catch (Exception ex) {
46+
throw new RuntimeException(ex);
47+
}
48+
return ctx;
49+
},
50+
threadPool));
51+
})
52+
.ready(
53+
http -> {
54+
http.get(
55+
"/3554",
56+
rsp -> {
57+
assertEquals("test", rsp.body().string());
58+
assertEquals(200, rsp.code());
59+
});
60+
});
61+
}
62+
}

0 commit comments

Comments
 (0)