Skip to content

Commit 23365f0

Browse files
lhotarisrinath-ctds
authored andcommitted
[fix][client] Fix memory leak when message size exceeds max message size and batching is enabled (apache#23967)
(cherry picked from commit 2620871)
1 parent 81fd0e6 commit 23365f0

File tree

1 file changed

+2
-0
lines changed

1 file changed

+2
-0
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
273273

274274
// handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded`
275275
if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
276+
cmd.release();
276277
producer.semaphoreRelease(1);
277278
producer.client.getMemoryLimitController().releaseMemory(
278279
messages.get(0).getUncompressedSize() + batchAllocatedSizeBytes);
@@ -286,6 +287,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
286287
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
287288
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
288289
if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
290+
encryptedPayload.release();
289291
producer.semaphoreRelease(messages.size());
290292
messages.forEach(msg -> producer.client.getMemoryLimitController()
291293
.releaseMemory(msg.getUncompressedSize()));

0 commit comments

Comments
 (0)