Skip to content

Commit 8b772df

Browse files
committed
OAuth Azure IMDS Kafka example
1 parent cefe511 commit 8b772df

File tree

5 files changed

+214
-1
lines changed

5 files changed

+214
-1
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# 2.12.0
2+
3+
## Enhancements
4+
5+
* References librdkafka.redist 2.12.0. Refer to the [librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) for more information.
6+
* OAuth OIDC method example for Kafka metadata based authentication with
7+
an Azure IMDS endpoint using an attached managed identity as principal (#2523).
8+
9+
110
# 2.11.1
211

312
## Enhancements
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
5+
<AssemblyName>OAuthOIDC</AssemblyName>
6+
<TargetFramework>net8.0</TargetFramework>
7+
<OutputType>Exe</OutputType>
8+
<LangVersion>7.1</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="2.11.1" /> -->
13+
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
14+
</ItemGroup>
15+
16+
<ItemGroup>
17+
<PackageReference Include="NJsonSchema" Version="10.8.0" />
18+
</ItemGroup>
19+
20+
</Project>
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Text;
20+
using System.Text.RegularExpressions;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Confluent.Kafka;
24+
using Confluent.Kafka.Admin;
25+
using Confluent.Kafka.SyncOverAsync;
26+
using Newtonsoft.Json;
27+
28+
/// <summary>
29+
/// An example demonstrating how to produce a message to
30+
/// a topic, and then reading it back again using a consumer.
31+
/// The authentication uses the OpenID Connect method of the OAUTHBEARER SASL mechanism.
32+
/// The token is acquired from the Azure Instance Metadata Service (IMDS)
33+
/// using metadata based secret-less authentication.
34+
/// </summary>
35+
namespace Confluent.Kafka.Examples.OAuthOIDCAzureIMDS
36+
{
37+
38+
public class Program
39+
{
40+
private const string azureIMDSQueryParams = "api-version=&resource=&client_id=";
41+
private const string kafkaLogicalCluster = "your-logical-cluster";
42+
private const string identityPoolId = "your-identity-pool-id";
43+
44+
public static async Task Main(string[] args)
45+
{
46+
if (args.Length != 2)
47+
{
48+
Console.WriteLine("Usage: .. brokerList");
49+
return;
50+
}
51+
var bootstrapServers = args[1];
52+
var topicName = Guid.NewGuid().ToString();
53+
var groupId = Guid.NewGuid().ToString();
54+
55+
var commonConfig = new ClientConfig
56+
{
57+
BootstrapServers = bootstrapServers,
58+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
59+
SaslMechanism = SaslMechanism.OAuthBearer,
60+
SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
61+
SaslOauthbearerMetadataAuthenticationType = SaslOauthbearerMetadataAuthenticationType.AzureIMDS,
62+
SaslOauthbearerConfig = $"query={azureIMDSQueryParams}",
63+
SaslOauthbearerExtensions = $"logicalCluster={kafkaLogicalCluster},identityPoolId={identityPoolId}"
64+
};
65+
66+
var consumerConfig = new ConsumerConfig
67+
{
68+
BootstrapServers = bootstrapServers,
69+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
70+
SaslMechanism = SaslMechanism.OAuthBearer,
71+
SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
72+
GroupId = groupId,
73+
AutoOffsetReset = AutoOffsetReset.Earliest,
74+
EnableAutoOffsetStore = false
75+
};
76+
77+
try
78+
{
79+
createTopic(commonConfig, topicName);
80+
}
81+
catch (CreateTopicsException e)
82+
{
83+
Console.WriteLine($"An error occurred creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
84+
Environment.Exit(1);
85+
}
86+
87+
using (var producer = new ProducerBuilder<Null, string>(commonConfig)
88+
.Build())
89+
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
90+
.Build())
91+
{
92+
consumer.Subscribe(topicName);
93+
94+
var cancelled = false;
95+
CancellationTokenSource cts = new CancellationTokenSource();
96+
97+
Console.CancelKeyPress += (_, e) =>
98+
{
99+
e.Cancel = true; // prevent the process from terminating.
100+
cancelled = true;
101+
cts.Cancel();
102+
};
103+
104+
try
105+
{
106+
while (!cancelled)
107+
{
108+
var msg = "User";
109+
110+
try
111+
{
112+
var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, string> { Value = msg });
113+
Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}, {msg}");
114+
}
115+
catch (ProduceException<Null, string> e)
116+
{
117+
Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
118+
}
119+
120+
try
121+
{
122+
var consumeResult = consumer.Consume(cts.Token);
123+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
124+
try
125+
{
126+
consumer.StoreOffset(consumeResult);
127+
}
128+
catch (KafkaException e)
129+
{
130+
Console.WriteLine($"Store Offset error: {e.Error.Reason}");
131+
}
132+
}
133+
catch (ConsumeException e)
134+
{
135+
Console.WriteLine($"Consume error: {e.Error.Reason}");
136+
}
137+
}
138+
}
139+
catch (OperationCanceledException)
140+
{
141+
Console.WriteLine("Closing consumer.");
142+
consumer.Close();
143+
}
144+
}
145+
}
146+
147+
private static void createTopic(ClientConfig config, String topicName)
148+
{
149+
using (var adminClient = new AdminClientBuilder(config).Build())
150+
{
151+
adminClient.CreateTopicsAsync(new TopicSpecification[] {
152+
new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait(); ;
153+
}
154+
}
155+
}
156+
157+
}

src/ConfigGen/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ static string ConfigNameToDotnetName(string configName)
320320
{ "resolve_canonical_bootstrap_servers_only", "ResolveCanonicalBootstrapServersOnly"},
321321
{ "client_credentials", "ClientCredentials"},
322322
{ "urn:ietf:params:oauth:grant-type:jwt-bearer", "JwtBearer"},
323+
{ "azure_imds", "AzureIMDS"},
323324
};
324325

325326
static string EnumNameToDotnetName(string enumName)

src/Confluent.Kafka/Config_gen.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// *** Auto-generated from librdkafka v2.11.1 *** - do not modify manually.
1+
// *** Auto-generated from librdkafka dev_oauthbearer_metadata_based *** - do not modify manually.
22
//
33
// Copyright 2018-2022 Confluent Inc.
44
//
@@ -203,6 +203,22 @@ public enum SaslOauthbearerAssertionAlgorithm
203203
ES256
204204
}
205205

206+
/// <summary>
207+
/// SaslOauthbearerMetadataAuthenticationType enum values
208+
/// </summary>
209+
public enum SaslOauthbearerMetadataAuthenticationType
210+
{
211+
/// <summary>
212+
/// None
213+
/// </summary>
214+
None,
215+
216+
/// <summary>
217+
/// AzureIMDS
218+
/// </summary>
219+
AzureIMDS
220+
}
221+
206222
/// <summary>
207223
/// PartitionAssignmentStrategy enum values
208224
/// </summary>
@@ -1318,6 +1334,16 @@ public Acks? Acks
13181334
/// </summary>
13191335
public string SaslOauthbearerAssertionJwtTemplateFile { get { return Get("sasl.oauthbearer.assertion.jwt.template.file"); } set { this.SetObject("sasl.oauthbearer.assertion.jwt.template.file", value); } }
13201336

1337+
/// <summary>
1338+
/// <![CDATA[
1339+
/// Type of metadata-based authentication to use for OAUTHBEARER/OIDC `azure_imds` authenticates using the Azure IMDS endpoint. Sets a default value for `sasl.oauthbearer.token.endpoint.url` if missing. Configuration values specific of chosen authentication type can be passed through `sasl.oauthbearer.config`.
1340+
///
1341+
/// default: none
1342+
/// importance: low
1343+
/// ]]>
1344+
/// </summary>
1345+
public SaslOauthbearerMetadataAuthenticationType? SaslOauthbearerMetadataAuthenticationType { get { return (SaslOauthbearerMetadataAuthenticationType?)GetEnum(typeof(SaslOauthbearerMetadataAuthenticationType), "sasl.oauthbearer.metadata.authentication.type"); } set { this.SetObject("sasl.oauthbearer.metadata.authentication.type", value); } }
1346+
13211347
/// <summary>
13221348
/// <![CDATA[
13231349
/// List of plugin libraries to load (; separated). The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the platform-specific extension (such as .dll or .so) will be appended automatically.

0 commit comments

Comments
 (0)