diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index 64e67445b..0a50b8d97 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -30,7 +30,7 @@ namespace Confluent.Kafka /// /// Implements an Apache Kafka admin client. /// - internal class AdminClient : IAdminClient + internal sealed class AdminClient : IAdminClient { private int cancellationDelayMaxMs; @@ -1353,7 +1353,7 @@ private Task StartPollTask(CancellationToken ct) // ignore the situation, we panic, destroy the librdkafka handle, // and exit the polling loop. Further usage of the AdminClient will // result in exceptions. People will be sure to notice and tell us. - this.DisposeResources(); + this.Dispose(true); break; } finally @@ -1651,63 +1651,72 @@ public string Name public Handle Handle => handle; + ~AdminClient() + { + Dispose(false); + } /// /// Releases all resources used by this AdminClient. In the current - /// implementation, this method may block for up to 100ms. This - /// will be replaced with a non-blocking version in the future. + /// implementation, this method may block for up to 100ms. With + /// .NET 6+, it is recommended to use DisposeAsync instead. /// public void Dispose() { + callbackCts.Cancel(); + try + { + callbackTask.Wait(); + } + catch (AggregateException e) + { + if (e.InnerException.GetType() != typeof(TaskCanceledException)) + { + // program execution should never get here. + throw e.InnerException; + } + } + finally + { + callbackCts.Dispose(); + } + Dispose(true); GC.SuppressFinalize(this); } +#if NET6_0_OR_GREATER + public async ValueTask DisposeAsync() + { + callbackCts.Cancel(); + try + { + await callbackTask; + } + catch (OperationCanceledException) + { + } + finally + { + callbackCts.Dispose(); + } - /// - /// Releases the unmanaged resources used by the - /// - /// and optionally disposes the managed resources. - /// - /// - /// true to release both managed and unmanaged resources; - /// false to release only unmanaged resources. - /// - protected virtual void Dispose(bool disposing) + Dispose(true); + GC.SuppressFinalize(this); + } +#endif + + private void Dispose(bool disposing) { if (disposing) { - callbackCts.Cancel(); - try - { - callbackTask.Wait(); - } - catch (AggregateException e) + if (handle.Owner == this) { - if (e.InnerException.GetType() != typeof(TaskCanceledException)) - { - // program execution should never get here. - throw e.InnerException; - } + ownedClient.Dispose(); } - finally - { - callbackCts.Dispose(); - } - - DisposeResources(); } - } - - private void DisposeResources() - { kafkaHandle.DestroyQueue(resultQueue); - - if (handle.Owner == this) - { - ownedClient.Dispose(); - } } /// diff --git a/src/Confluent.Kafka/IAdminClient.cs b/src/Confluent.Kafka/IAdminClient.cs index 12ef6c8d2..2ed4f6129 100644 --- a/src/Confluent.Kafka/IAdminClient.cs +++ b/src/Confluent.Kafka/IAdminClient.cs @@ -27,6 +27,9 @@ namespace Confluent.Kafka /// Defines an Apache Kafka admin client. /// public interface IAdminClient : IClient +#if NET6_0_OR_GREATER + , IAsyncDisposable +#endif { /// /// DEPRECATED. diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AdmninClient_DisposeAsync.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AdmninClient_DisposeAsync.cs new file mode 100644 index 000000000..f872a9b4e --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AdmninClient_DisposeAsync.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; +using Confluent.Kafka.Admin; +using Xunit; + +namespace Confluent.Kafka.IntegrationTests; + +public partial class Tests +{ + [Theory, MemberData(nameof(KafkaParameters))] + public async Task DisposeAsyncDoesNotThrow(string bootstrapServers) + { + var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build(); + + string topicName = $"{nameof(DisposeAsyncDoesNotThrow)}-{Guid.NewGuid()}"; + await adminClient.CreateTopicsAsync(new[] + { + new TopicSpecification + { + Name = topicName, + NumPartitions = 1, + } + }); + await adminClient.DeleteTopicsAsync(new[] { topicName }); + + await adminClient.DisposeAsync(); + } +} \ No newline at end of file