Skip to content

Commit 461a9a6

Browse files
lhotarihanmz
authored andcommitted
[fix][ml] Fix memory leaks in ManagedCursorInfo and ManagedLedgerInfo decompression and compression (#23960)
1 parent 82738cc commit 461a9a6

File tree

1 file changed

+19
-15
lines changed

1 file changed

+19
-15
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,13 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
456456
try {
457457
MLDataFormats.ManagedLedgerInfoMetadata metadata =
458458
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
459-
return ManagedLedgerInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
460-
.decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
459+
ByteBuf uncompressed = getCompressionCodec(metadata.getCompressionType())
460+
.decode(byteBuf, metadata.getUncompressedSize());
461+
try {
462+
return ManagedLedgerInfo.parseFrom(uncompressed.nioBuffer());
463+
} finally {
464+
uncompressed.release();
465+
}
461466
} catch (Exception e) {
462467
log.error("Failed to parse managedLedgerInfo metadata, "
463468
+ "fall back to parse managedLedgerInfo directly.", e);
@@ -478,8 +483,13 @@ public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProto
478483
try {
479484
MLDataFormats.ManagedCursorInfoMetadata metadata =
480485
MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
481-
return ManagedCursorInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
482-
.decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
486+
ByteBuf uncompressed = getCompressionCodec(metadata.getCompressionType())
487+
.decode(byteBuf, metadata.getUncompressedSize());
488+
try {
489+
return ManagedCursorInfo.parseFrom(uncompressed.nioBuffer());
490+
} finally {
491+
uncompressed.release();
492+
}
483493
} catch (Exception e) {
484494
log.error("Failed to parse ManagedCursorInfo metadata, "
485495
+ "fall back to parse ManagedCursorInfo directly", e);
@@ -503,29 +513,23 @@ private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSer
503513
if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
504514
return info;
505515
}
506-
ByteBuf metadataByteBuf = null;
507-
ByteBuf encodeByteBuf = null;
516+
517+
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
508518
try {
509-
metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
519+
ByteBuf metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
510520
metadataSerializedSize + 6);
511521
metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA);
512522
metadataByteBuf.writeInt(metadataSerializedSize);
513523
metadataByteBuf.writeBytes(metadata);
514-
encodeByteBuf = getCompressionCodec(compressionType)
524+
ByteBuf encodeByteBuf = getCompressionCodec(compressionType)
515525
.encode(Unpooled.wrappedBuffer(info));
516-
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
517526
compositeByteBuf.addComponent(true, metadataByteBuf);
518527
compositeByteBuf.addComponent(true, encodeByteBuf);
519528
byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
520529
compositeByteBuf.readBytes(dataBytes);
521530
return dataBytes;
522531
} finally {
523-
if (metadataByteBuf != null) {
524-
metadataByteBuf.release();
525-
}
526-
if (encodeByteBuf != null) {
527-
encodeByteBuf.release();
528-
}
532+
compositeByteBuf.release();
529533
}
530534
}
531535

0 commit comments

Comments
 (0)