Skip to content

Commit 62fd1b8

Browse files
committed
Enhance Bearer Authentication Support with Azure IMDS
- Introduced OAuthBearerAzureIMDS as a new authentication method in AuthCredentialsSource. - Updated CachedSchemaRegistryClient to support new Azure IMDS authentication configuration. - Implemented AzureIMDSBearerAuthenticationHeaderValueProvider for token retrieval using Azure Instance Metadata Service. - Added AzureIMDSBearerAuthenticationHeaderValueProviderBuilder to construct the new provider. - Refactored existing authentication header value providers to use a builder pattern for better extensibility. - Updated SchemaRegistryConfig to include new properties for Azure IMDS token endpoint and query parameters. - Enhanced unit tests to cover new Azure IMDS authentication scenarios.
1 parent abde168 commit 62fd1b8

24 files changed

+1104
-214
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# 2.12.0
2+
3+
## Enhancements
4+
5+
* References librdkafka.redist 2.12.0. Refer to the [librdkafka v2.12.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.12.0) for more information.
6+
* OAuth OIDC method for Schema Registry metadata based authentication with
7+
an Azure IMDS endpoint using an attached managed identity as principal (#).
8+
9+
110
# 2.11.1
211

312
## Enhancements
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
5+
<AssemblyName>OAuthOIDC</AssemblyName>
6+
<TargetFramework>net8.0</TargetFramework>
7+
<OutputType>Exe</OutputType>
8+
<LangVersion>7.1</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.11.1" /> -->
13+
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj" />
14+
</ItemGroup>
15+
16+
</Project>
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Text;
20+
using System.Text.RegularExpressions;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Confluent.Kafka;
24+
using Confluent.Kafka.Admin;
25+
using Confluent.Kafka.SyncOverAsync;
26+
using Newtonsoft.Json;
27+
using Confluent.SchemaRegistry;
28+
using Confluent.SchemaRegistry.Serdes;
29+
30+
/// <summary>
31+
/// An example demonstrating how to produce a message to
32+
/// a topic, and then reading it back again using a consumer.
33+
/// The authentication uses the OpenID Connect method of the OAUTHBEARER SASL mechanism.
34+
/// The token is acquired from the Azure Instance Metadata Service (IMDS)
35+
/// using metadata based secret-less authentication.
36+
/// </summary>
37+
namespace Confluent.Kafka.Examples.OAuthOIDCAzureIMDS
38+
{
39+
40+
class User
41+
{
42+
[JsonRequired] // use Newtonsoft.Json annotations
43+
[JsonProperty("name")]
44+
public string Name { get; set; }
45+
46+
[JsonRequired]
47+
[JsonProperty("favorite_color")]
48+
public string FavoriteColor { get; set; }
49+
50+
[JsonProperty("favorite_number")]
51+
public long FavoriteNumber { get; set; }
52+
}
53+
54+
public class Program
55+
{
56+
private const string azureIMDSQueryParams = "api-version=&resource=&client_id=";
57+
58+
public static async Task Main(string[] args)
59+
{
60+
if (args.Length != 2)
61+
{
62+
Console.WriteLine("Usage: .. brokerList schemaRegistryUrl");
63+
return;
64+
}
65+
var bootstrapServers = args[1];
66+
var schemaRegistryUrl = args[2];
67+
var topicName = Guid.NewGuid().ToString();
68+
var groupId = Guid.NewGuid().ToString();
69+
70+
var commonConfig = new ClientConfig
71+
{
72+
BootstrapServers = bootstrapServers,
73+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
74+
SaslMechanism = SaslMechanism.OAuthBearer,
75+
SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
76+
SaslOauthbearerMetadataAuthenticationType = SaslOauthbearerMetadataAuthenticationType.AzureIMDS,
77+
SaslOauthbearerConfig = $"query={azureIMDSQueryParams}",
78+
};
79+
80+
var consumerConfig = new ConsumerConfig
81+
{
82+
BootstrapServers = bootstrapServers,
83+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
84+
SaslMechanism = SaslMechanism.OAuthBearer,
85+
SaslOauthbearerMethod = SaslOauthbearerMethod.Oidc,
86+
GroupId = groupId,
87+
AutoOffsetReset = AutoOffsetReset.Earliest,
88+
EnableAutoOffsetStore = false
89+
};
90+
91+
var schemaRegistryConfig = new SchemaRegistryConfig
92+
{
93+
Url = schemaRegistryUrl,
94+
BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearerAzureIMDS,
95+
BearerAuthTokenEndpointQuery = azureIMDSQueryParams,
96+
};
97+
98+
try
99+
{
100+
createTopic(commonConfig, topicName);
101+
}
102+
catch (CreateTopicsException e)
103+
{
104+
Console.WriteLine($"An error occurred creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
105+
Environment.Exit(1);
106+
}
107+
108+
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
109+
using (var producer = new ProducerBuilder<Null, User>(commonConfig)
110+
.SetValueSerializer(new JsonSerializer<User>(schemaRegistry))
111+
.Build())
112+
using (var consumer = new ConsumerBuilder<Ignore, User>(consumerConfig)
113+
.SetValueDeserializer(new JsonDeserializer<User>(schemaRegistry).AsSyncOverAsync()).Build())
114+
{
115+
consumer.Subscribe(topicName);
116+
117+
var cancelled = false;
118+
CancellationTokenSource cts = new CancellationTokenSource();
119+
120+
Console.CancelKeyPress += (_, e) =>
121+
{
122+
e.Cancel = true; // prevent the process from terminating.
123+
cancelled = true;
124+
cts.Cancel();
125+
};
126+
127+
try
128+
{
129+
while (!cancelled)
130+
{
131+
var msg = new User
132+
{
133+
Name = "user-" + Guid.NewGuid().ToString(),
134+
FavoriteColor = "blue",
135+
FavoriteNumber = 7
136+
};
137+
138+
try
139+
{
140+
var deliveryReport = await producer.ProduceAsync(topicName, new Message<Null, User> { Value = msg });
141+
Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}, {msg}");
142+
}
143+
catch (ProduceException<Null, User> e)
144+
{
145+
Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
146+
}
147+
148+
try
149+
{
150+
var consumeResult = consumer.Consume(cts.Token);
151+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
152+
try
153+
{
154+
consumer.StoreOffset(consumeResult);
155+
}
156+
catch (KafkaException e)
157+
{
158+
Console.WriteLine($"Store Offset error: {e.Error.Reason}");
159+
}
160+
}
161+
catch (ConsumeException e)
162+
{
163+
Console.WriteLine($"Consume error: {e.Error.Reason}");
164+
}
165+
}
166+
}
167+
catch (OperationCanceledException)
168+
{
169+
Console.WriteLine("Closing consumer.");
170+
consumer.Close();
171+
}
172+
}
173+
}
174+
175+
private static void createTopic(ClientConfig config, String topicName)
176+
{
177+
using (var adminClient = new AdminClientBuilder(config).Build())
178+
{
179+
adminClient.CreateTopicsAsync(new TopicSpecification[] {
180+
new TopicSpecification { Name = topicName, ReplicationFactor = 3, NumPartitions = 1 } }).Wait(); ;
181+
}
182+
}
183+
}
184+
185+
}

examples/SchemaRegistryOAuth/Program.cs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public ExampleBearerAuthProvider(string token, string logicalCluster, string ide
5050

5151
public static async Task Main(string[] args)
5252
{
53-
if (args.Length != 9)
53+
if (args.Length >= 9)
5454
{
55-
Console.WriteLine("Usage: .. schemaRegistryUrl clientId clientSecret scope tokenEndpoint logicalCluster identityPool token");
55+
Console.WriteLine("Usage: .. schemaRegistryUrl clientId clientSecret scope tokenEndpoint logicalCluster identityPool token [azureIMDSQueryParams]");
5656
return;
5757
}
5858
string schemaRegistryUrl = args[1];
@@ -63,10 +63,15 @@ public static async Task Main(string[] args)
6363
string logicalCluster = args[6];
6464
string identityPool = args[7];
6565
string token = args[8];
66+
string azureIMDSQueryParams = null;
67+
if (args.Length >= 10)
68+
{
69+
azureIMDSQueryParams = args[9];
70+
}
6671

6772
//using BearerAuthCredentialsSource.OAuthBearer
68-
var clientCredentialsSchemaRegistryConfig = new SchemaRegistryConfig
69-
{
73+
var clientCredentialsSchemaRegistryConfig = new SchemaRegistryConfig
74+
{
7075
Url = schemaRegistryUrl,
7176
BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthBearer,
7277
BearerAuthClientId = clientId,
@@ -85,7 +90,7 @@ public static async Task Main(string[] args)
8590

8691
//using BearerAuthCredentialsSource.StaticToken
8792
var staticSchemaRegistryConfig = new SchemaRegistryConfig
88-
{
93+
{
8994
Url = schemaRegistryUrl,
9095
BearerAuthCredentialsSource = BearerAuthCredentialsSource.StaticToken,
9196
BearerAuthToken = token,
@@ -101,7 +106,7 @@ public static async Task Main(string[] args)
101106

102107
//Using BearerAuthCredentialsSource.Custom
103108
var customSchemaRegistryConfig = new SchemaRegistryConfig
104-
{
109+
{
105110
Url = schemaRegistryUrl,
106111
BearerAuthCredentialsSource = BearerAuthCredentialsSource.Custom
107112
};
@@ -112,6 +117,25 @@ public static async Task Main(string[] args)
112117
var subjects = await schemaRegistry.GetAllSubjectsAsync();
113118
Console.WriteLine(string.Join(", ", subjects));
114119
}
120+
121+
if (azureIMDSQueryParams == null)
122+
{
123+
return;
124+
}
125+
126+
//Using BearerAuthCredentialsSource.OAuthOIDCAzureIMDS
127+
var azureIMDSSchemaRegistryConfig = new SchemaRegistryConfig
128+
{
129+
Url = schemaRegistryUrl,
130+
BearerAuthCredentialsSource = BearerAuthCredentialsSource.OAuthOIDCAzureIMDS,
131+
SchemaRegistryBearerAuthTokenEndpointQuery = azureIMDSQueryParams,
132+
};
133+
134+
using (var schemaRegistry = new CachedSchemaRegistryClient(azureIMDSSchemaRegistryConfig))
135+
{
136+
var subjects = await schemaRegistry.GetAllSubjectsAsync();
137+
Console.WriteLine(string.Join(", ", subjects));
138+
}
115139
}
116140
}
117141
}

src/ConfigGen/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ static string ConfigNameToDotnetName(string configName)
320320
{ "resolve_canonical_bootstrap_servers_only", "ResolveCanonicalBootstrapServersOnly"},
321321
{ "client_credentials", "ClientCredentials"},
322322
{ "urn:ietf:params:oauth:grant-type:jwt-bearer", "JwtBearer"},
323+
{ "azure_imds", "AzureIMDS"},
323324
};
324325

325326
static string EnumNameToDotnetName(string enumName)

src/Confluent.Kafka/Config_gen.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// *** Auto-generated from librdkafka v2.11.1 *** - do not modify manually.
1+
// *** Auto-generated from librdkafka dev_oauthbearer_metadata_based *** - do not modify manually.
22
//
33
// Copyright 2018-2022 Confluent Inc.
44
//
@@ -203,6 +203,22 @@ public enum SaslOauthbearerAssertionAlgorithm
203203
ES256
204204
}
205205

206+
/// <summary>
207+
/// SaslOauthbearerMetadataAuthenticationType enum values
208+
/// </summary>
209+
public enum SaslOauthbearerMetadataAuthenticationType
210+
{
211+
/// <summary>
212+
/// None
213+
/// </summary>
214+
None,
215+
216+
/// <summary>
217+
/// AzureIMDS
218+
/// </summary>
219+
AzureIMDS
220+
}
221+
206222
/// <summary>
207223
/// PartitionAssignmentStrategy enum values
208224
/// </summary>
@@ -1318,6 +1334,16 @@ public Acks? Acks
13181334
/// </summary>
13191335
public string SaslOauthbearerAssertionJwtTemplateFile { get { return Get("sasl.oauthbearer.assertion.jwt.template.file"); } set { this.SetObject("sasl.oauthbearer.assertion.jwt.template.file", value); } }
13201336

1337+
/// <summary>
1338+
/// <![CDATA[
1339+
/// Type of metadata-based authentication to use for OAUTHBEARER/OIDC `azure_imds` authenticates using the Azure IMDS endpoint. Sets a default value for `sasl.oauthbearer.token.endpoint.url` if missing. Configuration values specific of chosen authentication type can be passed through `sasl.oauthbearer.config`.
1340+
///
1341+
/// default: none
1342+
/// importance: low
1343+
/// ]]>
1344+
/// </summary>
1345+
public SaslOauthbearerMetadataAuthenticationType? SaslOauthbearerMetadataAuthenticationType { get { return (SaslOauthbearerMetadataAuthenticationType?)GetEnum(typeof(SaslOauthbearerMetadataAuthenticationType), "sasl.oauthbearer.metadata.authentication.type"); } set { this.SetObject("sasl.oauthbearer.metadata.authentication.type", value); } }
1346+
13211347
/// <summary>
13221348
/// <![CDATA[
13231349
/// List of plugin libraries to load (; separated). The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the platform-specific extension (such as .dll or .so) will be appended automatically.

0 commit comments

Comments
 (0)