21
21
import org .elasticsearch .common .bytes .ReleasableBytesReference ;
22
22
import org .elasticsearch .http .HttpBody ;
23
23
import org .elasticsearch .test .ESTestCase ;
24
- import org .junit .Before ;
25
24
26
25
import java .util .ArrayList ;
27
26
import java .util .concurrent .atomic .AtomicBoolean ;
@@ -33,20 +32,26 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
33
32
Netty4HttpRequestBodyStream stream ;
34
33
static HttpBody .ChunkHandler discardHandler = (chunk , isLast ) -> chunk .close ();
35
34
36
- @ Before
37
- public void createStream () {
35
+ @ Override
36
+ public void setUp () throws Exception {
37
+ super .setUp ();
38
38
channel = new EmbeddedChannel ();
39
39
stream = new Netty4HttpRequestBodyStream (channel );
40
40
stream .setHandler (discardHandler ); // set default handler, each test might override one
41
- channel .pipeline ().addLast (new SimpleChannelInboundHandler <HttpContent >() {
41
+ channel .pipeline ().addLast (new SimpleChannelInboundHandler <HttpContent >(false ) {
42
42
@ Override
43
43
protected void channelRead0 (ChannelHandlerContext ctx , HttpContent msg ) {
44
- msg .retain ();
45
44
stream .handleNettyContent (msg );
46
45
}
47
46
});
48
47
}
49
48
49
+ @ Override
50
+ public void tearDown () throws Exception {
51
+ super .tearDown ();
52
+ stream .close ();
53
+ }
54
+
50
55
// ensures that no chunks are sent downstream without request
51
56
public void testEnqueueChunksBeforeRequest () {
52
57
var totalChunks = randomIntBetween (1 , 100 );
@@ -63,6 +68,7 @@ public void testFlushAllReceivedChunks() {
63
68
stream .setHandler ((chunk , isLast ) -> {
64
69
chunks .add (chunk );
65
70
totalBytes .addAndGet (chunk .length ());
71
+ chunk .close ();
66
72
});
67
73
68
74
var chunkSize = 1024 ;
@@ -84,6 +90,7 @@ public void testReadFromChannel() {
84
90
stream .setHandler ((chunk , isLast ) -> {
85
91
gotChunks .add (chunk );
86
92
gotLast .set (isLast );
93
+ chunk .close ();
87
94
});
88
95
channel .pipeline ().addFirst (new FlowControlHandler ()); // block all incoming messages, need explicit channel.read()
89
96
var chunkSize = 1024 ;
0 commit comments