3838import io .grpc .internal .WritableBuffer ;
3939import io .netty .buffer .ByteBuf ;
4040import io .netty .buffer .ByteBufAllocator ;
41- import io .netty .buffer .ByteBufUtil ;
4241import io .netty .buffer .CompositeByteBuf ;
4342import io .netty .buffer .Unpooled ;
4443import io .netty .buffer .UnpooledByteBufAllocator ;
6867import java .nio .ByteBuffer ;
6968import java .util .concurrent .Delayed ;
7069import java .util .concurrent .TimeUnit ;
70+ import org .junit .After ;
7171import org .junit .Assert ;
7272import org .junit .Test ;
7373import org .junit .runner .RunWith ;
8484public abstract class NettyHandlerTestBase <T extends Http2ConnectionHandler > {
8585
8686 protected static final int STREAM_ID = 3 ;
87- private ByteBuf content ;
8887
8988 private EmbeddedChannel channel ;
9089
@@ -106,18 +105,24 @@ protected void manualSetUp() throws Exception {}
106105 protected final TransportTracer transportTracer = new TransportTracer ();
107106 protected int flowControlWindow = DEFAULT_WINDOW_SIZE ;
108107 protected boolean autoFlowControl = false ;
109-
110108 private final FakeClock fakeClock = new FakeClock ();
111109
112110 FakeClock fakeClock () {
113111 return fakeClock ;
114112 }
115113
114+ @ After
115+ public void tearDown () throws Exception {
116+ if (channel () != null ) {
117+ channel ().releaseInbound ();
118+ channel ().releaseOutbound ();
119+ }
120+ }
121+
116122 /**
117123 * Must be called by subclasses to initialize the handler and channel.
118124 */
119125 protected final void initChannel (Http2HeadersDecoder headersDecoder ) throws Exception {
120- content = Unpooled .copiedBuffer ("hello world" , UTF_8 );
121126 frameWriter = mock (Http2FrameWriter .class , delegatesTo (new DefaultHttp2FrameWriter ()));
122127 frameReader = new DefaultHttp2FrameReader (headersDecoder );
123128
@@ -233,11 +238,11 @@ protected final Http2FrameReader frameReader() {
233238 }
234239
235240 protected final ByteBuf content () {
236- return content ;
241+ return Unpooled . copiedBuffer ( contentAsArray ()) ;
237242 }
238243
239244 protected final byte [] contentAsArray () {
240- return ByteBufUtil .getBytes (content () );
245+ return " \000 \000 \000 \000 \r hello world" .getBytes (UTF_8 );
241246 }
242247
243248 protected final Http2FrameWriter verifyWrite () {
@@ -252,8 +257,8 @@ protected final void channelRead(Object obj) throws Exception {
252257 channel .writeInbound (obj );
253258 }
254259
255- protected ByteBuf grpcDataFrame ( int streamId , boolean endStream , byte [] content ) {
256- final ByteBuf compressionFrame = Unpooled .buffer (content .length );
260+ protected ByteBuf grpcFrame ( byte [] message ) {
261+ final ByteBuf compressionFrame = Unpooled .buffer (message .length );
257262 MessageFramer framer = new MessageFramer (
258263 new MessageFramer .Sink () {
259264 @ Override
@@ -262,23 +267,22 @@ public void deliverFrame(
262267 if (frame != null ) {
263268 ByteBuf bytebuf = ((NettyWritableBuffer ) frame ).bytebuf ();
264269 compressionFrame .writeBytes (bytebuf );
270+ bytebuf .release ();
265271 }
266272 }
267273 },
268274 new NettyWritableBufferAllocator (ByteBufAllocator .DEFAULT ),
269275 StatsTraceContext .NOOP );
270- framer .writePayload (new ByteArrayInputStream (content ));
271- framer .flush ();
272- ChannelHandlerContext ctx = newMockContext ();
273- new DefaultHttp2FrameWriter ().writeData (ctx , streamId , compressionFrame , 0 , endStream ,
274- newPromise ());
275- return captureWrite (ctx );
276+ framer .writePayload (new ByteArrayInputStream (message ));
277+ framer .close ();
278+ return compressionFrame ;
276279 }
277280
278- protected final ByteBuf dataFrame (int streamId , boolean endStream , ByteBuf content ) {
279- // Need to retain the content since the frameWriter releases it.
280- content . retain ();
281+ protected final ByteBuf grpcDataFrame (int streamId , boolean endStream , byte [] content ) {
282+ return dataFrame ( streamId , endStream , grpcFrame ( content ));
283+ }
281284
285+ protected final ByteBuf dataFrame (int streamId , boolean endStream , ByteBuf content ) {
282286 ChannelHandlerContext ctx = newMockContext ();
283287 new DefaultHttp2FrameWriter ().writeData (ctx , streamId , content , 0 , endStream , newPromise ());
284288 return captureWrite (ctx );
@@ -410,6 +414,7 @@ public void dataSizeSincePingAccumulates() throws Exception {
410414 channelRead (dataFrame (3 , false , buff .copy ()));
411415
412416 assertEquals (length * 3 , handler .flowControlPing ().getDataSincePing ());
417+ buff .release ();
413418 }
414419
415420 @ Test
@@ -608,12 +613,14 @@ public void bdpPingWindowResizing() throws Exception {
608613
609614 private void readPingAck (long pingData ) throws Exception {
610615 channelRead (pingFrame (true , pingData ));
616+ channel ().releaseOutbound ();
611617 }
612618
613619 private void readXCopies (int copies , byte [] data ) throws Exception {
614620 for (int i = 0 ; i < copies ; i ++) {
615621 channelRead (grpcDataFrame (STREAM_ID , false , data )); // buffer it
616622 stream ().request (1 ); // consume it
623+ channel ().releaseOutbound ();
617624 }
618625 }
619626
0 commit comments