Skip to content

Commit 479c6b2

Browse files
Add client credentials OAuth support (confluentinc#2426)
* Add client credentials OAuth support * Remove imports * Add Oauth support to cached schema registry client * Set BasicAuthSource to USER_INFO if not specified * Fix formatting for initialization * Fix formatting * Add double checked locking * Add expiry time to token * Remove unused variables in bearer token provider * Make token volatile
1 parent aa03f1c commit 479c6b2

11 files changed

+851
-32
lines changed

src/Confluent.SchemaRegistry/AuthCredentialsSource.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,25 @@ public enum AuthCredentialsSource
3333
/// </summary>
3434
SaslInherit
3535
}
36+
37+
/// <summary>
38+
/// Bearer auth credentials source.
39+
/// </summary>
40+
public enum BearerAuthCredentialsSource
41+
{
42+
/// <summary>
43+
/// Credentials are specified via the `schema.registry.bearer.auth.token` config property.
44+
/// </summary>
45+
StaticToken,
46+
47+
/// <summary>
48+
/// Credentials are specified via the `schema.registry.oauthbearer.auth.credentials.source` config property.
49+
/// </summary>
50+
OAuthBearer,
51+
52+
/// <summary>
53+
/// User provides a custom implementation of IAuthenticationHeaderValueProvider.
54+
/// </summary>
55+
Custom
56+
}
3657
}

src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs

Lines changed: 102 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
using System.Threading.Tasks;
2424
using System.Linq;
2525
using System;
26+
using System.Net.Http;
2627
using System.Collections.Concurrent;
2728
using System.Net;
2829
using System.Threading;
@@ -329,6 +330,12 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
329330

330331
username = userPass[0];
331332
password = userPass[1];
333+
if (authenticationHeaderValueProvider != null)
334+
{
335+
throw new ArgumentException(
336+
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password");
337+
}
338+
authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider(username, password);
332339
}
333340
}
334341
else if (basicAuthSource == "SASL_INHERIT")
@@ -355,40 +362,112 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
355362

356363
username = saslUsername.Value;
357364
password = saslPassword.Value;
365+
if (authenticationHeaderValueProvider != null)
366+
{
367+
throw new ArgumentException(
368+
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password");
369+
}
370+
authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider(username, password);
358371
}
359372
else
360373
{
361374
throw new ArgumentException(
362375
$"Invalid value '{basicAuthSource}' specified for property '{SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource}'");
363376
}
364377

365-
if (authenticationHeaderValueProvider != null)
378+
var bearerAuthSource = config.FirstOrDefault(prop =>
379+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource).Value ?? "";
380+
381+
if (bearerAuthSource != "" && basicAuthSource != "")
366382
{
367-
if (username != null || password != null)
368-
{
369-
throw new ArgumentException(
370-
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password");
371-
}
383+
throw new ArgumentException(
384+
$"Invalid authentication header value provider configuration: Cannot specify both basic and bearer authentication");
372385
}
373-
else
386+
387+
string logicalCluster = null;
388+
string identityPoolId = null;
389+
string bearerToken = null;
390+
string clientId = null;
391+
string clientSecret = null;
392+
string scope = null;
393+
string tokenEndpointUrl = null;
394+
395+
if (bearerAuthSource == "STATIC_TOKEN" || bearerAuthSource == "OAUTHBEARER")
374396
{
375-
if (username != null && password == null)
397+
if (authenticationHeaderValueProvider != null)
376398
{
377399
throw new ArgumentException(
378-
$"Invalid authentication header value provider configuration: Basic authentication username specified, but password not specified");
400+
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and bearer authentication");
379401
}
402+
logicalCluster = config.FirstOrDefault(prop =>
403+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster).Value;
380404

381-
if (username == null && password != null)
405+
identityPoolId = config.FirstOrDefault(prop =>
406+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId).Value;
407+
if (logicalCluster == null || identityPoolId == null)
382408
{
383409
throw new ArgumentException(
384-
$"Invalid authentication header value provider configuration: Basic authentication password specified, but username not specified");
385-
}
386-
else if (username != null && password != null)
387-
{
388-
authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider(username, password);
410+
$"Invalid bearer authentication provider configuration: Logical cluster and identity pool ID must be specified");
389411
}
390412
}
391413

414+
switch (bearerAuthSource)
415+
{
416+
case "STATIC_TOKEN":
417+
bearerToken = config.FirstOrDefault(prop =>
418+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken).Value;
419+
420+
if (bearerToken == null)
421+
{
422+
throw new ArgumentException(
423+
$"Invalid authentication header value provider configuration: Bearer authentication token not specified");
424+
}
425+
authenticationHeaderValueProvider = new StaticBearerAuthenticationHeaderValueProvider(bearerToken, logicalCluster, identityPoolId);
426+
break;
427+
428+
case "OAUTHBEARER":
429+
clientId = config.FirstOrDefault(prop =>
430+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId).Value;
431+
432+
clientSecret = config.FirstOrDefault(prop =>
433+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret).Value;
434+
435+
scope = config.FirstOrDefault(prop =>
436+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope).Value;
437+
438+
tokenEndpointUrl = config.FirstOrDefault(prop =>
439+
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl).Value;
440+
441+
if (tokenEndpointUrl == null || clientId == null || clientSecret == null || scope == null)
442+
{
443+
throw new ArgumentException(
444+
$"Invalid bearer authentication provider configuration: Token endpoint URL, client ID, client secret, and scope must be specified");
445+
}
446+
authenticationHeaderValueProvider = new BearerAuthenticationHeaderValueProvider(
447+
new HttpClient(), clientId, clientSecret, scope, tokenEndpointUrl, logicalCluster, identityPoolId, maxRetries, retriesWaitMs, retriesMaxWaitMs);
448+
break;
449+
450+
case "CUSTOM":
451+
if (authenticationHeaderValueProvider == null)
452+
{
453+
throw new ArgumentException(
454+
$"Invalid authentication header value provider configuration: Custom authentication provider must be specified");
455+
}
456+
if(!(authenticationHeaderValueProvider is IAuthenticationBearerHeaderValueProvider))
457+
{
458+
throw new ArgumentException(
459+
$"Invalid authentication header value provider configuration: Custom authentication provider must implement IAuthenticationBearerHeaderValueProvider");
460+
}
461+
break;
462+
463+
case "":
464+
break;
465+
466+
default:
467+
throw new ArgumentException(
468+
$"Invalid value '{bearerAuthSource}' specified for property '{SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource}'");
469+
}
470+
392471
foreach (var property in config)
393472
{
394473
if (!property.Key.StartsWith("schema.registry."))
@@ -405,6 +484,14 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
405484
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs &&
406485
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource &&
407486
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo &&
487+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource &&
488+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken &&
489+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId &&
490+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret &&
491+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope &&
492+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl &&
493+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster &&
494+
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId &&
408495
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy &&
409496
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy &&
410497
property.Key != SchemaRegistryConfig.PropertyNames.SslCaLocation &&
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
using System;
2+
using System.Net.Http;
3+
using System.Net.Http.Headers;
4+
using System.Threading.Tasks;
5+
using System.Collections.Generic;
6+
using Newtonsoft.Json;
7+
using Newtonsoft.Json.Linq;
8+
9+
namespace Confluent.SchemaRegistry
10+
{
11+
class BearerToken
12+
{
13+
[JsonProperty("access_token")]
14+
public string AccessToken { get; set; }
15+
[JsonProperty("token_type")]
16+
public string TokenType { get; set; }
17+
[JsonProperty("expires_in")]
18+
public int ExpiresIn { get; set; }
19+
[JsonProperty("scope")]
20+
public string Scope { get; set; }
21+
[JsonIgnore]
22+
public double ExpiryTime { get; set; }
23+
}
24+
25+
public class BearerAuthenticationHeaderValueProvider : IAuthenticationBearerHeaderValueProvider, IDisposable
26+
{
27+
private readonly string clientId;
28+
private readonly string clientSecret;
29+
private readonly string scope;
30+
private readonly string tokenEndpoint;
31+
private readonly string logicalCluster;
32+
private readonly string identityPool;
33+
private readonly int maxRetries;
34+
private readonly int retriesWaitMs;
35+
private readonly int retriesMaxWaitMs;
36+
private readonly HttpClient httpClient;
37+
private volatile BearerToken token;
38+
private const float tokenExpiryThreshold = 0.8f;
39+
40+
public BearerAuthenticationHeaderValueProvider(
41+
HttpClient httpClient,
42+
string clientId,
43+
string clientSecret,
44+
string scope,
45+
string tokenEndpoint,
46+
string logicalCluster,
47+
string identityPool,
48+
int maxRetries,
49+
int retriesWaitMs,
50+
int retriesMaxWaitMs)
51+
{
52+
this.httpClient = httpClient;
53+
this.clientId = clientId;
54+
this.clientSecret = clientSecret;
55+
this.scope = scope;
56+
this.tokenEndpoint = tokenEndpoint;
57+
this.logicalCluster = logicalCluster;
58+
this.identityPool = identityPool;
59+
this.maxRetries = maxRetries;
60+
this.retriesWaitMs = retriesWaitMs;
61+
this.retriesMaxWaitMs = retriesMaxWaitMs;
62+
}
63+
64+
public async Task InitOrRefreshAsync()
65+
{
66+
await GenerateToken();
67+
}
68+
69+
public bool NeedsInitOrRefresh()
70+
{
71+
return token == null || DateTimeOffset.UtcNow.ToUnixTimeSeconds() >= token.ExpiryTime;
72+
}
73+
74+
private HttpRequestMessage CreateTokenRequest()
75+
{
76+
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, tokenEndpoint);
77+
78+
request.Content = new FormUrlEncodedContent(new[]
79+
{
80+
new KeyValuePair<string, string>("grant_type", "client_credentials"),
81+
new KeyValuePair<string, string>("client_id", clientId),
82+
new KeyValuePair<string, string>("client_secret", clientSecret),
83+
new KeyValuePair<string, string>("scope", scope)
84+
});
85+
86+
return request;
87+
}
88+
89+
private async Task GenerateToken()
90+
{
91+
var request = CreateTokenRequest();
92+
93+
for (int i = 0; i < maxRetries + 1; i++){
94+
try
95+
{
96+
var response = await httpClient.SendAsync(request).ConfigureAwait(continueOnCapturedContext: false);
97+
response.EnsureSuccessStatusCode();
98+
var tokenResponse = await response.Content.ReadAsStringAsync();
99+
token = JObject.Parse(tokenResponse).ToObject<BearerToken>(JsonSerializer.Create());
100+
token.ExpiryTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + (int)(token.ExpiresIn * tokenExpiryThreshold);
101+
return;
102+
}
103+
catch (Exception e)
104+
{
105+
if (i == maxRetries)
106+
{
107+
throw new Exception("Failed to fetch token from server: " + e.Message);
108+
}
109+
await Task.Delay(RetryUtility.CalculateRetryDelay(retriesWaitMs, retriesMaxWaitMs, i));
110+
}
111+
}
112+
}
113+
114+
public AuthenticationHeaderValue GetAuthenticationHeader()
115+
{
116+
if (this.token == null)
117+
{
118+
throw new InvalidOperationException("Token not initialized");
119+
}
120+
121+
return new AuthenticationHeaderValue("Bearer", this.token.AccessToken);
122+
}
123+
124+
public string GetLogicalCluster() => this.logicalCluster;
125+
126+
public string GetIdentityPool() => this.identityPool;
127+
128+
public void Dispose()
129+
{
130+
this.token = null;
131+
}
132+
}
133+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2025 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Net.Http.Headers;
18+
using System.Threading.Tasks;
19+
namespace Confluent.SchemaRegistry
20+
{
21+
/// <summary>
22+
/// An interface defining HTTP client authentication header values.
23+
/// </summary>
24+
public interface IAuthenticationBearerHeaderValueProvider : IAuthenticationHeaderValueProvider
25+
{
26+
public Task InitOrRefreshAsync();
27+
28+
public bool NeedsInitOrRefresh();
29+
30+
/// <summary>
31+
/// Get the authentication header for HTTP requests
32+
/// </summary>
33+
/// <returns>
34+
/// The authentication header for HTTP request messages
35+
/// </returns>
36+
///
37+
38+
AuthenticationHeaderValue GetAuthenticationHeader();
39+
40+
/// <summary>
41+
/// Get the logical cluster for HTTP requests
42+
/// </summary>
43+
/// <returns>
44+
/// The logical cluster for HTTP request messages
45+
/// </returns>
46+
string GetLogicalCluster();
47+
/// <summary>
48+
/// Get the identity pool for HTTP requests
49+
/// </summary>
50+
/// <returns>
51+
/// The identity pool for HTTP request messages
52+
/// </returns>
53+
string GetIdentityPool();
54+
}
55+
}

0 commit comments

Comments
 (0)