Skip to content

Commit d87acd0

Browse files
author
Matt Howlett
authored
Fixed memory leak in SendOffsetsToTransaction (#1835)
1 parent 818380b commit d87acd0

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

src/Confluent.Kafka/Impl/SafeKafkaHandle.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -563,16 +563,19 @@ internal void AbortTransaction(int millisecondsTimeout)
563563

564564
internal void SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata, int millisecondsTimeout)
565565
{
566-
IntPtr offsetsPtr = GetCTopicPartitionList(offsets);
567-
568566
if (!(groupMetadata is ConsumerGroupMetadata))
569567
{
570568
throw new ArgumentException("groupMetadata object must be a value acquired via Consumer.ConsumerGroupMetadata.");
571569
}
572570
var serializedMetadata = ((ConsumerGroupMetadata)groupMetadata).serializedMetadata;
573-
var cgmdPtr = this.DeserializeConsumerGroupMetadata(serializedMetadata);
571+
572+
var cgmdPtr = IntPtr.Zero;
573+
var offsetsPtr = IntPtr.Zero;
574574
try
575575
{
576+
cgmdPtr = this.DeserializeConsumerGroupMetadata(serializedMetadata);
577+
offsetsPtr = GetCTopicPartitionList(offsets);
578+
576579
var error = new Error(Librdkafka.send_offsets_to_transaction(this.handle, offsetsPtr, cgmdPtr, (IntPtr)millisecondsTimeout));
577580
if (error.Code != ErrorCode.NoError)
578581
{
@@ -589,7 +592,14 @@ internal void SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset> offsets
589592
}
590593
finally
591594
{
592-
this.DestroyConsumerGroupMetadata(cgmdPtr);
595+
if (offsetsPtr != IntPtr.Zero)
596+
{
597+
Librdkafka.topic_partition_list_destroy(offsetsPtr);
598+
}
599+
if (cgmdPtr != IntPtr.Zero)
600+
{
601+
this.DestroyConsumerGroupMetadata(cgmdPtr);
602+
}
593603
}
594604
}
595605

0 commit comments

Comments
 (0)