Skip to content

Commit bb8f2e4

Browse files
DHiranimhowlett
authored andcommitted
ProduceAsync method was not disposing the CancellationTokenRegister object (#640)
1 parent fae9dfb commit bb8f2e4

File tree

1 file changed

+12
-11
lines changed

1 file changed

+12
-11
lines changed

src/Confluent.Kafka/Producer.cs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -564,19 +564,20 @@ public void HandleDeliveryReport(Producer.UntypedDeliveryReport deliveryReport)
564564
producer.enableDeliveryReportKey ? message.Key : default(TKey),
565565
producer.enableDeliveryReportValue ? message.Value : default(TValue));
566566

567-
cancellationToken.Register(() => handler.TrySetException(new TaskCanceledException()));
567+
using (cancellationToken.Register(() => handler.TrySetException(new TaskCanceledException())))
568+
{
569+
var keyBytes = keySerializer(topic, message.Key);
570+
var valBytes = valueSerializer(topic, message.Value);
568571

569-
var keyBytes = keySerializer(topic, message.Key);
570-
var valBytes = valueSerializer(topic, message.Value);
572+
producer.ProduceImpl(
573+
topic,
574+
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
575+
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
576+
message.Timestamp, Partition.Any, message.Headers,
577+
handler);
571578

572-
producer.ProduceImpl(
573-
topic,
574-
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
575-
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
576-
message.Timestamp, Partition.Any, message.Headers,
577-
handler);
578-
579-
return handler.Task;
579+
return handler.Task;
580+
}
580581
}
581582
else
582583
{

0 commit comments

Comments
 (0)