diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d92a3b56..c5566e86c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# 2.12.0 + +## Enhancements + +* References librdkafka.redist 2.12.0. Refer to the [librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) for more information. +* OAuth OIDC method for Schema Registry metadata based authentication with + an Azure IMDS endpoint using an attached managed identity as principal (#2523). + + # 2.11.1 ## Enhancements diff --git a/examples/OAuthOIDCAzureIMDS/OAuthOIDCAzureIMDS.csproj b/examples/OAuthOIDCAzureIMDS/OAuthOIDCAzureIMDS.csproj new file mode 100644 index 000000000..c56280697 --- /dev/null +++ b/examples/OAuthOIDCAzureIMDS/OAuthOIDCAzureIMDS.csproj @@ -0,0 +1,16 @@ + + + + {FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} + OAuthOIDC + net8.0 + Exe + 7.1 + + + + + + + + diff --git a/examples/OAuthOIDCAzureIMDS/Program.cs b/examples/OAuthOIDCAzureIMDS/Program.cs new file mode 100644 index 000000000..fb93cab31 --- /dev/null +++ b/examples/OAuthOIDCAzureIMDS/Program.cs @@ -0,0 +1,190 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Confluent.Kafka.SyncOverAsync; +using Newtonsoft.Json; +using Confluent.SchemaRegistry; +using Confluent.SchemaRegistry.Serdes; + +/// +/// An example demonstrating how to produce a message to +/// a topic, and then reading it back again using a consumer. +/// The authentication uses the OpenID Connect method of the OAUTHBEARER SASL mechanism. +/// The token is acquired from the Azure Instance Metadata Service (IMDS) +/// using metadata based secret-less authentication. +/// +namespace Confluent.Kafka.Examples.OAuthOIDCAzureIMDS +{ + + class User + { + [JsonRequired] // use Newtonsoft.Json annotations + [JsonProperty("name")] + public string Name { get; set; } + + [JsonRequired] + [JsonProperty("favorite_color")] + public string FavoriteColor { get; set; } + + [JsonProperty("favorite_number")] + public long FavoriteNumber { get; set; } + } + + public class Program + { + private const string azureIMDSQueryParams = "api-version=&resource=&client_id="; + private const string kafkaLogicalCluster = "your-logical-cluster"; + private const string identityPoolId = "your-identity-pool-id"; + + public static async Task Main(string[] args) + { + if (args.Length != 3) + { + Console.WriteLine("Usage: .. brokerList schemaRegistryUrl"); + return; + } + var bootstrapServers = args[1]; + var schemaRegistryUrl = args[2]; + var topicName = Guid.NewGuid().ToString(); + var groupId = Guid.NewGuid().ToString(); + + var commonConfig = new ClientConfig + { + BootstrapServers = bootstrapServers, + SecurityProtocol = SecurityProtocol.SaslPlaintext, + SaslMechanism = SaslMechanism.OAuthBearer, + SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc, + SaslOauthbearerMetadataAuthenticationType = SaslOauthbearerMetadataAuthenticationType.AzureIMDS, + SaslOauthbearerConfig = $"query={azureIMDSQueryParams}", + SaslOauthbearerExtensions = $"logicalCluster={kafkaLogicalCluster},identityPoolId={identityPoolId}" + }; + + var consumerConfig = new ConsumerConfig + { + BootstrapServers = bootstrapServers, + SecurityProtocol = SecurityProtocol.SaslPlaintext, + SaslMechanism = SaslMechanism.OAuthBearer, + SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc, + GroupId = groupId, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoOffsetStore = false + }; + + var schemaRegistryConfig = new SchemaRegistryConfig + { + Url = schemaRegistryUrl, + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthTokenEndpointQuery = azureIMDSQueryParams, + BearerAuthLogicalCluster = kafkaLogicalCluster, + BearerAuthIdentityPoolId = identityPoolId + }; + + try + { + createTopic(commonConfig, topicName); + } + catch (CreateTopicsException e) + { + Console.WriteLine($"An error occurred creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + Environment.Exit(1); + } + + using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig)) + using (var producer = new ProducerBuilder(commonConfig) + .SetValueSerializer(new JsonSerializer(schemaRegistry)) + .Build()) + using (var consumer = new ConsumerBuilder(consumerConfig) + .SetValueDeserializer(new JsonDeserializer(schemaRegistry).AsSyncOverAsync()).Build()) + { + consumer.Subscribe(topicName); + + var cancelled = false; + CancellationTokenSource cts = new CancellationTokenSource(); + + Console.CancelKeyPress += (_, e) => + { + e.Cancel = true; // prevent the process from terminating. + cancelled = true; + cts.Cancel(); + }; + + try + { + while (!cancelled) + { + var msg = new User + { + Name = "user-" + Guid.NewGuid().ToString(), + FavoriteColor = "blue", + FavoriteNumber = 7 + }; + + try + { + var deliveryReport = await producer.ProduceAsync(topicName, new Message { Value = msg }); + Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}, {msg}"); + } + catch (ProduceException e) + { + Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]"); + } + + try + { + var consumeResult = consumer.Consume(cts.Token); + Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}"); + try + { + consumer.StoreOffset(consumeResult); + } + catch (KafkaException e) + { + Console.WriteLine($"Store Offset error: {e.Error.Reason}"); + } + } + catch (ConsumeException e) + { + Console.WriteLine($"Consume error: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + Console.WriteLine("Closing consumer."); + consumer.Close(); + } + } + } + + private static void createTopic(ClientConfig config, String topicName) + { + using (var adminClient = new AdminClientBuilder(config).Build()) + { + adminClient.CreateTopicsAsync(new TopicSpecification[] { + new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait(); ; + } + } + } + +} diff --git a/examples/SchemaRegistryOAuth/Program.cs b/examples/SchemaRegistryOAuth/Program.cs index e67b667d4..8c140e83f 100644 --- a/examples/SchemaRegistryOAuth/Program.cs +++ b/examples/SchemaRegistryOAuth/Program.cs @@ -50,9 +50,9 @@ public ExampleBearerAuthProvider(string token, string logicalCluster, string ide public static async Task Main(string[] args) { - if (args.Length != 9) + if (args.Length < 9) { - Console.WriteLine("Usage: .. schemaRegistryUrl clientId clientSecret scope tokenEndpoint logicalCluster identityPool token"); + Console.WriteLine("Usage: .. schemaRegistryUrl clientId clientSecret scope tokenEndpoint logicalCluster identityPool token [azureIMDSQueryParams]"); return; } string schemaRegistryUrl = args[1]; @@ -63,10 +63,15 @@ public static async Task Main(string[] args) string logicalCluster = args[6]; string identityPool = args[7]; string token = args[8]; + string? azureIMDSQueryParams = null; + if (args.Length >= 10) + { + azureIMDSQueryParams = args[9]; + } //using BearerAuthCredentialsSource.OAuthBearer var clientCredentialsSchemaRegistryConfig = new SchemaRegistryConfig - { + { Url = schemaRegistryUrl, BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearer, BearerAuthClientId = clientId, @@ -85,7 +90,7 @@ public static async Task Main(string[] args) //using BearerAuthCredentialsSource.StaticToken var staticSchemaRegistryConfig = new SchemaRegistryConfig - { + { Url = schemaRegistryUrl, BearerAuthCredentialsSource = BearerAuthCredentialsSource.StaticToken, BearerAuthToken = token, @@ -101,7 +106,7 @@ public static async Task Main(string[] args) //Using BearerAuthCredentialsSource.Custom var customSchemaRegistryConfig = new SchemaRegistryConfig - { + { Url = schemaRegistryUrl, BearerAuthCredentialsSource = BearerAuthCredentialsSource.Custom }; @@ -112,6 +117,27 @@ public static async Task Main(string[] args) var subjects = await schemaRegistry.GetAllSubjectsAsync(); Console.WriteLine(string.Join(", ", subjects)); } + + if (azureIMDSQueryParams == null) + { + return; + } + + //Using BearerAuthCredentialsSource.OAuthOIDCAzureIMDS + var azureIMDSSchemaRegistryConfig = new SchemaRegistryConfig + { + Url = schemaRegistryUrl, + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthTokenEndpointQuery = azureIMDSQueryParams, + BearerAuthLogicalCluster = logicalCluster, + BearerAuthIdentityPoolId = identityPool, + }; + + using (var schemaRegistry = new CachedSchemaRegistryClient(azureIMDSSchemaRegistryConfig)) + { + var subjects = await schemaRegistry.GetAllSubjectsAsync(); + Console.WriteLine(string.Join(", ", subjects)); + } } } } diff --git a/src/ConfigGen/Program.cs b/src/ConfigGen/Program.cs index 8e7c2c9bb..66b52f46d 100644 --- a/src/ConfigGen/Program.cs +++ b/src/ConfigGen/Program.cs @@ -320,6 +320,7 @@ static string ConfigNameToDotnetName(string configName) { "resolve_canonical_bootstrap_servers_only", "ResolveCanonicalBootstrapServersOnly"}, { "client_credentials", "ClientCredentials"}, { "urn:ietf:params:oauth:grant-type:jwt-bearer", "JwtBearer"}, + { "azure_imds", "AzureIMDS"}, }; static string EnumNameToDotnetName(string enumName) diff --git a/src/Confluent.Kafka/Config_gen.cs b/src/Confluent.Kafka/Config_gen.cs index 8562a0a23..5bbe44410 100644 --- a/src/Confluent.Kafka/Config_gen.cs +++ b/src/Confluent.Kafka/Config_gen.cs @@ -1,4 +1,4 @@ -// *** Auto-generated from librdkafka v2.11.1 *** - do not modify manually. +// *** Auto-generated from librdkafka dev_oauthbearer_metadata_based *** - do not modify manually. // // Copyright 2018-2022 Confluent Inc. // @@ -203,6 +203,22 @@ public enum SaslOauthbearerAssertionAlgorithm ES256 } + /// + /// SaslOauthbearerMetadataAuthenticationType enum values + /// + public enum SaslOauthbearerMetadataAuthenticationType + { + /// + /// None + /// + None, + + /// + /// AzureIMDS + /// + AzureIMDS + } + /// /// PartitionAssignmentStrategy enum values /// @@ -1318,6 +1334,16 @@ public Acks? Acks /// public string SaslOauthbearerAssertionJwtTemplateFile { get { return Get("sasl.oauthbearer.assertion.jwt.template.file"); } set { this.SetObject("sasl.oauthbearer.assertion.jwt.template.file", value); } } + /// + /// + /// + public SaslOauthbearerMetadataAuthenticationType? SaslOauthbearerMetadataAuthenticationType { get { return (SaslOauthbearerMetadataAuthenticationType?)GetEnum(typeof(SaslOauthbearerMetadataAuthenticationType), "sasl.oauthbearer.metadata.authentication.type"); } set { this.SetObject("sasl.oauthbearer.metadata.authentication.type", value); } } + /// /// > config, authenticationHeaderValueProvider = DekRestService.AuthenticationHeaderValueProvider( config, authenticationHeaderValueProvider, maxRetries, retriesWaitMs, retriesMaxWaitMs); + + var propertyNames = new HashSet + { + { SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointQuery }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy }, + { SchemaRegistryConfig.PropertyNames.SslCaLocation }, + { SchemaRegistryConfig.PropertyNames.SslKeystoreLocation }, + { SchemaRegistryConfig.PropertyNames.SslKeystorePassword }, + { SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification } + }; + foreach (var property in config) { if (!property.Key.StartsWith("schema.registry.")) @@ -201,30 +231,7 @@ public CachedDekRegistryClient(IEnumerable> config, continue; } - if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy && - property.Key != SchemaRegistryConfig.PropertyNames.SslCaLocation && - property.Key != SchemaRegistryConfig.PropertyNames.SslKeystoreLocation && - property.Key != SchemaRegistryConfig.PropertyNames.SslKeystorePassword && - property.Key != SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification) + if (!propertyNames.Contains(property.Key)) { throw new ArgumentException($"Unknown configuration parameter {property.Key}"); } diff --git a/src/Confluent.SchemaRegistry/AuthCredentialsSource.cs b/src/Confluent.SchemaRegistry/AuthCredentialsSource.cs index e9a334799..3ac53de56 100644 --- a/src/Confluent.SchemaRegistry/AuthCredentialsSource.cs +++ b/src/Confluent.SchemaRegistry/AuthCredentialsSource.cs @@ -45,10 +45,17 @@ public enum BearerAuthCredentialsSource StaticToken, /// - /// Credentials are specified via the `schema.registry.oauthbearer.auth.credentials.source` config property. + /// Credentials are specified via the `schema.registry.bearer.auth.client.id` and + /// `schema.registry.bearer.auth.client.secret` config properties. /// OAuthBearer, + /// + /// Metadata based authentication using Azure Instance Metadata Service (IMDS). + /// Only the token endpoint URL and/or query parameters need to be specified. + /// + OAuthBearerAzureIMDS, + /// /// User provides a custom implementation of IAuthenticationHeaderValueProvider. /// diff --git a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs index 84f28b4d0..748c8079a 100644 --- a/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs +++ b/src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs @@ -326,6 +326,35 @@ public CachedSchemaRegistryClient(IEnumerable> conf authenticationHeaderValueProvider = RestService.AuthenticationHeaderValueProvider( config, authenticationHeaderValueProvider, maxRetries, retriesWaitMs, retriesMaxWaitMs); + var propertyNames = new HashSet + { + { SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointQuery }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy }, + { SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy }, + { SchemaRegistryConfig.PropertyNames.SslCaLocation }, + { SchemaRegistryConfig.PropertyNames.SslKeystoreLocation }, + { SchemaRegistryConfig.PropertyNames.SslKeystorePassword }, + { SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification } + }; + foreach (var property in config) { if (!property.Key.StartsWith("schema.registry.")) @@ -333,30 +362,7 @@ public CachedSchemaRegistryClient(IEnumerable> conf continue; } - if (property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRequestTimeoutMs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxRetries && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesWaitMs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryRetriesMaxWaitMs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxConnectionsPerServer && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryMaxCachedSchemas && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy && - property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy && - property.Key != SchemaRegistryConfig.PropertyNames.SslCaLocation && - property.Key != SchemaRegistryConfig.PropertyNames.SslKeystoreLocation && - property.Key != SchemaRegistryConfig.PropertyNames.SslKeystorePassword && - property.Key != SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification) + if (!propertyNames.Contains(property.Key)) { throw new ArgumentException($"Unknown configuration parameter {property.Key}"); } diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/AbstractBearerAuthenticationHeaderValueProvider.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/AbstractBearerAuthenticationHeaderValueProvider.cs new file mode 100644 index 000000000..016a5b5c8 --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/AbstractBearerAuthenticationHeaderValueProvider.cs @@ -0,0 +1,119 @@ +using System; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading.Tasks; + +namespace Confluent.SchemaRegistry +{ + /// + /// Abstract base class for providers that supply Bearer authentication header values. + /// + public abstract class AbstractBearerAuthenticationHeaderValueProvider : IAuthenticationBearerHeaderValueProvider, IDisposable + { + private readonly string logicalCluster; + private readonly string identityPool; + private readonly int maxRetries; + private readonly int retriesWaitMs; + private readonly int retriesMaxWaitMs; + private string token; + + /// + /// Initializes a new instance of the class. + /// + /// The logical cluster name. + /// The identity pool. + /// The maximum number of retries for token generation. + /// The base wait time in milliseconds between retries. + /// The maximum wait time in milliseconds between retries. + protected AbstractBearerAuthenticationHeaderValueProvider( + string logicalCluster, + string identityPool, + int maxRetries, + int retriesWaitMs, + int retriesMaxWaitMs) + { + this.logicalCluster = logicalCluster; + this.identityPool = identityPool; + this.maxRetries = maxRetries; + this.retriesWaitMs = retriesWaitMs; + this.retriesMaxWaitMs = retriesMaxWaitMs; + } + + /// + public async Task InitOrRefreshAsync() + { + await GenerateToken().ConfigureAwait(false); + } + + /// + public abstract bool NeedsInitOrRefresh(); + + /// + /// Fetches a token using the provided reusable HTTP request message. + /// Can also generate a new request if needed. + /// + /// The HTTP request message to use for fetching the token. + /// A task that represents the asynchronous operation. The task result contains the fetched token as a string. + protected abstract Task FetchToken(HttpRequestMessage request); + + /// + /// Creates an HTTP request message for token acquisition. + /// + /// An HTTP request message configured for token acquisition. + protected abstract HttpRequestMessage CreateTokenRequest(); + + private async Task GenerateToken() + { + var request = CreateTokenRequest(); + + for (int i = 0; i < maxRetries + 1; i++) + { + try + { + token = await FetchToken(request); + return; + } + catch (Exception e) + { + if (i == maxRetries) + { + throw new Exception("Failed to fetch token from server: " + e.Message); + } + await Task.Delay(RetryUtility.CalculateRetryDelay(retriesWaitMs, retriesMaxWaitMs, i)) + .ConfigureAwait(false); + } + } + } + + /// + public AuthenticationHeaderValue GetAuthenticationHeader() + { + if (this.token == null) + { + throw new InvalidOperationException("Token not initialized"); + } + + return new AuthenticationHeaderValue("Bearer", this.token); + } + + /// + public string GetLogicalCluster() => this.logicalCluster; + + /// + public string GetIdentityPool() => this.identityPool; + + /// + /// Disposes of any sensitive information held by this provider. + /// + protected virtual void DisposeSecrets() + { + token = null; + } + + /// + public void Dispose() + { + DisposeSecrets(); + } + } +} \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/BearerAuthenticationHeaderValueProvider.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/BearerAuthenticationHeaderValueProvider.cs index 576411d92..8e12d39b9 100644 --- a/src/Confluent.SchemaRegistry/Rest/Authentication/BearerAuthenticationHeaderValueProvider.cs +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/BearerAuthenticationHeaderValueProvider.cs @@ -1,6 +1,5 @@ using System; using System.Net.Http; -using System.Net.Http.Headers; using System.Threading.Tasks; using System.Collections.Generic; using Newtonsoft.Json; @@ -22,59 +21,75 @@ class BearerToken public double ExpiryTime { get; set; } } - public class BearerAuthenticationHeaderValueProvider : IAuthenticationBearerHeaderValueProvider, IDisposable + /// + /// Provides bearer token authentication header values for schema registry requests + /// based on OAuth2 client credentials flow. + /// + public class BearerAuthenticationHeaderValueProvider : AbstractBearerAuthenticationHeaderValueProvider { private readonly string clientId; private readonly string clientSecret; private readonly string scope; private readonly string tokenEndpoint; - private readonly string logicalCluster; - private readonly string identityPool; - private readonly int maxRetries; - private readonly int retriesWaitMs; - private readonly int retriesMaxWaitMs; private readonly HttpClient httpClient; - private volatile BearerToken token; + private volatile BearerToken tokenObject; private const float tokenExpiryThreshold = 0.8f; + /// + /// Initializes a new instance of the class. + /// + /// The HTTP client used for requests. + /// The OAuth client ID. + /// The OAuth client secret. + /// The OAuth scope. + /// The OAuth token endpoint URL. + /// The logical cluster name. + /// The identity pool. + /// The maximum number of retries. + /// The initial wait time between retries in milliseconds. + /// The maximum wait time between retries in milliseconds. public BearerAuthenticationHeaderValueProvider( HttpClient httpClient, - string clientId, - string clientSecret, - string scope, - string tokenEndpoint, - string logicalCluster, - string identityPool, - int maxRetries, - int retriesWaitMs, + string clientId, + string clientSecret, + string scope, + string tokenEndpoint, + string logicalCluster, + string identityPool, + int maxRetries, + int retriesWaitMs, int retriesMaxWaitMs) + : base(logicalCluster, identityPool, maxRetries, retriesWaitMs, retriesMaxWaitMs) { this.httpClient = httpClient; this.clientId = clientId; this.clientSecret = clientSecret; this.scope = scope; this.tokenEndpoint = tokenEndpoint; - this.logicalCluster = logicalCluster; - this.identityPool = identityPool; - this.maxRetries = maxRetries; - this.retriesWaitMs = retriesWaitMs; - this.retriesMaxWaitMs = retriesMaxWaitMs; } - public async Task InitOrRefreshAsync() + /// + public override bool NeedsInitOrRefresh() { - await GenerateToken().ConfigureAwait(false); + return tokenObject == null || DateTimeOffset.UtcNow.ToUnixTimeSeconds() >= tokenObject.ExpiryTime; } - public bool NeedsInitOrRefresh() + /// + protected override async Task FetchToken(HttpRequestMessage request) { - return token == null || DateTimeOffset.UtcNow.ToUnixTimeSeconds() >= token.ExpiryTime; + var response = await httpClient.SendAsync(request).ConfigureAwait(continueOnCapturedContext: false); + response.EnsureSuccessStatusCode(); + var tokenResponse = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + tokenObject = JObject.Parse(tokenResponse).ToObject(JsonSerializer.Create()); + tokenObject.ExpiryTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + (int)(tokenObject.ExpiresIn * tokenExpiryThreshold); + return tokenObject.AccessToken; } - private HttpRequestMessage CreateTokenRequest() + /// + protected override HttpRequestMessage CreateTokenRequest() { HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, tokenEndpoint); - + request.Content = new FormUrlEncodedContent(new[] { new KeyValuePair("grant_type", "client_credentials"), @@ -85,50 +100,12 @@ private HttpRequestMessage CreateTokenRequest() return request; } - - private async Task GenerateToken() - { - var request = CreateTokenRequest(); - - for (int i = 0; i < maxRetries + 1; i++){ - try - { - var response = await httpClient.SendAsync(request).ConfigureAwait(continueOnCapturedContext: false); - response.EnsureSuccessStatusCode(); - var tokenResponse = await response.Content.ReadAsStringAsync().ConfigureAwait(false); - token = JObject.Parse(tokenResponse).ToObject(JsonSerializer.Create()); - token.ExpiryTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + (int)(token.ExpiresIn * tokenExpiryThreshold); - return; - } - catch (Exception e) - { - if (i == maxRetries) - { - throw new Exception("Failed to fetch token from server: " + e.Message); - } - await Task.Delay(RetryUtility.CalculateRetryDelay(retriesWaitMs, retriesMaxWaitMs, i)) - .ConfigureAwait(false); - } - } - } - - public AuthenticationHeaderValue GetAuthenticationHeader() - { - if (this.token == null) - { - throw new InvalidOperationException("Token not initialized"); - } - - return new AuthenticationHeaderValue("Bearer", this.token.AccessToken); - } - - public string GetLogicalCluster() => this.logicalCluster; - - public string GetIdentityPool() => this.identityPool; - - public void Dispose() + + /// + protected override void DisposeSecrets() { - this.token = null; + base.DisposeSecrets(); + tokenObject = null; } } } \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/IAuthenticationBearerHeaderValueProvider.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/IAuthenticationBearerHeaderValueProvider.cs index 124db8f08..6c3a51621 100644 --- a/src/Confluent.SchemaRegistry/Rest/Authentication/IAuthenticationBearerHeaderValueProvider.cs +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/IAuthenticationBearerHeaderValueProvider.cs @@ -23,19 +23,17 @@ namespace Confluent.SchemaRegistry /// public interface IAuthenticationBearerHeaderValueProvider : IAuthenticationHeaderValueProvider { - public Task InitOrRefreshAsync(); - - public bool NeedsInitOrRefresh(); /// - /// Get the authentication header for HTTP requests + /// Initializes or refreshes the authentication credentials. /// /// - /// The authentication header for HTTP request messages + /// A task representing the asynchronous initialization or refresh operation. /// - /// + public Task InitOrRefreshAsync(); - AuthenticationHeaderValue GetAuthenticationHeader(); + /// + public bool NeedsInitOrRefresh(); /// /// Get the logical cluster for HTTP requests diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AbstractAuthenticationHeaderValueProviderBuilder.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AbstractAuthenticationHeaderValueProviderBuilder.cs new file mode 100644 index 000000000..9b73cbb91 --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AbstractAuthenticationHeaderValueProviderBuilder.cs @@ -0,0 +1,66 @@ +// Copyright 2025 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; +using System.Linq; + + +namespace Confluent.SchemaRegistry +{ + + + abstract class AbstractAuthenticationHeaderValueProviderBuilder : IAuthenticationBearerHeaderValueProviderBuilder + { + protected string logicalCluster; + + protected string identityPoolId; + + protected IEnumerable> config; + + private IAuthenticationHeaderValueProvider authenticationHeaderValueProvider; + + protected AbstractAuthenticationHeaderValueProviderBuilder( + IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, + IEnumerable> config) + { + this.authenticationHeaderValueProvider = authenticationHeaderValueProvider; + this.config = config; + } + + protected virtual void Validate() + { + if (authenticationHeaderValueProvider != null) + { + throw new ArgumentException( + $"Invalid authentication header value provider configuration: Cannot specify both custom provider and bearer authentication"); + } + logicalCluster = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster).Value; + + identityPoolId = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId).Value; + if (logicalCluster == null || identityPoolId == null) + { + throw new ArgumentException( + $"Invalid bearer authentication provider configuration: Logical cluster and identity pool ID must be specified"); + } + } + + public abstract IAuthenticationBearerHeaderValueProvider Build( + int maxRetries, int retriesWaitMs, int retriesMaxWaitMs); + } +} diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AzureIMDS/AzureIMDSBearerAuthenticationHeaderValueProvider.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AzureIMDS/AzureIMDSBearerAuthenticationHeaderValueProvider.cs new file mode 100644 index 000000000..c77c191f2 --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AzureIMDS/AzureIMDSBearerAuthenticationHeaderValueProvider.cs @@ -0,0 +1,97 @@ +// Copyright 2025 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Net.Http; +using System.Threading.Tasks; +using System.Collections.Generic; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Confluent.SchemaRegistry +{ + class AzureIMDSBearerToken + { + [JsonProperty("access_token")] + public string AccessToken { get; set; } + [JsonProperty("expires_in")] + public int ExpiresIn { get; set; } + [JsonIgnore] + public double ExpiryTime { get; set; } + } + + /// + /// Provider for authentication header values that uses Azure Instance Metadata Service (IMDS) to obtain bearer tokens. + /// + public class AzureIMDSBearerAuthenticationHeaderValueProvider : AbstractBearerAuthenticationHeaderValueProvider + { + private readonly string tokenEndpoint; + private readonly HttpClient httpClient; + private volatile AzureIMDSBearerToken tokenObject; + private const float tokenExpiryThreshold = 0.8f; + + /// + /// Initializes a new instance of the class. + /// + /// The HTTP client used to make requests. + /// The endpoint URL to request tokens from. + /// The logical cluster name. + /// The identity pool identifier. + /// The maximum number of retry attempts. + /// The initial wait time between retries in milliseconds. + /// The maximum wait time between retries in milliseconds. + public AzureIMDSBearerAuthenticationHeaderValueProvider( + HttpClient httpClient, + string tokenEndpoint, + string logicalCluster, + string identityPool, + int maxRetries, + int retriesWaitMs, + int retriesMaxWaitMs) + : base(logicalCluster, identityPool, maxRetries, retriesWaitMs, retriesMaxWaitMs) + { + this.httpClient = httpClient; + this.tokenEndpoint = tokenEndpoint; + } + + /// + public override bool NeedsInitOrRefresh() + { + return tokenObject == null || DateTimeOffset.UtcNow.ToUnixTimeSeconds() >= tokenObject.ExpiryTime; + } + + /// + protected override async Task FetchToken(HttpRequestMessage request) + { + var response = await httpClient.SendAsync(request).ConfigureAwait(continueOnCapturedContext: false); + response.EnsureSuccessStatusCode(); + var tokenResponse = await response.Content.ReadAsStringAsync().ConfigureAwait(false); + tokenObject = JObject.Parse(tokenResponse).ToObject(JsonSerializer.Create()); + tokenObject.ExpiryTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + (int)(tokenObject.ExpiresIn * tokenExpiryThreshold); + return tokenObject.AccessToken; + } + + /// + protected override HttpRequestMessage CreateTokenRequest() + { + HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, tokenEndpoint); + + request.Headers.Add("Metadata", "true"); + + return request; + } + } +} \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AzureIMDS/AzureIMDSBearerAuthenticationHeaderValueProviderBuilder.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AzureIMDS/AzureIMDSBearerAuthenticationHeaderValueProviderBuilder.cs new file mode 100644 index 000000000..d238fbf96 --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/AzureIMDS/AzureIMDSBearerAuthenticationHeaderValueProviderBuilder.cs @@ -0,0 +1,76 @@ +// Copyright 2025 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Linq; +using System.Collections.Generic; +using System.Net.Http; + + +namespace Confluent.SchemaRegistry +{ + class AzureIMDSBearerAuthenticationHeaderValueProviderBuilder : AbstractAuthenticationHeaderValueProviderBuilder + { + private string tokenEndpointUrl = "http://169.254.169.254/metadata/identity/oauth2/token"; + + internal AzureIMDSBearerAuthenticationHeaderValueProviderBuilder( + IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, + IEnumerable> config) : base(authenticationHeaderValueProvider, config) + { + } + + protected override void Validate() + { + base.Validate(); + + bool tokenEndpointUrlOverride = false; + var tokenEndpointUrl = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl).Value; + + if (tokenEndpointUrl != null) + { + this.tokenEndpointUrl = tokenEndpointUrl; + tokenEndpointUrlOverride = true; + } + + var tokenQuery = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointQuery).Value; + + if (tokenQuery != null) + { + var requestWithQuery = new UriBuilder(new Uri(this.tokenEndpointUrl)); + requestWithQuery.Query = tokenQuery; + requestWithQuery.Fragment = ""; + this.tokenEndpointUrl = requestWithQuery.Uri.ToString(); + } + else if (!tokenEndpointUrlOverride) + { + throw new ArgumentException($"Missing required configuration property: {SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointQuery}"); + } + } + + public override IAuthenticationBearerHeaderValueProvider Build( + int maxRetries, int retriesWaitMs, int retriesMaxWaitMs) + { + Validate(); + return new AzureIMDSBearerAuthenticationHeaderValueProvider( + new HttpClient(), + tokenEndpointUrl, + logicalCluster, identityPoolId, + maxRetries, retriesWaitMs, retriesMaxWaitMs); + } + } +} \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/BearerAuthenticationHeaderValueProviderBuilder.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/BearerAuthenticationHeaderValueProviderBuilder.cs new file mode 100644 index 000000000..70bef46de --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/BearerAuthenticationHeaderValueProviderBuilder.cs @@ -0,0 +1,73 @@ +// Copyright 2025 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Linq; +using System.Collections.Generic; +using System.Net.Http; + + +namespace Confluent.SchemaRegistry +{ + class BearerAuthenticationHeaderValueProviderBuilder : AbstractAuthenticationHeaderValueProviderBuilder + { + private string clientId; + private string clientSecret; + private string tokenEndpointUrl; + private string scope; + + internal BearerAuthenticationHeaderValueProviderBuilder( + IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, + IEnumerable> config) : base(authenticationHeaderValueProvider, config) + { + } + + protected override void Validate() + { + base.Validate(); + + clientId = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId).Value; + + clientSecret = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret).Value; + + scope = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope).Value; + + tokenEndpointUrl = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl).Value; + + if (tokenEndpointUrl == null || clientId == null || clientSecret == null || scope == null) + { + throw new ArgumentException( + $"Invalid bearer authentication provider configuration: Token endpoint URL, client ID, client secret, and scope must be specified"); + } + } + + public override IAuthenticationBearerHeaderValueProvider Build( + int maxRetries, int retriesWaitMs, int retriesMaxWaitMs) + { + Validate(); + return new BearerAuthenticationHeaderValueProvider( + new HttpClient(), + clientId, clientSecret, scope, + tokenEndpointUrl, + logicalCluster, identityPoolId, + maxRetries, retriesWaitMs, retriesMaxWaitMs); + } + } +} \ No newline at end of file diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/CustomAuthenticationHeaderValueProviderBuilder.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/CustomAuthenticationHeaderValueProviderBuilder.cs new file mode 100644 index 000000000..b0fa86225 --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/CustomAuthenticationHeaderValueProviderBuilder.cs @@ -0,0 +1,56 @@ +// Copyright 2025 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; + + +namespace Confluent.SchemaRegistry +{ + class CustomAuthenticationHeaderValueProviderBuilder : IAuthenticationBearerHeaderValueProviderBuilder + { + private IAuthenticationHeaderValueProvider authenticationHeaderValueProvider; + + internal CustomAuthenticationHeaderValueProviderBuilder( + IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, + IEnumerable> config) + { + this.authenticationHeaderValueProvider = authenticationHeaderValueProvider; + } + + private void Validate() + { + if (authenticationHeaderValueProvider == null) + { + throw new ArgumentException( + $"Invalid authentication header value provider configuration: Custom authentication provider must be specified"); + } + if (!(authenticationHeaderValueProvider is IAuthenticationBearerHeaderValueProvider)) + { + throw new ArgumentException( + $"Invalid authentication header value provider configuration: Custom authentication provider must implement IAuthenticationBearerHeaderValueProvider"); + } + } + + public IAuthenticationBearerHeaderValueProvider Build( + int maxRetries, int retriesWaitMs, int retriesMaxWaitMs) + { + Validate(); + return (IAuthenticationBearerHeaderValueProvider) + authenticationHeaderValueProvider; + } + } +} diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/IAuthenticationBearerHeaderValueProviderBuilder.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/IAuthenticationBearerHeaderValueProviderBuilder.cs new file mode 100644 index 000000000..389ed1350 --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/IAuthenticationBearerHeaderValueProviderBuilder.cs @@ -0,0 +1,25 @@ +// Copyright 2025 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + + +namespace Confluent.SchemaRegistry +{ + interface IAuthenticationBearerHeaderValueProviderBuilder + { + IAuthenticationBearerHeaderValueProvider Build( + int maxRetries, int retriesWaitMs, int retriesMaxWaitMs); + } +} diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/StaticAuthenticationHeaderValueProviderBuilder.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/StaticAuthenticationHeaderValueProviderBuilder.cs new file mode 100644 index 000000000..28e921445 --- /dev/null +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/OAuth/StaticAuthenticationHeaderValueProviderBuilder.cs @@ -0,0 +1,57 @@ +// Copyright 2025 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; +using System.Linq; + + +namespace Confluent.SchemaRegistry +{ + class StaticAuthenticationHeaderValueProviderBuilder : + AbstractAuthenticationHeaderValueProviderBuilder + { + private string bearerToken; + + internal StaticAuthenticationHeaderValueProviderBuilder( + IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, + IEnumerable> config) : + base(authenticationHeaderValueProvider, config) + { + } + + protected override void Validate() + { + base.Validate(); + bearerToken = config.FirstOrDefault(prop => + prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken).Value; + + if (bearerToken == null) + { + throw new ArgumentException( + $"Invalid authentication header value provider configuration: Bearer authentication token not specified"); + } + } + + public override IAuthenticationBearerHeaderValueProvider Build( + int maxRetries, int retriesWaitMs, int retriesMaxWaitMs) + { + Validate(); + return new StaticBearerAuthenticationHeaderValueProvider( + bearerToken, logicalCluster, identityPoolId); + } + } +} diff --git a/src/Confluent.SchemaRegistry/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs b/src/Confluent.SchemaRegistry/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs index b2da14418..445d28639 100644 --- a/src/Confluent.SchemaRegistry/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs +++ b/src/Confluent.SchemaRegistry/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs @@ -10,13 +10,21 @@ namespace Confluent.SchemaRegistry { - + /// + /// Provides authentication header values using a static bearer token. + /// public class StaticBearerAuthenticationHeaderValueProvider : IAuthenticationBearerHeaderValueProvider, IDisposable { private readonly string token; private readonly string logicalCluster; private readonly string identityPool; + /// + /// Initializes a new instance of the class. + /// + /// The bearer token to use for authentication. + /// The logical cluster identifier. + /// The identity pool identifier. public StaticBearerAuthenticationHeaderValueProvider(string token, string logicalCluster, string identityPool) { this.token = token; @@ -24,23 +32,29 @@ public StaticBearerAuthenticationHeaderValueProvider(string token, string logica this.identityPool = identityPool; } - public async Task InitOrRefreshAsync() + /// + public Task InitOrRefreshAsync() { - return; + return Task.CompletedTask; } + /// public bool NeedsInitOrRefresh() { return false; } + /// public AuthenticationHeaderValue GetAuthenticationHeader() => new AuthenticationHeaderValue("Bearer", token); + /// public string GetLogicalCluster() => this.logicalCluster; + /// public string GetIdentityPool() => this.identityPool; + /// public void Dispose() { return; diff --git a/src/Confluent.SchemaRegistry/Rest/RestService.cs b/src/Confluent.SchemaRegistry/Rest/RestService.cs index 3fb593fbe..ad752602c 100644 --- a/src/Confluent.SchemaRegistry/Rest/RestService.cs +++ b/src/Confluent.SchemaRegistry/Rest/RestService.cs @@ -67,6 +67,27 @@ public class RestService : IRestService private int retriesMaxWaitMs; + private static readonly + Dictionary>, + IAuthenticationBearerHeaderValueProviderBuilder>> + bearerAuthenticationHeaderValueProviderBuilders = + new Dictionary>, + IAuthenticationBearerHeaderValueProviderBuilder>>() + { + {"STATIC_TOKEN", (authenticationHeaderValueProvider, config) => + new StaticAuthenticationHeaderValueProviderBuilder(authenticationHeaderValueProvider, config) }, + {"CUSTOM", (authenticationHeaderValueProvider, config) => + new CustomAuthenticationHeaderValueProviderBuilder(authenticationHeaderValueProvider, config) }, + {"OAUTHBEARER", (authenticationHeaderValueProvider, config) => + new BearerAuthenticationHeaderValueProviderBuilder(authenticationHeaderValueProvider, config) }, + {"OAUTHBEARER_AZURE_IMDS", (authenticationHeaderValueProvider, config) => + new AzureIMDSBearerAuthenticationHeaderValueProviderBuilder(authenticationHeaderValueProvider, config) } + }; + /// /// Initializes a new instance of the RestService class. /// @@ -677,88 +698,19 @@ protected internal static IAuthenticationHeaderValueProvider AuthenticationHeade $"Invalid authentication header value provider configuration: Cannot specify both basic and bearer authentication"); } - string logicalCluster = null; - string identityPoolId = null; - string bearerToken = null; - string clientId = null; - string clientSecret = null; - string scope = null; - string tokenEndpointUrl = null; - - if (bearerAuthSource == "STATIC_TOKEN" || bearerAuthSource == "OAUTHBEARER") + if (bearerAuthSource != "") { - if (authenticationHeaderValueProvider != null) - { - throw new ArgumentException( - $"Invalid authentication header value provider configuration: Cannot specify both custom provider and bearer authentication"); - } - logicalCluster = config.FirstOrDefault(prop => - prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster).Value; - - identityPoolId = config.FirstOrDefault(prop => - prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId).Value; - if (logicalCluster == null || identityPoolId == null) + if (!bearerAuthenticationHeaderValueProviderBuilders.ContainsKey(bearerAuthSource)) { throw new ArgumentException( - $"Invalid bearer authentication provider configuration: Logical cluster and identity pool ID must be specified"); + $"Invalid value '{bearerAuthSource}' specified for property '{SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource}'"); } - } - - switch (bearerAuthSource) - { - case "STATIC_TOKEN": - bearerToken = config.FirstOrDefault(prop => - prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken).Value; - - if (bearerToken == null) - { - throw new ArgumentException( - $"Invalid authentication header value provider configuration: Bearer authentication token not specified"); - } - authenticationHeaderValueProvider = new StaticBearerAuthenticationHeaderValueProvider(bearerToken, logicalCluster, identityPoolId); - break; - - case "OAUTHBEARER": - clientId = config.FirstOrDefault(prop => - prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId).Value; - clientSecret = config.FirstOrDefault(prop => - prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret).Value; - - scope = config.FirstOrDefault(prop => - prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope).Value; - - tokenEndpointUrl = config.FirstOrDefault(prop => - prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl).Value; - - if (tokenEndpointUrl == null || clientId == null || clientSecret == null || scope == null) - { - throw new ArgumentException( - $"Invalid bearer authentication provider configuration: Token endpoint URL, client ID, client secret, and scope must be specified"); - } - authenticationHeaderValueProvider = new BearerAuthenticationHeaderValueProvider( - new HttpClient(), clientId, clientSecret, scope, tokenEndpointUrl, logicalCluster, identityPoolId, maxRetries, retriesWaitMs, retriesMaxWaitMs); - break; - - case "CUSTOM": - if (authenticationHeaderValueProvider == null) - { - throw new ArgumentException( - $"Invalid authentication header value provider configuration: Custom authentication provider must be specified"); - } - if(!(authenticationHeaderValueProvider is IAuthenticationBearerHeaderValueProvider)) - { - throw new ArgumentException( - $"Invalid authentication header value provider configuration: Custom authentication provider must implement IAuthenticationBearerHeaderValueProvider"); - } - break; - - case "": - break; - - default: - throw new ArgumentException( - $"Invalid value '{bearerAuthSource}' specified for property '{SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource}'"); + var bearerAuthenticationHeaderValueProviderBuilder = + bearerAuthenticationHeaderValueProviderBuilders[bearerAuthSource]; + authenticationHeaderValueProvider = bearerAuthenticationHeaderValueProviderBuilder( + authenticationHeaderValueProvider, config) + .Build(maxRetries, retriesWaitMs, retriesMaxWaitMs); } return authenticationHeaderValueProvider; diff --git a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs index 0aad2cca3..cdb2bed7b 100644 --- a/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs +++ b/src/Confluent.SchemaRegistry/SchemaRegistryConfig.cs @@ -149,6 +149,12 @@ public static class PropertyNames /// public const string SchemaRegistryBearerAuthTokenEndpointUrl = "schema.registry.bearer.auth.token.endpoint.url"; + /// + /// Specifies the token query parameters for the bearer token endpoint. + /// Currently valid only when using Azure IMDS for token retrieval. + /// + public const string SchemaRegistryBearerAuthTokenEndpointQuery = "schema.registry.bearer.auth.token.endpoint.query"; + /// /// Key subject name strategy. /// @@ -409,6 +415,11 @@ public BearerAuthCredentialsSource? BearerAuthCredentialsSource return Confluent.SchemaRegistry.BearerAuthCredentialsSource.OAuthBearer; } + if (r == "OAUTHBEARER_AZURE_IMDS") + { + return Confluent.SchemaRegistry.BearerAuthCredentialsSource.OAuthBearerAzureIMDS; + } + if (r == "CUSTOM") { return Confluent.SchemaRegistry.BearerAuthCredentialsSource.Custom; @@ -431,6 +442,10 @@ public BearerAuthCredentialsSource? BearerAuthCredentialsSource { this.properties[PropertyNames.SchemaRegistryBearerAuthCredentialsSource] = "OAUTHBEARER"; } + else if (value == Confluent.SchemaRegistry.BearerAuthCredentialsSource.OAuthBearerAzureIMDS) + { + this.properties[PropertyNames.SchemaRegistryBearerAuthCredentialsSource] = "OAUTHBEARER_AZURE_IMDS"; + } else if (value == Confluent.SchemaRegistry.BearerAuthCredentialsSource.Custom) { this.properties[PropertyNames.SchemaRegistryBearerAuthCredentialsSource] = "CUSTOM"; @@ -506,6 +521,17 @@ public string BearerAuthTokenEndpointUrl set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl, value); } } + /// + /// Specifies the token query parameters for the bearer token endpoint. + /// Currently valid only when using Azure IMDS for token retrieval. + /// + public string BearerAuthTokenEndpointQuery + { + get { return Get(SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointQuery); } + set { SetObject(SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointQuery, value); } + } + + /// /// Key subject name strategy. /// diff --git a/test/Confluent.SchemaRegistry.UnitTests/CachedSchemaRegistryClient.cs b/test/Confluent.SchemaRegistry.UnitTests/CachedSchemaRegistryClient.cs index 0464b1a27..3112b7d5c 100644 --- a/test/Confluent.SchemaRegistry.UnitTests/CachedSchemaRegistryClient.cs +++ b/test/Confluent.SchemaRegistry.UnitTests/CachedSchemaRegistryClient.cs @@ -96,6 +96,78 @@ public void BearerAuthWithOAuthBearer() Assert.Null(client.AuthHeaderProvider); } + [Fact] + public void BearerAuthWithOAuthBearerAzureIMDS() + { + // Override token url and specify query parameters + var config = new SchemaRegistryConfig + { + Url = "irrelevanthost:8081", + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthTokenEndpointUrl = "https://test.com/token", + BearerAuthTokenEndpointQuery = "resource=&client_id=&api-version=", + BearerAuthLogicalCluster = "test-cluster", + BearerAuthIdentityPoolId = "test-pool" + }; + var client = new CachedSchemaRegistryClient(config); + Assert.Null(client.AuthHeaderProvider); + + // Specify query parameters only, token url defaults to IMDS endpoint + config = new SchemaRegistryConfig + { + Url = "irrelevanthost:8081", + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthTokenEndpointQuery = "resource=&client_id=&api-version=", + BearerAuthLogicalCluster = "test-cluster", + BearerAuthIdentityPoolId = "test-pool" + }; + client = new CachedSchemaRegistryClient(config); + Assert.Null(client.AuthHeaderProvider); + + // Specify query parameters together with a different token url + config = new SchemaRegistryConfig + { + Url = "irrelevanthost:8081", + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthTokenEndpointQuery = "https://test.com/token?resource=&client_id=&api-version=", + BearerAuthLogicalCluster = "test-cluster", + BearerAuthIdentityPoolId = "test-pool" + }; + client = new CachedSchemaRegistryClient(config); + Assert.Null(client.AuthHeaderProvider); + + // Throws an `ArgumentException` when both `BearerAuthTokenEndpointUrl` + // and `BearerAuthTokenEndpointQuery` are missing + config = new SchemaRegistryConfig + { + Url = "irrelevanthost:8081", + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthLogicalCluster = "test-cluster", + BearerAuthIdentityPoolId = "test-pool" + }; + Assert.Throws(() => new CachedSchemaRegistryClient(config)); + + // Throws an `ArgumentException` when `BearerAuthLogicalCluster` is missing + config = new SchemaRegistryConfig + { + Url = "irrelevanthost:8081", + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthTokenEndpointQuery = "https://test.com/token?resource=&client_id=&api-version=", + BearerAuthIdentityPoolId = "test-pool" + }; + Assert.Throws(() => new CachedSchemaRegistryClient(config)); + + // Throws an `ArgumentException` when `BearerAuthIdentityPoolId` is missing + config = new SchemaRegistryConfig + { + Url = "irrelevanthost:8081", + BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS, + BearerAuthTokenEndpointQuery = "https://test.com/token?resource=&client_id=&api-version=", + BearerAuthLogicalCluster = "test-cluster", + }; + Assert.Throws(() => new CachedSchemaRegistryClient(config)); + } + [Fact] public void CustomAuthProvider() { diff --git a/test/Confluent.SchemaRegistry.UnitTests/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs b/test/Confluent.SchemaRegistry.UnitTests/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs index 282a9d55d..64346e5b9 100644 --- a/test/Confluent.SchemaRegistry.UnitTests/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs +++ b/test/Confluent.SchemaRegistry.UnitTests/Rest/Authentication/StaticBearerAuthenticationHeaderValueProvider.cs @@ -19,12 +19,13 @@ public async Task ProviderInitializedOrExpired() } [Fact] - public async Task GetAuthenticationHeader() + public void GetAuthenticationHeader() { var provider = new StaticBearerAuthenticationHeaderValueProvider(token, logicalCluster, identityPool); var header = provider.GetAuthenticationHeader(); Assert.Equal("Bearer test-token", header.ToString()); } + [Fact] public void GetLogicalClusterAndIdentityPool() {