From da0a3c05b313f86e2514fd675d4784f8b5f3a4bb Mon Sep 17 00:00:00 2001 From: verdie-g Date: Mon, 12 May 2025 18:36:11 -0400 Subject: [PATCH 1/4] Implement IAsyncDispoable on AdminClient --- src/Confluent.Kafka/AdminClient.cs | 74 ++++++++++++++--------------- src/Confluent.Kafka/IAdminClient.cs | 3 ++ 2 files changed, 40 insertions(+), 37 deletions(-) diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index 64e67445b..6cb0be1e4 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -30,7 +30,7 @@ namespace Confluent.Kafka /// /// Implements an Apache Kafka admin client. /// - internal class AdminClient : IAdminClient + internal sealed class AdminClient : IAdminClient { private int cancellationDelayMaxMs; @@ -1654,51 +1654,51 @@ public Handle Handle /// /// Releases all resources used by this AdminClient. In the current - /// implementation, this method may block for up to 100ms. This - /// will be replaced with a non-blocking version in the future. + /// implementation, this method may block for up to 100ms. It is + /// recommended to use instead. /// public void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - - - /// - /// Releases the unmanaged resources used by the - /// - /// and optionally disposes the managed resources. - /// - /// - /// true to release both managed and unmanaged resources; - /// false to release only unmanaged resources. - /// - protected virtual void Dispose(bool disposing) - { - if (disposing) + callbackCts.Cancel(); + try { - callbackCts.Cancel(); - try - { - callbackTask.Wait(); - } - catch (AggregateException e) - { - if (e.InnerException.GetType() != typeof(TaskCanceledException)) - { - // program execution should never get here. - throw e.InnerException; - } - } - finally + callbackTask.Wait(); + } + catch (AggregateException e) + { + if (e.InnerException.GetType() != typeof(TaskCanceledException)) { - callbackCts.Dispose(); + // program execution should never get here. + throw e.InnerException; } - - DisposeResources(); } + finally + { + callbackCts.Dispose(); + } + + DisposeResources(); } +#if NET6_0_OR_GREATER + public async ValueTask DisposeAsync() + { + callbackCts.Cancel(); + try + { + await callbackTask; + } + catch (TaskCanceledException) + { + } + finally + { + callbackCts.Dispose(); + } + + DisposeResources(); + } +#endif private void DisposeResources() { diff --git a/src/Confluent.Kafka/IAdminClient.cs b/src/Confluent.Kafka/IAdminClient.cs index 12ef6c8d2..2ed4f6129 100644 --- a/src/Confluent.Kafka/IAdminClient.cs +++ b/src/Confluent.Kafka/IAdminClient.cs @@ -27,6 +27,9 @@ namespace Confluent.Kafka /// Defines an Apache Kafka admin client. /// public interface IAdminClient : IClient +#if NET6_0_OR_GREATER + , IAsyncDisposable +#endif { /// /// DEPRECATED. From cf19ee65dae300b3586cb47b86f8f850649734c7 Mon Sep 17 00:00:00 2001 From: verdie-g Date: Mon, 12 May 2025 18:49:30 -0400 Subject: [PATCH 2/4] Add basic test --- .../Admin/DisposeAsync.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs diff --git a/test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs b/test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs new file mode 100644 index 000000000..85c73b583 --- /dev/null +++ b/test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs @@ -0,0 +1,15 @@ +using System.Threading.Tasks; +using Xunit; + +namespace Confluent.Kafka.UnitTests; + +public class DisposeAsync +{ + [Fact] + public async Task TestDisposeAsyncDoesNotThrow() + { + var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:90922" }).Build(); + + await adminClient.DisposeAsync(); + } +} \ No newline at end of file From c87d4cadb96b3354ffcfbab9c1c811a4d475d57c Mon Sep 17 00:00:00 2001 From: "g.verdier" Date: Tue, 13 May 2025 13:35:51 -0400 Subject: [PATCH 3/4] Address comments --- src/Confluent.Kafka/AdminClient.cs | 31 ++++++++++++------- .../Tests/AdmninClient_DisposeAsync.cs | 28 +++++++++++++++++ .../Admin/DisposeAsync.cs | 15 --------- 3 files changed, 48 insertions(+), 26 deletions(-) create mode 100644 test/Confluent.Kafka.IntegrationTests/Tests/AdmninClient_DisposeAsync.cs delete mode 100644 test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index 6cb0be1e4..ef81a4a80 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -1353,7 +1353,7 @@ private Task StartPollTask(CancellationToken ct) // ignore the situation, we panic, destroy the librdkafka handle, // and exit the polling loop. Further usage of the AdminClient will // result in exceptions. People will be sure to notice and tell us. - this.DisposeResources(); + this.Dispose(true); break; } finally @@ -1651,11 +1651,15 @@ public string Name public Handle Handle => handle; + ~AdminClient() + { + Dispose(false); + } /// /// Releases all resources used by this AdminClient. In the current - /// implementation, this method may block for up to 100ms. It is - /// recommended to use instead. + /// implementation, this method may block for up to 100ms. With + /// .NET 6+, it is recommended to use DisposeAsync instead. /// public void Dispose() { @@ -1677,7 +1681,8 @@ public void Dispose() callbackCts.Dispose(); } - DisposeResources(); + Dispose(true); + GC.SuppressFinalize(this); } #if NET6_0_OR_GREATER @@ -1688,7 +1693,7 @@ public async ValueTask DisposeAsync() { await callbackTask; } - catch (TaskCanceledException) + catch (OperationCanceledException) { } finally @@ -1696,18 +1701,22 @@ public async ValueTask DisposeAsync() callbackCts.Dispose(); } - DisposeResources(); + Dispose(false); + GC.SuppressFinalize(this); } #endif - private void DisposeResources() + private void Dispose(bool disposing) { - kafkaHandle.DestroyQueue(resultQueue); - - if (handle.Owner == this) + if (disposing) { - ownedClient.Dispose(); + if (handle.Owner == this) + { + ownedClient.Dispose(); + } } + + kafkaHandle.DestroyQueue(resultQueue); } /// diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AdmninClient_DisposeAsync.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AdmninClient_DisposeAsync.cs new file mode 100644 index 000000000..f872a9b4e --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AdmninClient_DisposeAsync.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading.Tasks; +using Confluent.Kafka.Admin; +using Xunit; + +namespace Confluent.Kafka.IntegrationTests; + +public partial class Tests +{ + [Theory, MemberData(nameof(KafkaParameters))] + public async Task DisposeAsyncDoesNotThrow(string bootstrapServers) + { + var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build(); + + string topicName = $"{nameof(DisposeAsyncDoesNotThrow)}-{Guid.NewGuid()}"; + await adminClient.CreateTopicsAsync(new[] + { + new TopicSpecification + { + Name = topicName, + NumPartitions = 1, + } + }); + await adminClient.DeleteTopicsAsync(new[] { topicName }); + + await adminClient.DisposeAsync(); + } +} \ No newline at end of file diff --git a/test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs b/test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs deleted file mode 100644 index 85c73b583..000000000 --- a/test/Confluent.Kafka.UnitTests/Admin/DisposeAsync.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System.Threading.Tasks; -using Xunit; - -namespace Confluent.Kafka.UnitTests; - -public class DisposeAsync -{ - [Fact] - public async Task TestDisposeAsyncDoesNotThrow() - { - var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = "localhost:90922" }).Build(); - - await adminClient.DisposeAsync(); - } -} \ No newline at end of file From eb2110be6c8370b42a6a409146143a4856d5b45f Mon Sep 17 00:00:00 2001 From: verdie-g Date: Sun, 25 May 2025 16:55:43 -0400 Subject: [PATCH 4/4] Dipose managed resources in DisposeAsync --- src/Confluent.Kafka/AdminClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index ef81a4a80..0a50b8d97 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -1701,7 +1701,7 @@ public async ValueTask DisposeAsync() callbackCts.Dispose(); } - Dispose(false); + Dispose(true); GC.SuppressFinalize(this); } #endif