Skip to content

Commit aa5824a

Browse files
kulabunKonstantin LabunstIncMale
committed
ByteBuffer is not released when compression is enabled, causing pool leakage (#883)
JAVA-4510 Co-authored-by: Konstantin Labun <[email protected]> Co-authored-by: Valentin Kovalenko <[email protected]>
1 parent 8624506 commit aa5824a

File tree

1 file changed

+25
-10
lines changed

1 file changed

+25
-10
lines changed

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -585,19 +585,27 @@ private ResponseBuffers receiveResponseBuffers() throws IOException {
585585
}
586586

587587
ByteBuf messageBuffer = stream.read(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH);
588+
boolean releaseMessageBuffer = true;
589+
try {
590+
if (messageHeader.getOpCode() == OP_COMPRESSED.getValue()) {
591+
CompressedHeader compressedHeader = new CompressedHeader(messageBuffer, messageHeader);
588592

589-
if (messageHeader.getOpCode() == OP_COMPRESSED.getValue()) {
590-
CompressedHeader compressedHeader = new CompressedHeader(messageBuffer, messageHeader);
591-
592-
Compressor compressor = getCompressor(compressedHeader);
593+
Compressor compressor = getCompressor(compressedHeader);
593594

594-
ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize());
595-
compressor.uncompress(messageBuffer, buffer);
595+
ByteBuf buffer = getBuffer(compressedHeader.getUncompressedSize());
596+
compressor.uncompress(messageBuffer, buffer);
596597

597-
buffer.flip();
598-
return new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer);
599-
} else {
600-
return new ResponseBuffers(new ReplyHeader(messageBuffer, messageHeader), messageBuffer);
598+
buffer.flip();
599+
return new ResponseBuffers(new ReplyHeader(buffer, compressedHeader), buffer);
600+
} else {
601+
ResponseBuffers responseBuffers = new ResponseBuffers(new ReplyHeader(messageBuffer, messageHeader), messageBuffer);
602+
releaseMessageBuffer = false;
603+
return responseBuffers;
604+
}
605+
} finally {
606+
if (releaseMessageBuffer) {
607+
messageBuffer.release();
608+
}
601609
}
602610
}
603611

@@ -653,6 +661,7 @@ public void onResult(final ByteBuf result, final Throwable t) {
653661
callback.onResult(null, t);
654662
return;
655663
}
664+
boolean releaseResult = true;
656665
try {
657666
ReplyHeader replyHeader;
658667
ByteBuf responseBuffer;
@@ -667,15 +676,21 @@ public void onResult(final ByteBuf result, final Throwable t) {
667676
replyHeader = new ReplyHeader(buffer, compressedHeader);
668677
responseBuffer = buffer;
669678
} finally {
679+
releaseResult = false;
670680
result.release();
671681
}
672682
} else {
673683
replyHeader = new ReplyHeader(result, messageHeader);
674684
responseBuffer = result;
685+
releaseResult = false;
675686
}
676687
callback.onResult(new ResponseBuffers(replyHeader, responseBuffer), null);
677688
} catch (Throwable localThrowable) {
678689
callback.onResult(null, localThrowable);
690+
} finally {
691+
if (releaseResult) {
692+
result.release();
693+
}
679694
}
680695
}
681696
}

0 commit comments

Comments
 (0)