@@ -47,7 +47,7 @@ private enum States {
4747 private boolean decodingCompressedBuffer = false ;
4848
4949 @ Override
50- protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws Exception {
50+ protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws InvalidFrameProtocolException , IOException {
5151 if (!hasEnoughBytes (in )) {
5252 if (decodingCompressedBuffer ){
5353 throw new InvalidFrameProtocolException ("Insufficient bytes in compressed content to decode: " + currentState );
@@ -124,7 +124,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
124124 int fieldsCount = (int ) in .readUnsignedInt ();
125125 int count = 0 ;
126126
127- if (fieldsCount <= 0 ) {
127+ if (fieldsCount <= 0 ) {
128128 throw new InvalidFrameProtocolException ("Invalid number of fields, received: " + fieldsCount );
129129 }
130130
@@ -178,20 +178,19 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
178178
179179 case READ_COMPRESSED_FRAME : {
180180 logger .trace ("Running: READ_COMPRESSED_FRAME" );
181- // Use the compressed size as the safe start for the buffer.
182- ByteBuf buffer = inflateCompressedFrame (ctx , in );
183- transition (States .READ_HEADER );
181+ inflateCompressedFrame (ctx , in , (buffer ) -> {
182+ transition (States .READ_HEADER );
184183
185- decodingCompressedBuffer = true ;
186- try {
187- while (buffer .readableBytes () > 0 ) {
188- decode (ctx , buffer , out );
184+ decodingCompressedBuffer = true ;
185+ try {
186+ while (buffer .readableBytes () > 0 ) {
187+ decode (ctx , buffer , out );
188+ }
189+ } finally {
190+ decodingCompressedBuffer = false ;
191+ transition (States .READ_HEADER );
189192 }
190- } finally {
191- decodingCompressedBuffer = false ;
192- buffer .release ();
193- transition (States .READ_HEADER );
194- }
193+ });
195194 break ;
196195 }
197196 case READ_JSON : {
@@ -211,18 +210,28 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
211210 }
212211 }
213212
214- private ByteBuf inflateCompressedFrame (final ChannelHandlerContext ctx , final ByteBuf in ) throws IOException {
213+ private void inflateCompressedFrame (final ChannelHandlerContext ctx , final ByteBuf in , final CheckedConsumer <ByteBuf > fn )
214+ throws IOException {
215+ // Use the compressed size as the safe start for the buffer.
215216 ByteBuf buffer = ctx .alloc ().buffer (requiredBytes );
217+ try {
218+ decompressImpl (in , buffer );
219+ fn .accept (buffer );
220+ } finally {
221+ buffer .release ();
222+ }
223+ }
224+
225+ private void decompressImpl (final ByteBuf in , final ByteBuf out ) throws IOException {
216226 Inflater inflater = new Inflater ();
217227 try (
218- ByteBufOutputStream buffOutput = new ByteBufOutputStream (buffer );
228+ ByteBufOutputStream buffOutput = new ByteBufOutputStream (out );
219229 InflaterOutputStream inflaterStream = new InflaterOutputStream (buffOutput , inflater )
220230 ) {
221231 in .readBytes (inflaterStream , requiredBytes );
222- }finally {
232+ } finally {
223233 inflater .end ();
224234 }
225- return buffer ;
226235 }
227236
228237 private boolean hasEnoughBytes (ByteBuf in ) {
@@ -234,7 +243,7 @@ private void transition(States next) {
234243 }
235244
236245 private void transition (States nextState , int requiredBytes ) {
237- if (logger .isTraceEnabled ()) {
246+ if (logger .isTraceEnabled ()) {
238247 logger .trace ("Transition, from: " + currentState + ", to: " + nextState + ", requiring " + requiredBytes + " bytes" );
239248 }
240249 this .currentState = nextState ;
@@ -247,4 +256,9 @@ private void batchComplete() {
247256 batch = null ;
248257 }
249258
259+ @ FunctionalInterface
260+ private interface CheckedConsumer <T > {
261+ void accept (T t ) throws IOException ;
262+ }
263+
250264}
0 commit comments