Skip to content

Commit 8d16bbd

Browse files
authored
OAuth Azure IMDS Kafka example (#2526)
* Fix command line arguments position in some OAuth examples and azure_imds value mapping
1 parent 1d7eb4c commit 8d16bbd

File tree

6 files changed

+187
-13
lines changed

6 files changed

+187
-13
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
## Enhancements
44

55
* References librdkafka.redist 2.12.0. Refer to the [librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) for more information.
6+
* OAuth OIDC method example for Kafka metadata based authentication with
7+
an Azure IMDS endpoint using an attached managed identity as principal (#2526).
68

79

810
# 2.11.1

examples/OAuthConsumer/Program.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ public class Program
5050

5151
public static async Task Main(string[] args)
5252
{
53-
if (args.Length != 5)
53+
if (args.Length != 4)
5454
{
5555
Console.WriteLine("Usage: .. brokerList topic group \"principal=<value> scope=<scope>\"");
5656
return;
5757
}
58-
string bootstrapServers = args[1];
59-
string topicName = args[2];
60-
string groupId = args[3];
61-
string oauthConf = args[4];
58+
string bootstrapServers = args[0];
59+
string topicName = args[1];
60+
string groupId = args[2];
61+
string oauthConf = args[3];
6262

6363
if (!Regex.IsMatch(oauthConf, OauthConfigRegexPattern))
6464
{

examples/OAuthOIDC/Program.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ public class Program
3939
private const String OAuthBearerScope = "<scope>";
4040
public static async Task Main(string[] args)
4141
{
42-
if (args.Length != 2)
42+
if (args.Length != 1)
4343
{
4444
Console.WriteLine("Usage: .. brokerList");
4545
return;
4646
}
47-
var bootstrapServers = args[1];
47+
var bootstrapServers = args[0];
4848
var topicName = Guid.NewGuid().ToString();
4949
var groupId = Guid.NewGuid().ToString();
5050

@@ -142,12 +142,12 @@ public static async Task Main(string[] args)
142142
}
143143
}
144144

145-
private static void createTopic(ClientConfig config, String topicName)
145+
private static void createTopic(ClientConfig config, string topicName)
146146
{
147147
using (var adminClient = new AdminClientBuilder(config).Build())
148148
{
149149
adminClient.CreateTopicsAsync(new TopicSpecification[] {
150-
new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait(); ;
150+
new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait();
151151
}
152152
}
153153
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
5+
<AssemblyName>OAuthOIDCAzureIMDS</AssemblyName>
6+
<TargetFramework>net8.0</TargetFramework>
7+
<OutputType>Exe</OutputType>
8+
<LangVersion>7.1</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.11.1" /> -->
13+
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
14+
</ItemGroup>
15+
16+
</Project>
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright 2025 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Text;
20+
using System.Text.RegularExpressions;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Confluent.Kafka;
24+
using Confluent.Kafka.Admin;
25+
using Confluent.Kafka.SyncOverAsync;
26+
27+
/// <summary>
28+
/// An example demonstrating how to produce a message to
29+
/// a topic, and then reading it back again using a consumer.
30+
/// The authentication uses the OpenID Connect method of the OAUTHBEARER SASL mechanism.
31+
/// The token is acquired from the Azure Instance Metadata Service (IMDS)
32+
/// using metadata based secret-less authentication.
33+
/// </summary>
34+
namespace Confluent.Kafka.Examples.OAuthOIDCAzureIMDS
35+
{
36+
37+
public class Program
38+
{
39+
private const string azureIMDSQueryParams = "api-version=&resource=&client_id=";
40+
private const string kafkaLogicalCluster = "your-logical-cluster";
41+
private const string identityPoolId = "your-identity-pool-id";
42+
43+
public static async Task Main(string[] args)
44+
{
45+
if (args.Length != 1)
46+
{
47+
Console.WriteLine("Usage: .. brokerList");
48+
return;
49+
}
50+
var bootstrapServers = args[0];
51+
var topicName = Guid.NewGuid().ToString();
52+
var groupId = Guid.NewGuid().ToString();
53+
54+
var commonConfig = new ClientConfig
55+
{
56+
BootstrapServers = bootstrapServers,
57+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
58+
SaslMechanism = SaslMechanism.OAuthBearer,
59+
SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
60+
SaslOauthbearerMetadataAuthenticationType = SaslOauthbearerMetadataAuthenticationType.AzureIMDS,
61+
SaslOauthbearerConfig = $"query={azureIMDSQueryParams}",
62+
SaslOauthbearerExtensions = $"logicalCluster={kafkaLogicalCluster},identityPoolId={identityPoolId}"
63+
};
64+
65+
var consumerConfig = new ConsumerConfig
66+
{
67+
BootstrapServers = bootstrapServers,
68+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
69+
SaslMechanism = SaslMechanism.OAuthBearer,
70+
SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
71+
GroupId = groupId,
72+
AutoOffsetReset = AutoOffsetReset.Earliest,
73+
EnableAutoOffsetStore = false
74+
};
75+
76+
try
77+
{
78+
createTopic(commonConfig, topicName);
79+
}
80+
catch (CreateTopicsException e)
81+
{
82+
Console.WriteLine($"An error occurred creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
83+
Environment.Exit(1);
84+
}
85+
86+
using (var producer = new ProducerBuilder<Null, string>(commonConfig)
87+
.Build())
88+
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
89+
.Build())
90+
{
91+
consumer.Subscribe(topicName);
92+
93+
var cancelled = false;
94+
CancellationTokenSource cts = new CancellationTokenSource();
95+
96+
Console.CancelKeyPress += (_, e) =>
97+
{
98+
e.Cancel = true; // prevent the process from terminating.
99+
cancelled = true;
100+
cts.Cancel();
101+
};
102+
103+
try
104+
{
105+
while (!cancelled)
106+
{
107+
var msg = "User";
108+
109+
try
110+
{
111+
var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = msg });
112+
Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}, {msg}");
113+
}
114+
catch (ProduceException<Null, string> e)
115+
{
116+
Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
117+
}
118+
119+
try
120+
{
121+
var consumeResult = consumer.Consume(cts.Token);
122+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
123+
try
124+
{
125+
consumer.StoreOffset(consumeResult);
126+
}
127+
catch (KafkaException e)
128+
{
129+
Console.WriteLine($"Store Offset error: {e.Error.Reason}");
130+
}
131+
}
132+
catch (ConsumeException e)
133+
{
134+
Console.WriteLine($"Consume error: {e.Error.Reason}");
135+
}
136+
}
137+
}
138+
catch (OperationCanceledException)
139+
{
140+
Console.WriteLine("Closing consumer.");
141+
consumer.Close();
142+
}
143+
}
144+
}
145+
146+
private static void createTopic(ClientConfig config, string topicName)
147+
{
148+
using (var adminClient = new AdminClientBuilder(config).Build())
149+
{
150+
adminClient.CreateTopicsAsync(new TopicSpecification[] {
151+
new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait();
152+
}
153+
}
154+
}
155+
156+
}

examples/OAuthProducer/Program.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ public class Program
5050

5151
public static async Task Main(string[] args)
5252
{
53-
if (args.Length != 4)
53+
if (args.Length != 3)
5454
{
5555
Console.WriteLine("Usage: .. brokerList topic \"principal=<value> scope=<scope>\"");
5656
return;
5757
}
58-
string bootstrapServers = args[1];
59-
string topicName = args[2];
60-
string oauthConf = args[3];
58+
string bootstrapServers = args[0];
59+
string topicName = args[1];
60+
string oauthConf = args[2];
6161

6262
if (!Regex.IsMatch(oauthConf, OauthConfigRegexPattern))
6363
{

0 commit comments

Comments
 (0)