Skip to content

Commit f0a73b2

Browse files
jeriveJerome Viveret
andauthored
[AdminClient] Allow AdminClient to be authenticated (#1608)
Set token refresh mechanism Co-authored-by: Jerome Viveret <[email protected]>
1 parent 3ceaec8 commit f0a73b2

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

src/Confluent.Kafka/AdminClient.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ internal AdminClient(AdminClientBuilder builder)
506506
if (builder.LogHandler != null) { producerBuilder.SetLogHandler((_, logMessage) => builder.LogHandler(this, logMessage)); }
507507
if (builder.ErrorHandler != null) { producerBuilder.SetErrorHandler((_, error) => builder.ErrorHandler(this, error)); }
508508
if (builder.StatisticsHandler != null) { producerBuilder.SetStatisticsHandler((_, stats) => builder.StatisticsHandler(this, stats)); }
509+
if (builder.OAuthBearerTokenRefreshHandler != null) { producerBuilder.SetOAuthBearerTokenRefreshHandler(builder.OAuthBearerTokenRefreshHandler); }
509510
this.ownedClient = producerBuilder.Build();
510511

511512
this.handle = new Handle

src/Confluent.Kafka/AdminClientBuilder.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public class AdminClientBuilder
4545
/// </summary>
4646
internal protected Action<IAdminClient, string> StatisticsHandler { get; set; }
4747

48+
/// <summary>
49+
/// The configured OAuthBearer Token Refresh handler.
50+
/// </summary>
51+
public Action<IProducer<Null, Null>, string> OAuthBearerTokenRefreshHandler { get; set; }
52+
4853
/// <summary>
4954
/// Initialize a new <see cref="AdminClientBuilder" /> instance.
5055
/// </summary>
@@ -129,6 +134,39 @@ public AdminClientBuilder SetLogHandler(Action<IAdminClient, LogMessage> logHand
129134
return this;
130135
}
131136

137+
/// <summary>
138+
/// Set SASL/OAUTHBEARER token refresh callback in provided
139+
/// conf object. The SASL/OAUTHBEARER token refresh callback
140+
/// is triggered via <see cref="IAdminClient"/>'s admin methods
141+
/// (or any of its overloads) whenever OAUTHBEARER is the SASL
142+
/// mechanism and a token needs to be retrieved, typically
143+
/// based on the configuration defined in
144+
/// sasl.oauthbearer.config. The callback should invoke
145+
/// <see cref="ClientExtensions.OAuthBearerSetToken"/>
146+
/// or <see cref="ClientExtensions.OAuthBearerSetTokenFailure"/>
147+
/// to indicate success or failure, respectively.
148+
///
149+
/// An unsecured JWT refresh handler is provided by librdkafka
150+
/// for development and testing purposes, it is enabled by
151+
/// setting the enable.sasl.oauthbearer.unsecure.jwt property
152+
/// to true and is mutually exclusive to using a refresh callback.
153+
/// </summary>
154+
/// <param name="oAuthBearerTokenRefreshHandler">
155+
/// the callback to set; callback function arguments:
156+
/// IProducer - instance of the admin client's inner producer instance
157+
/// which should be used to set token or token failure string - Value of configuration
158+
/// property sasl.oauthbearer.config
159+
/// </param>
160+
public AdminClientBuilder SetOAuthBearerTokenRefreshHandler(Action<IProducer<Null, Null>, string> oAuthBearerTokenRefreshHandler)
161+
{
162+
if (this.OAuthBearerTokenRefreshHandler != null)
163+
{
164+
throw new InvalidOperationException("OAuthBearer token refresh handler may not be specified more than once.");
165+
}
166+
this.OAuthBearerTokenRefreshHandler = oAuthBearerTokenRefreshHandler;
167+
return this;
168+
}
169+
132170
/// <summary>
133171
/// Build the <see cref="AdminClient" /> instance.
134172
/// </summary>

0 commit comments

Comments
 (0)