@@ -724,19 +724,27 @@ private ResponseBuffers receiveResponseBuffers(final int additionalTimeout) thro
724
724
}
725
725
726
726
ByteBuf messageBuffer = stream .read (messageHeader .getMessageLength () - MESSAGE_HEADER_LENGTH , additionalTimeout );
727
+ boolean releaseMessageBuffer = true ;
728
+ try {
729
+ if (messageHeader .getOpCode () == OP_COMPRESSED .getValue ()) {
730
+ CompressedHeader compressedHeader = new CompressedHeader (messageBuffer , messageHeader );
727
731
728
- if (messageHeader .getOpCode () == OP_COMPRESSED .getValue ()) {
729
- CompressedHeader compressedHeader = new CompressedHeader (messageBuffer , messageHeader );
730
-
731
- Compressor compressor = getCompressor (compressedHeader );
732
+ Compressor compressor = getCompressor (compressedHeader );
732
733
733
- ByteBuf buffer = getBuffer (compressedHeader .getUncompressedSize ());
734
- compressor .uncompress (messageBuffer , buffer );
734
+ ByteBuf buffer = getBuffer (compressedHeader .getUncompressedSize ());
735
+ compressor .uncompress (messageBuffer , buffer );
735
736
736
- buffer .flip ();
737
- return new ResponseBuffers (new ReplyHeader (buffer , compressedHeader ), buffer );
738
- } else {
739
- return new ResponseBuffers (new ReplyHeader (messageBuffer , messageHeader ), messageBuffer );
737
+ buffer .flip ();
738
+ return new ResponseBuffers (new ReplyHeader (buffer , compressedHeader ), buffer );
739
+ } else {
740
+ ResponseBuffers responseBuffers = new ResponseBuffers (new ReplyHeader (messageBuffer , messageHeader ), messageBuffer );
741
+ releaseMessageBuffer = false ;
742
+ return responseBuffers ;
743
+ }
744
+ } finally {
745
+ if (releaseMessageBuffer ) {
746
+ messageBuffer .release ();
747
+ }
740
748
}
741
749
}
742
750
@@ -792,6 +800,7 @@ public void onResult(final ByteBuf result, final Throwable t) {
792
800
callback .onResult (null , t );
793
801
return ;
794
802
}
803
+ boolean releaseResult = true ;
795
804
try {
796
805
ReplyHeader replyHeader ;
797
806
ByteBuf responseBuffer ;
@@ -806,15 +815,21 @@ public void onResult(final ByteBuf result, final Throwable t) {
806
815
replyHeader = new ReplyHeader (buffer , compressedHeader );
807
816
responseBuffer = buffer ;
808
817
} finally {
818
+ releaseResult = false ;
809
819
result .release ();
810
820
}
811
821
} else {
812
822
replyHeader = new ReplyHeader (result , messageHeader );
813
823
responseBuffer = result ;
824
+ releaseResult = false ;
814
825
}
815
826
callback .onResult (new ResponseBuffers (replyHeader , responseBuffer ), null );
816
827
} catch (Throwable localThrowable ) {
817
828
callback .onResult (null , localThrowable );
829
+ } finally {
830
+ if (releaseResult ) {
831
+ result .release ();
832
+ }
818
833
}
819
834
}
820
835
}
0 commit comments