Skip to content

Commit 39f4206

Browse files
mahajanadhityaemasabanchitj
authored
[KIP-554] User SCRAM credentials API (#2070)
* Check for Flaky Tests * Initial build try * Initial build try-(2) * Initial build try-(3) * ignoring flaky tests * faster response cycle * Final build - (1) * PR comment Addressal * added logtofile for protobuf_with_reference * Removed LogToFile for protobufwithreference * FIXME Comment for ProtobufWithReference and removed SchemaRegistry.IntegrationTests for now from semaphore * ProtobufWithReference skipFlaky removed * inital stubs * Changes * changes * changes * changes * changes * changes * changes * changes * changes * changes * changes * changes * changes * changes * changes * Reflect librdkafka changes Add exceptions Remove AlterUserScramCredentialsResult Add DescribeUserScramCredentialsReport * Fixes and parametric examples * Fix integration tests * Better exception documentation * Address comments * Change ToString methods to return a JSON string * Changelog entry * Unit tests --------- Co-authored-by: Emanuele Sabellico <[email protected]> Co-authored-by: Anchit Jain <[email protected]>
1 parent ee5fc62 commit 39f4206

29 files changed

+2093
-3
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
- References librdkafka.redist 2.2.0. Refer to the [librdkafka v2.2.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0) for more information.
66
- [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API)
77
IncrementalAlterConfigs API (#2005).
8+
- [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API):
9+
User SASL/SCRAM credentials alteration and description (#2070).
10+
811

912
## Fixes
1013

examples/AdminClient/Program.cs

Lines changed: 167 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using Confluent.Kafka.Admin;
2222
using System.Linq;
2323
using System.Collections.Generic;
24+
using System.Text;
2425

2526

2627
namespace Confluent.Kafka.Examples
@@ -161,6 +162,85 @@ static void PrintAclBindings(List<AclBinding> aclBindings)
161162
}
162163
}
163164

165+
static List<UserScramCredentialAlteration> ParseUserScramCredentialAlterations(
166+
string[] args)
167+
{
168+
if (args.Length == 0)
169+
{
170+
Console.WriteLine("usage: .. <bootstrapServers> alter-user-scram-alterations " +
171+
"UPSERT <user1> <mechanism1> <iterations1> <password1> <salt1> " +
172+
"[UPSERT <user2> <mechanism2> <iterations2> <password2> <salt2> " +
173+
"DELETE <user3> <mechanism3> ..]");
174+
Environment.ExitCode = 1;
175+
return null;
176+
}
177+
178+
var alterations = new List<UserScramCredentialAlteration>();
179+
for (int i = 0; i < args.Length;) {
180+
string alterationName = args[i];
181+
if (alterationName == "UPSERT")
182+
{
183+
if (i + 5 >= args.Length)
184+
{
185+
throw new ArgumentException(
186+
$"invalid number of arguments for alteration {alterations.Count},"+
187+
$" expected 5, got {args.Length - i - 1}");
188+
}
189+
190+
string user = args[i + 1];
191+
var mechanism = Enum.Parse<ScramMechanism>(args[i + 2]);
192+
var iterations = Int32.Parse(args[i + 3]);
193+
var password = Encoding.UTF8.GetBytes(args[i + 4]);
194+
string saltString = args[i + 5];
195+
byte[] salt = null;
196+
if (saltString != "")
197+
{
198+
salt = Encoding.UTF8.GetBytes(saltString);
199+
}
200+
alterations.Add(
201+
new UserScramCredentialUpsertion
202+
{
203+
User = user,
204+
ScramCredentialInfo = new ScramCredentialInfo
205+
{
206+
Mechanism = mechanism,
207+
Iterations = iterations,
208+
},
209+
Password = password,
210+
Salt = salt,
211+
}
212+
);
213+
i += 6;
214+
}
215+
else if (alterationName == "DELETE")
216+
{
217+
if (i + 2 >= args.Length)
218+
{
219+
throw new ArgumentException(
220+
$"invalid number of arguments for alteration {alterations.Count},"+
221+
$" expected 2, got {args.Length - i - 1}");
222+
}
223+
224+
string user = args[i + 1];
225+
var mechanism = Enum.Parse<ScramMechanism>(args[i + 2]);
226+
alterations.Add(
227+
new UserScramCredentialDeletion
228+
{
229+
User = user,
230+
Mechanism = mechanism,
231+
}
232+
);
233+
i += 3;
234+
}
235+
else
236+
{
237+
throw new ArgumentException(
238+
$"invalid alteration {alterations.Count}, must be UPSERT or DELETE");
239+
}
240+
}
241+
return alterations;
242+
}
243+
164244
static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
165245
{
166246
List<AclBinding> aclBindings;
@@ -599,6 +679,84 @@ e is FormatException
599679
}
600680
}
601681

682+
static async Task DescribeUserScramCredentialsAsync(string bootstrapServers, string[] commandArgs)
683+
{
684+
var users = commandArgs.ToList();
685+
var timeout = TimeSpan.FromSeconds(30);
686+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
687+
{
688+
try
689+
{
690+
var descResult = await adminClient.DescribeUserScramCredentialsAsync(users, new DescribeUserScramCredentialsOptions() { RequestTimeout = timeout });
691+
foreach (var description in descResult.UserScramCredentialsDescriptions)
692+
{
693+
Console.WriteLine($" User: {description.User}");
694+
foreach (var scramCredentialInfo in description.ScramCredentialInfos)
695+
{
696+
Console.WriteLine($" Mechanism: {scramCredentialInfo.Mechanism}");
697+
Console.WriteLine($" Iterations: {scramCredentialInfo.Iterations}");
698+
}
699+
}
700+
}
701+
catch (DescribeUserScramCredentialsException e)
702+
{
703+
Console.WriteLine($"An error occurred describing user SCRAM credentials" +
704+
" for some users:");
705+
foreach (var description in e.Results.UserScramCredentialsDescriptions)
706+
{
707+
Console.WriteLine($" User: {description.User}");
708+
Console.WriteLine($" Error: {description.Error}");
709+
if (!description.Error.IsError)
710+
{
711+
foreach (var scramCredentialInfo in description.ScramCredentialInfos)
712+
{
713+
Console.WriteLine($" Mechanism: {scramCredentialInfo.Mechanism}");
714+
Console.WriteLine($" Iterations: {scramCredentialInfo.Iterations}");
715+
}
716+
}
717+
}
718+
}
719+
catch (KafkaException e)
720+
{
721+
Console.WriteLine($"An error occurred describing user SCRAM credentials: {e}");
722+
Environment.ExitCode = 1;
723+
}
724+
}
725+
}
726+
727+
static async Task AlterUserScramCredentialsAsync(string bootstrapServers, string[] commandArgs)
728+
{
729+
var alterations = ParseUserScramCredentialAlterations(commandArgs);
730+
if (alterations == null)
731+
return;
732+
733+
var timeout = TimeSpan.FromSeconds(30);
734+
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
735+
{
736+
try
737+
{
738+
await adminClient.AlterUserScramCredentialsAsync(alterations,
739+
new AlterUserScramCredentialsOptions() { RequestTimeout = timeout });
740+
Console.WriteLine("All AlterUserScramCredentials operations completed successfully");
741+
}
742+
catch (AlterUserScramCredentialsException e)
743+
{
744+
Console.WriteLine($"An error occurred altering user SCRAM credentials" +
745+
" for some users:");
746+
foreach (var result in e.Results)
747+
{
748+
Console.WriteLine($" User: {result.User}");
749+
Console.WriteLine($" Error: {result.Error}");
750+
}
751+
}
752+
catch (KafkaException e)
753+
{
754+
Console.WriteLine($"An error occurred altering user SCRAM credentials: {e}");
755+
Environment.ExitCode = 1;
756+
}
757+
}
758+
}
759+
602760
public static async Task Main(string[] args)
603761
{
604762
if (args.Length < 2)
@@ -609,7 +767,9 @@ public static async Task Main(string[] args)
609767
"describe-acls", "delete-acls",
610768
"list-consumer-groups", "describe-consumer-groups",
611769
"list-consumer-group-offsets", "alter-consumer-group-offsets",
612-
"incremental-alter-configs"
770+
"incremental-alter-configs", "describe-user-scram-credentials",
771+
"alter-user-scram-credentials"
772+
613773
}) +
614774
" ..");
615775
Environment.ExitCode = 1;
@@ -658,6 +818,12 @@ public static async Task Main(string[] args)
658818
case "incremental-alter-configs":
659819
await IncrementalAlterConfigsAsync(bootstrapServers, commandArgs);
660820
break;
821+
case "describe-user-scram-credentials":
822+
await DescribeUserScramCredentialsAsync(bootstrapServers, commandArgs);
823+
break;
824+
case "alter-user-scram-credentials":
825+
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs);
826+
break;
661827
default:
662828
Console.WriteLine($"unknown command: {command}");
663829
break;
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System.Collections.Generic;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Represents an error that occurred during an alter user
24+
/// scram credentials operation.
25+
/// </summary>
26+
public class AlterUserScramCredentialsException : KafkaException
27+
{
28+
/// <summary>
29+
/// Initializes a new instance of AlterUserScramCredentialsException.
30+
/// </summary>
31+
/// <param name="results">
32+
/// The result corresponding to all users in the request
33+
/// (whether or not they were in error). At least one of these
34+
/// results will be in error.
35+
/// </param>
36+
public AlterUserScramCredentialsException(List<AlterUserScramCredentialsReport> results)
37+
: base(new Error(ErrorCode.Local_Partial,
38+
"An error occurred altering user SCRAM credentials, check individual result elements"))
39+
{
40+
Results = results;
41+
}
42+
43+
/// <summary>
44+
/// The result corresponding to all users in the request
45+
/// (whether or not they were in error). At least one of these
46+
/// results will be in error.
47+
/// </summary>
48+
public List<AlterUserScramCredentialsReport> Results { get; }
49+
}
50+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
19+
20+
namespace Confluent.Kafka.Admin
21+
{
22+
/// <summary>
23+
/// Options for the AlterUserScramCredentials method.
24+
/// </summary>
25+
public class AlterUserScramCredentialsOptions
26+
{
27+
/// <summary>
28+
/// The overall request timeout, including broker lookup, request
29+
/// transmission, operation time on broker, and response. If set
30+
/// to null, the default request timeout for the AdminClient will
31+
/// be used.
32+
///
33+
/// Default: null
34+
/// </summary>
35+
public TimeSpan? RequestTimeout { get; set; }
36+
}
37+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2023 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
18+
namespace Confluent.Kafka.Admin
19+
{
20+
/// <summary>
21+
/// The per-user result for an alter user scram credentials request,
22+
/// including errors.
23+
/// </summary>
24+
public class AlterUserScramCredentialsReport
25+
{
26+
/// <summary>
27+
/// Username for the performed Alteration.
28+
/// </summary>
29+
public string User { get; set; }
30+
31+
/// <summary>
32+
/// Error of the performed Alteration.
33+
/// </summary>
34+
public Error Error { get; set; }
35+
36+
/// <summary>
37+
/// Returns a JSON representation of the AlterUserScramCredentialsReport object.
38+
/// </summary>
39+
/// <returns>
40+
/// A JSON representation the AlterUserScramCredentialsReport object.
41+
/// </returns>
42+
public override string ToString()
43+
{
44+
return $"{{\"User\": {User.Quote()}, " +
45+
$"\"Error\": {Error.ToString().Quote()}}}";
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)