Skip to content

Implement IAsyncDisposable on AdminClient #2470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 49 additions & 40 deletions src/Confluent.Kafka/AdminClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace Confluent.Kafka
/// <summary>
/// Implements an Apache Kafka admin client.
/// </summary>
internal class AdminClient : IAdminClient
internal sealed class AdminClient : IAdminClient
{
private int cancellationDelayMaxMs;

Expand Down Expand Up @@ -1353,7 +1353,7 @@ private Task StartPollTask(CancellationToken ct)
// ignore the situation, we panic, destroy the librdkafka handle,
// and exit the polling loop. Further usage of the AdminClient will
// result in exceptions. People will be sure to notice and tell us.
this.DisposeResources();
this.Dispose(true);
break;
}
finally
Expand Down Expand Up @@ -1651,63 +1651,72 @@ public string Name
public Handle Handle
=> handle;

~AdminClient()
{
Dispose(false);
}

/// <summary>
/// Releases all resources used by this AdminClient. In the current
/// implementation, this method may block for up to 100ms. This
/// will be replaced with a non-blocking version in the future.
/// implementation, this method may block for up to 100ms. With
/// .NET 6+, it is recommended to use DisposeAsync instead.
/// </summary>
public void Dispose()
{
callbackCts.Cancel();
try
{
callbackTask.Wait();
}
catch (AggregateException e)
{
if (e.InnerException.GetType() != typeof(TaskCanceledException))
{
// program execution should never get here.
throw e.InnerException;
}
}
finally
{
callbackCts.Dispose();
}

Dispose(true);
GC.SuppressFinalize(this);
}

#if NET6_0_OR_GREATER
public async ValueTask DisposeAsync()
{
callbackCts.Cancel();
try
{
await callbackTask;
}
catch (OperationCanceledException)
{
}
finally
{
callbackCts.Dispose();
}

/// <summary>
/// Releases the unmanaged resources used by the
/// <see cref="Confluent.Kafka.AdminClient" />
/// and optionally disposes the managed resources.
/// </summary>
/// <param name="disposing">
/// true to release both managed and unmanaged resources;
/// false to release only unmanaged resources.
/// </param>
protected virtual void Dispose(bool disposing)
Dispose(false);
GC.SuppressFinalize(this);
}
#endif

private void Dispose(bool disposing)
{
if (disposing)
{
callbackCts.Cancel();
try
{
callbackTask.Wait();
}
catch (AggregateException e)
{
if (e.InnerException.GetType() != typeof(TaskCanceledException))
{
// program execution should never get here.
throw e.InnerException;
}
}
finally
if (handle.Owner == this)
{
callbackCts.Dispose();
ownedClient.Dispose();
}

DisposeResources();
}
}


private void DisposeResources()
{
kafkaHandle.DestroyQueue(resultQueue);

if (handle.Owner == this)
{
ownedClient.Dispose();
}
}

/// <summary>
Expand Down
3 changes: 3 additions & 0 deletions src/Confluent.Kafka/IAdminClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ namespace Confluent.Kafka
/// Defines an Apache Kafka admin client.
/// </summary>
public interface IAdminClient : IClient
#if NET6_0_OR_GREATER
, IAsyncDisposable
#endif
{
/// <summary>
/// DEPRECATED.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;
using Confluent.Kafka.Admin;
using Xunit;

namespace Confluent.Kafka.IntegrationTests;

public partial class Tests
{
[Theory, MemberData(nameof(KafkaParameters))]
public async Task DisposeAsyncDoesNotThrow(string bootstrapServers)
{
var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

string topicName = $"{nameof(DisposeAsyncDoesNotThrow)}-{Guid.NewGuid()}";
await adminClient.CreateTopicsAsync(new[]
{
new TopicSpecification
{
Name = topicName,
NumPartitions = 1,
}
});
await adminClient.DeleteTopicsAsync(new[] { topicName });

await adminClient.DisposeAsync();
}
}