Skip to content

Commit cd8dfe8

Browse files
authored
Dispose ProduceAsync's CancellationTokenRegistration (#1289)
1 parent b6a4dd0 commit cd8dfe8

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

src/Confluent.Kafka/Producer.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -716,18 +716,19 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
716716
enableDeliveryReportKey ? message.Key : default(TKey),
717717
enableDeliveryReportValue ? message.Value : default(TValue));
718718

719+
if (cancellationToken != null && cancellationToken.CanBeCanceled)
720+
{
721+
handler.CancellationTokenRegistration
722+
= cancellationToken.Register(() => handler.TrySetCanceled());
723+
}
724+
719725
ProduceImpl(
720726
topicPartition.Topic,
721727
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
722728
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
723729
message.Timestamp, topicPartition.Partition, headers,
724730
handler);
725731

726-
if (cancellationToken != null && cancellationToken.CanBeCanceled)
727-
{
728-
cancellationToken.Register(() => handler.TrySetCanceled());
729-
}
730-
731732
return await handler.Task.ConfigureAwait(false);
732733
}
733734
else
@@ -867,6 +868,8 @@ public TypedTaskDeliveryHandlerShim(string topic, TKey key, TValue val)
867868
Value = val;
868869
}
869870

871+
public CancellationTokenRegistration CancellationTokenRegistration;
872+
870873
public string Topic;
871874

872875
public TKey Key;
@@ -875,6 +878,11 @@ public TypedTaskDeliveryHandlerShim(string topic, TKey key, TValue val)
875878

876879
public void HandleDeliveryReport(DeliveryReport<Null, Null> deliveryReport)
877880
{
881+
if (CancellationTokenRegistration != null)
882+
{
883+
CancellationTokenRegistration.Dispose();
884+
}
885+
878886
if (deliveryReport == null)
879887
{
880888
#if NET45

0 commit comments

Comments
 (0)