diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d92a3b56..38409abbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# 2.12.0 + +## Enhancements + +* References librdkafka.redist 2.12.0. Refer to the [librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) for more information. +* OAuth OIDC method example for Kafka metadata based authentication with + an Azure IMDS endpoint using an attached managed identity as principal (#2526). + + # 2.11.1 ## Enhancements diff --git a/examples/OAuthConsumer/Program.cs b/examples/OAuthConsumer/Program.cs index 8a651e167..5b3a5eac3 100644 --- a/examples/OAuthConsumer/Program.cs +++ b/examples/OAuthConsumer/Program.cs @@ -50,15 +50,15 @@ public class Program public static async Task Main(string[] args) { - if (args.Length != 5) + if (args.Length != 4) { Console.WriteLine("Usage: .. brokerList topic group \"principal= scope=\""); return; } - string bootstrapServers = args[1]; - string topicName = args[2]; - string groupId = args[3]; - string oauthConf = args[4]; + string bootstrapServers = args[0]; + string topicName = args[1]; + string groupId = args[2]; + string oauthConf = args[3]; if (!Regex.IsMatch(oauthConf, OauthConfigRegexPattern)) { diff --git a/examples/OAuthOIDC/Program.cs b/examples/OAuthOIDC/Program.cs index bb762bae8..df7d1f43a 100644 --- a/examples/OAuthOIDC/Program.cs +++ b/examples/OAuthOIDC/Program.cs @@ -39,12 +39,12 @@ public class Program private const String OAuthBearerScope = ""; public static async Task Main(string[] args) { - if (args.Length != 2) + if (args.Length != 1) { Console.WriteLine("Usage: .. brokerList"); return; } - var bootstrapServers = args[1]; + var bootstrapServers = args[0]; var topicName = Guid.NewGuid().ToString(); var groupId = Guid.NewGuid().ToString(); @@ -142,12 +142,12 @@ public static async Task Main(string[] args) } } - private static void createTopic(ClientConfig config, String topicName) + private static void createTopic(ClientConfig config, string topicName) { using (var adminClient = new AdminClientBuilder(config).Build()) { adminClient.CreateTopicsAsync(new TopicSpecification[] { - new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait(); ; + new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait(); } } } diff --git a/examples/OAuthOIDCAzureIMDS/OAuthOIDCAzureIMDS.csproj b/examples/OAuthOIDCAzureIMDS/OAuthOIDCAzureIMDS.csproj new file mode 100644 index 000000000..beaaf8ff1 --- /dev/null +++ b/examples/OAuthOIDCAzureIMDS/OAuthOIDCAzureIMDS.csproj @@ -0,0 +1,16 @@ + + + + {FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} + OAuthOIDCAzureIMDS + net8.0 + Exe + 7.1 + + + + + + + + diff --git a/examples/OAuthOIDCAzureIMDS/Program.cs b/examples/OAuthOIDCAzureIMDS/Program.cs new file mode 100644 index 000000000..e2782efa4 --- /dev/null +++ b/examples/OAuthOIDCAzureIMDS/Program.cs @@ -0,0 +1,156 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Confluent.Kafka.SyncOverAsync; + +/// +/// An example demonstrating how to produce a message to +/// a topic, and then reading it back again using a consumer. +/// The authentication uses the OpenID Connect method of the OAUTHBEARER SASL mechanism. +/// The token is acquired from the Azure Instance Metadata Service (IMDS) +/// using metadata based secret-less authentication. +/// +namespace Confluent.Kafka.Examples.OAuthOIDCAzureIMDS +{ + + public class Program + { + private const string azureIMDSQueryParams = "api-version=&resource=&client_id="; + private const string kafkaLogicalCluster = "your-logical-cluster"; + private const string identityPoolId = "your-identity-pool-id"; + + public static async Task Main(string[] args) + { + if (args.Length != 1) + { + Console.WriteLine("Usage: .. brokerList"); + return; + } + var bootstrapServers = args[0]; + var topicName = Guid.NewGuid().ToString(); + var groupId = Guid.NewGuid().ToString(); + + var commonConfig = new ClientConfig + { + BootstrapServers = bootstrapServers, + SecurityProtocol = SecurityProtocol.SaslPlaintext, + SaslMechanism = SaslMechanism.OAuthBearer, + SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc, + SaslOauthbearerMetadataAuthenticationType = SaslOauthbearerMetadataAuthenticationType.AzureIMDS, + SaslOauthbearerConfig = $"query={azureIMDSQueryParams}", + SaslOauthbearerExtensions = $"logicalCluster={kafkaLogicalCluster},identityPoolId={identityPoolId}" + }; + + var consumerConfig = new ConsumerConfig + { + BootstrapServers = bootstrapServers, + SecurityProtocol = SecurityProtocol.SaslPlaintext, + SaslMechanism = SaslMechanism.OAuthBearer, + SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc, + GroupId = groupId, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoOffsetStore = false + }; + + try + { + createTopic(commonConfig, topicName); + } + catch (CreateTopicsException e) + { + Console.WriteLine($"An error occurred creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + Environment.Exit(1); + } + + using (var producer = new ProducerBuilder(commonConfig) + .Build()) + using (var consumer = new ConsumerBuilder(consumerConfig) + .Build()) + { + consumer.Subscribe(topicName); + + var cancelled = false; + CancellationTokenSource cts = new CancellationTokenSource(); + + Console.CancelKeyPress += (_, e) => + { + e.Cancel = true; // prevent the process from terminating. + cancelled = true; + cts.Cancel(); + }; + + try + { + while (!cancelled) + { + var msg = "User"; + + try + { + var deliveryReport = await producer.ProduceAsync(topicName, new Message { Value = msg }); + Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}, {msg}"); + } + catch (ProduceException e) + { + Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]"); + } + + try + { + var consumeResult = consumer.Consume(cts.Token); + Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}"); + try + { + consumer.StoreOffset(consumeResult); + } + catch (KafkaException e) + { + Console.WriteLine($"Store Offset error: {e.Error.Reason}"); + } + } + catch (ConsumeException e) + { + Console.WriteLine($"Consume error: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + Console.WriteLine("Closing consumer."); + consumer.Close(); + } + } + } + + private static void createTopic(ClientConfig config, string topicName) + { + using (var adminClient = new AdminClientBuilder(config).Build()) + { + adminClient.CreateTopicsAsync(new TopicSpecification[] { + new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait(); + } + } + } + +} diff --git a/examples/OAuthProducer/Program.cs b/examples/OAuthProducer/Program.cs index 48759b0fc..f5725243d 100644 --- a/examples/OAuthProducer/Program.cs +++ b/examples/OAuthProducer/Program.cs @@ -50,14 +50,14 @@ public class Program public static async Task Main(string[] args) { - if (args.Length != 4) + if (args.Length != 3) { Console.WriteLine("Usage: .. brokerList topic \"principal= scope=\""); return; } - string bootstrapServers = args[1]; - string topicName = args[2]; - string oauthConf = args[3]; + string bootstrapServers = args[0]; + string topicName = args[1]; + string oauthConf = args[2]; if (!Regex.IsMatch(oauthConf, OauthConfigRegexPattern)) { diff --git a/src/ConfigGen/Program.cs b/src/ConfigGen/Program.cs index 8e7c2c9bb..66b52f46d 100644 --- a/src/ConfigGen/Program.cs +++ b/src/ConfigGen/Program.cs @@ -320,6 +320,7 @@ static string ConfigNameToDotnetName(string configName) { "resolve_canonical_bootstrap_servers_only", "ResolveCanonicalBootstrapServersOnly"}, { "client_credentials", "ClientCredentials"}, { "urn:ietf:params:oauth:grant-type:jwt-bearer", "JwtBearer"}, + { "azure_imds", "AzureIMDS"}, }; static string EnumNameToDotnetName(string enumName) diff --git a/src/Confluent.Kafka/Config.cs b/src/Confluent.Kafka/Config.cs index 511139126..5d62addd1 100644 --- a/src/Confluent.Kafka/Config.cs +++ b/src/Confluent.Kafka/Config.cs @@ -37,7 +37,8 @@ public class Config : IEnumerable> { "readuncommitted", "read_uncommitted" }, { "cooperativesticky", "cooperative-sticky" }, { "usealldnsips", "use_all_dns_ips"}, - { "resolvecanonicalbootstrapserversonly", "resolve_canonical_bootstrap_servers_only"} + { "resolvecanonicalbootstrapserversonly", "resolve_canonical_bootstrap_servers_only"}, + { "azureimds", "azure_imds"}, }; /// diff --git a/src/Confluent.Kafka/Config_gen.cs b/src/Confluent.Kafka/Config_gen.cs index 8562a0a23..e2c346c45 100644 --- a/src/Confluent.Kafka/Config_gen.cs +++ b/src/Confluent.Kafka/Config_gen.cs @@ -1,4 +1,4 @@ -// *** Auto-generated from librdkafka v2.11.1 *** - do not modify manually. +// *** Auto-generated from librdkafka v2.12.0-RC2 *** - do not modify manually. // // Copyright 2018-2022 Confluent Inc. // @@ -203,6 +203,22 @@ public enum SaslOauthbearerAssertionAlgorithm ES256 } + /// + /// SaslOauthbearerMetadataAuthenticationType enum values + /// + public enum SaslOauthbearerMetadataAuthenticationType + { + /// + /// None + /// + None, + + /// + /// AzureIMDS + /// + AzureIMDS + } + /// /// PartitionAssignmentStrategy enum values /// @@ -1318,6 +1334,16 @@ public Acks? Acks /// public string SaslOauthbearerAssertionJwtTemplateFile { get { return Get("sasl.oauthbearer.assertion.jwt.template.file"); } set { this.SetObject("sasl.oauthbearer.assertion.jwt.template.file", value); } } + /// + /// + /// + public SaslOauthbearerMetadataAuthenticationType? SaslOauthbearerMetadataAuthenticationType { get { return (SaslOauthbearerMetadataAuthenticationType?)GetEnum(typeof(SaslOauthbearerMetadataAuthenticationType), "sasl.oauthbearer.metadata.authentication.type"); } set { this.SetObject("sasl.oauthbearer.metadata.authentication.type", value); } } + /// ///