Skip to content

Commit 58b5291

Browse files
jainruchiremasab
andauthored
[KIP-430] DescribeConsumerGroups, DescribeTopics, DescribeCluster with authorized AclOperations (#2021)
--------- Co-authored-by: Emanuele Sabellico <[email protected]>
1 parent bd5a9bc commit 58b5291

35 files changed

+2420
-55
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
# vNext
1+
# 2.3.0
22

33
## Enhancements
44

5+
- References librdkafka.redist 2.3.0. Refer to the [librdkafka v2.3.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) for more information.
56
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
67
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
8+
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
9+
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
10+
Return authorized operations in describe responses (#2021, @jainruchir).
711

812

913
# 2.2.0

examples/AdminClient/Program.cs

Lines changed: 208 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
1+
// Copyright 2015-2016 Andreas Heider,
2+
// 2016-2023 Confluent Inc.
23
//
34
// Licensed under the Apache License, Version 2.0 (the "License");
45
// you may not use this file except in compliance with the License.
@@ -525,7 +526,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
525526
{
526527
try
527528
{
528-
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() {
529+
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions()
530+
{
529531
RequestTimeout = timeout,
530532
MatchStates = statesList,
531533
});
@@ -546,23 +548,52 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
546548

547549
static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs)
548550
{
549-
if (commandArgs.Length < 1)
551+
if (commandArgs.Length < 3)
550552
{
551-
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]");
553+
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <username> <password> <include_authorized_operations> <group1> [<group2 ... <groupN>]");
552554
Environment.ExitCode = 1;
553555
return;
554556
}
555557

556-
var groupNames = commandArgs.ToList();
558+
var username = commandArgs[0];
559+
var password = commandArgs[1];
560+
var includeAuthorizedOperations = (commandArgs[2] == "1");
561+
var groupNames = commandArgs.Skip(3).ToList();
562+
563+
if (string.IsNullOrWhiteSpace(username))
564+
{
565+
username = null;
566+
}
567+
if (string.IsNullOrWhiteSpace(password))
568+
{
569+
password = null;
570+
}
571+
557572
var timeout = TimeSpan.FromSeconds(30);
558-
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
573+
var config = new AdminClientConfig
574+
{
575+
BootstrapServers = bootstrapServers,
576+
};
577+
if (username != null && password != null)
578+
{
579+
config = new AdminClientConfig
580+
{
581+
BootstrapServers = bootstrapServers,
582+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
583+
SaslMechanism = SaslMechanism.Plain,
584+
SaslUsername = username,
585+
SaslPassword = password,
586+
};
587+
}
588+
589+
using (var adminClient = new AdminClientBuilder(config).Build())
559590
{
560591
try
561592
{
562-
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout });
593+
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
563594
foreach (var group in descResult.ConsumerGroupDescriptions)
564595
{
565-
Console.WriteLine($" Group: {group.GroupId} {group.Error}");
596+
Console.WriteLine($"\n Group: {group.GroupId} {group.Error}");
566597
Console.WriteLine($" Broker: {group.Coordinator}");
567598
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}");
568599
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}");
@@ -579,6 +610,11 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
579610
}
580611
Console.WriteLine($" TopicPartitions: [{topicPartitions}]");
581612
}
613+
if (includeAuthorizedOperations)
614+
{
615+
string operations = string.Join(" ", group.AuthorizedOperations);
616+
Console.WriteLine($" Authorized operations: {operations}");
617+
}
582618
}
583619
}
584620
catch (KafkaException e)
@@ -757,6 +793,162 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,
757793
}
758794
}
759795

796+
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
797+
{
798+
foreach (var topic in topicDescriptions)
799+
{
800+
Console.WriteLine($"\n Topic: {topic.Name} {topic.Error}");
801+
Console.WriteLine($" Partitions:");
802+
foreach (var partition in topic.Partitions)
803+
{
804+
Console.WriteLine($" Partition ID: {partition.Partition} with leader: {partition.Leader}");
805+
if(!partition.ISR.Any())
806+
{
807+
Console.WriteLine(" There is no In-Sync-Replica broker for the partition");
808+
}
809+
else
810+
{
811+
string isrs = string.Join("; ", partition.ISR);
812+
Console.WriteLine($" The In-Sync-Replica brokers are: {isrs}");
813+
}
814+
815+
if(!partition.Replicas.Any())
816+
{
817+
Console.WriteLine(" There is no Replica broker for the partition");
818+
}
819+
else
820+
{
821+
string replicas = string.Join("; ", partition.Replicas);
822+
Console.WriteLine($" The Replica brokers are: {replicas}");
823+
}
824+
825+
}
826+
Console.WriteLine($" Is internal: {topic.IsInternal}");
827+
if (includeAuthorizedOperations)
828+
{
829+
string operations = string.Join(" ", topic.AuthorizedOperations);
830+
Console.WriteLine($" Authorized operations: {operations}");
831+
}
832+
}
833+
}
834+
835+
static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandArgs)
836+
{
837+
if (commandArgs.Length < 3)
838+
{
839+
Console.WriteLine("usage: .. <bootstrapServers> describe-topics <username> <password> <include_authorized_operations> <topic1> [<topic2 ... <topicN>]");
840+
Environment.ExitCode = 1;
841+
return;
842+
}
843+
844+
var username = commandArgs[0];
845+
var password = commandArgs[1];
846+
var includeAuthorizedOperations = (commandArgs[2] == "1");
847+
if (string.IsNullOrWhiteSpace(username))
848+
{
849+
username = null;
850+
}
851+
if (string.IsNullOrWhiteSpace(password))
852+
{
853+
password = null;
854+
}
855+
var topicNames = commandArgs.Skip(3).ToList();
856+
857+
var timeout = TimeSpan.FromSeconds(30);
858+
var config = new AdminClientConfig
859+
{
860+
BootstrapServers = bootstrapServers,
861+
};
862+
if (username != null && password != null)
863+
{
864+
config = new AdminClientConfig
865+
{
866+
BootstrapServers = bootstrapServers,
867+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
868+
SaslMechanism = SaslMechanism.Plain,
869+
SaslUsername = username,
870+
SaslPassword = password,
871+
};
872+
}
873+
874+
using (var adminClient = new AdminClientBuilder(config).Build())
875+
{
876+
try
877+
{
878+
var descResult = await adminClient.DescribeTopicsAsync(
879+
TopicCollection.OfTopicNames(topicNames),
880+
new DescribeTopicsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
881+
PrintTopicDescriptions(descResult.TopicDescriptions, includeAuthorizedOperations);
882+
}
883+
catch (DescribeTopicsException e)
884+
{
885+
// At least one TopicDescription will have an error.
886+
PrintTopicDescriptions(e.Results.TopicDescriptions, includeAuthorizedOperations);
887+
}
888+
catch (KafkaException e)
889+
{
890+
Console.WriteLine($"An error occurred describing topics: {e}");
891+
Environment.ExitCode = 1;
892+
}
893+
}
894+
}
895+
896+
static async Task DescribeClusterAsync(string bootstrapServers, string[] commandArgs)
897+
{
898+
if (commandArgs.Length < 3)
899+
{
900+
Console.WriteLine("usage: .. <bootstrapServers> describe-cluster <username> <password> <include_authorized_operations>");
901+
Environment.ExitCode = 1;
902+
return;
903+
}
904+
905+
var username = commandArgs[0];
906+
var password = commandArgs[1];
907+
var includeAuthorizedOperations = (commandArgs[2] == "1");
908+
909+
var timeout = TimeSpan.FromSeconds(30);
910+
var config = new AdminClientConfig
911+
{
912+
BootstrapServers = bootstrapServers,
913+
};
914+
if (username != null && password != null)
915+
{
916+
config = new AdminClientConfig
917+
{
918+
BootstrapServers = bootstrapServers,
919+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
920+
SaslMechanism = SaslMechanism.Plain,
921+
SaslUsername = username,
922+
SaslPassword = password,
923+
};
924+
}
925+
926+
using (var adminClient = new AdminClientBuilder(config).Build())
927+
{
928+
try
929+
{
930+
var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
931+
932+
Console.WriteLine($" Cluster Id: {descResult.ClusterId}\n Controller: {descResult.Controller}");
933+
Console.WriteLine(" Nodes:");
934+
foreach(var node in descResult.Nodes)
935+
{
936+
Console.WriteLine($" {node}");
937+
}
938+
if (includeAuthorizedOperations)
939+
{
940+
string operations = string.Join(" ", descResult.AuthorizedOperations);
941+
Console.WriteLine($" Authorized operations: {operations}");
942+
}
943+
}
944+
catch (KafkaException e)
945+
{
946+
Console.WriteLine($"An error occurred describing cluster: {e}");
947+
Environment.ExitCode = 1;
948+
}
949+
}
950+
}
951+
760952
public static async Task Main(string[] args)
761953
{
762954
if (args.Length < 2)
@@ -768,8 +960,8 @@ public static async Task Main(string[] args)
768960
"list-consumer-groups", "describe-consumer-groups",
769961
"list-consumer-group-offsets", "alter-consumer-group-offsets",
770962
"incremental-alter-configs", "describe-user-scram-credentials",
771-
"alter-user-scram-credentials"
772-
963+
"alter-user-scram-credentials", "describe-topics",
964+
"describe-cluster"
773965
}) +
774966
" ..");
775967
Environment.ExitCode = 1;
@@ -824,6 +1016,12 @@ public static async Task Main(string[] args)
8241016
case "alter-user-scram-credentials":
8251017
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs);
8261018
break;
1019+
case "describe-topics":
1020+
await DescribeTopicsAsync(bootstrapServers, commandArgs);
1021+
break;
1022+
case "describe-cluster":
1023+
await DescribeClusterAsync(bootstrapServers, commandArgs);
1024+
break;
8271025
default:
8281026
Console.WriteLine($"unknown command: {command}");
8291027
break;

src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 Confluent Inc.
1+
// Copyright 2022-2023 Confluent Inc.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -15,6 +15,8 @@
1515
// Refer to LICENSE for more information.
1616

1717
using System.Collections.Generic;
18+
using System.Linq;
19+
using System.Text;
1820

1921
namespace Confluent.Kafka.Admin
2022
{
@@ -50,13 +52,51 @@ public class ConsumerGroupDescription
5052
public ConsumerGroupState State { get; set; }
5153

5254
/// <summary>
53-
/// Consumer group coordinator (broker).
55+
/// Broker that acts as consumer group coordinator (null if not known).
5456
/// </summary>
5557
public Node Coordinator { get; set; }
5658

5759
/// <summary>
5860
/// Members list.
5961
/// </summary>
6062
public List<MemberDescription> Members { get; set; }
63+
64+
/// <summary>
65+
/// AclOperation list (null if not requested or not supported).
66+
/// </summary>
67+
public List<AclOperation> AuthorizedOperations { get; set; }
68+
69+
/// <summary>
70+
/// Returns a JSON representation of this object.
71+
/// </summary>
72+
/// <returns>
73+
/// A JSON representation of this object.
74+
/// </returns>
75+
public override string ToString()
76+
{
77+
var result = new StringBuilder();
78+
var members = string.Join(",",
79+
Members.Select(member =>
80+
member.ToString()
81+
).ToList());
82+
var authorizedOperations = "null";
83+
if (AuthorizedOperations != null)
84+
{
85+
authorizedOperations = string.Join(",",
86+
AuthorizedOperations.Select(authorizedOperation =>
87+
authorizedOperation.ToString().Quote()
88+
).ToList());
89+
authorizedOperations = $"[{authorizedOperations}]";
90+
}
91+
92+
result.Append($"{{\"GroupId\": {GroupId.Quote()}");
93+
result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}");
94+
result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}");
95+
result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]");
96+
result.Append($", \"AuthorizedOperations\": {authorizedOperations}}}");
97+
98+
return result.ToString();
99+
}
100+
61101
}
62102
}

0 commit comments

Comments
 (0)