Skip to content

Commit 0d0cb8e

Browse files
authored
KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups (confluentinc#2378)
1 parent 2a12802 commit 0d0cb8e

File tree

9 files changed

+88
-6
lines changed

9 files changed

+88
-6
lines changed

examples/AdminClient/Program.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,7 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
790790
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}");
791791
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}");
792792
Console.WriteLine($" State: {group.State}");
793+
Console.WriteLine($" GroupType: {group.GroupType}");
793794
Console.WriteLine($" Members:");
794795
foreach (var m in group.Members)
795796
{
@@ -801,6 +802,17 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
801802
topicPartitions = String.Join(", ", m.Assignment.TopicPartitions.Select(tp => tp.ToString()));
802803
}
803804
Console.WriteLine($" TopicPartitions: [{topicPartitions}]");
805+
if (m. TargetAssignment != null)
806+
{
807+
Console.WriteLine($" TargetAssignment:");
808+
var targetTopicPartitions = "";
809+
if (m.TargetAssignment.TopicPartitions != null)
810+
{
811+
targetTopicPartitions = String.Join(", ", m.TargetAssignment.TopicPartitions.Select(tp => tp.ToString()));
812+
}
813+
Console.WriteLine($" TopicPartitions: [{targetTopicPartitions}]");
814+
}
815+
804816
}
805817
if (includeAuthorizedOperations)
806818
{

src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public class ConsumerGroupDescription
5151
/// </summary>
5252
public ConsumerGroupState State { get; set; }
5353

54+
/// <summary>
55+
/// Consumer group type.
56+
/// </summary>
57+
public ConsumerGroupType GroupType { get; set;}
58+
5459
/// <summary>
5560
/// Broker that acts as consumer group coordinator (null if not known).
5661
/// </summary>
@@ -92,6 +97,7 @@ public override string ToString()
9297
result.Append($"{{\"GroupId\": {GroupId.Quote()}");
9398
result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}");
9499
result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}");
100+
result.Append($", \"GroupType\": {GroupType.ToString().Quote()}");
95101
result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]");
96102
result.Append($", \"AuthorizedOperations\": {authorizedOperations}}}");
97103

src/Confluent.Kafka/Admin/MemberDescription.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ public class MemberDescription
4949
/// Member assignment.
5050
/// </summary>
5151
public MemberAssignment Assignment { get; set; }
52-
52+
53+
/// <summary>
54+
/// Target assignment.
55+
/// </summary>
56+
public MemberAssignment TargetAssignment { get; set; } = null;
57+
5358
/// <summary>
5459
/// Returns a JSON representation of this object.
5560
/// </summary>
@@ -60,13 +65,24 @@ public override string ToString()
6065
{
6166
var result = new StringBuilder();
6267
var assignment = string.Join(",",
63-
Assignment.TopicPartitions.Select(topicPartition =>
68+
Assignment.TopicPartitions.Select(topicPartition =>
6469
$"{{\"Topic\": {topicPartition.Topic.Quote()}, \"Partition\": {topicPartition.Partition.Value}}}"
6570
).ToList());
66-
71+
var targetAssignment = TargetAssignment != null
72+
? string.Join(",",
73+
TargetAssignment.TopicPartitions.Select(topicPartition =>
74+
$"{{\"Topic\": {topicPartition.Topic.Quote()}, \"Partition\": {topicPartition.Partition.Value}}}"
75+
).ToList())
76+
: string.Empty;
77+
6778
result.Append($"{{\"ClientId\": {ClientId.Quote()}");
6879
result.Append($", \"GroupInstanceId\": {GroupInstanceId.Quote()}, \"ConsumerId\": {ConsumerId.Quote()}");
69-
result.Append($", \"Host\": {Host.Quote()}, \"Assignment\": [{assignment}]}}");
80+
result.Append($", \"Host\": {Host.Quote()}, \"Assignment\": [{assignment}]");
81+
if (TargetAssignment != null)
82+
{
83+
result.Append($", \"TargetAssignment\": [{targetAssignment}]");
84+
}
85+
result.Append("}");
7086

7187
return result.ToString();
7288
}

src/Confluent.Kafka/AdminClient.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,19 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr
342342
{
343343
member.Assignment.TopicPartitions = SafeKafkaHandle.GetTopicPartitionList(topicPartitionPtr);
344344
}
345+
var targetAssignmentPtr = Librdkafka.MemberDescription_target_assignment(memberPtr);
346+
if (targetAssignmentPtr != IntPtr.Zero)
347+
{
348+
var targetTopicPartitionPtr = Librdkafka.MemberAssignment_topic_partitions(targetAssignmentPtr);
349+
if (targetTopicPartitionPtr != IntPtr.Zero)
350+
{
351+
member.TargetAssignment = new MemberAssignment
352+
{
353+
TopicPartitions = SafeKafkaHandle.GetTopicPartitionList(targetTopicPartitionPtr)
354+
};
355+
}
356+
}
357+
345358
members.Add(member);
346359
}
347360

@@ -362,6 +375,8 @@ private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr
362375
PtrToStringUTF8(Librdkafka.ConsumerGroupDescription_partition_assignor(groupPtr)),
363376
State =
364377
Librdkafka.ConsumerGroupDescription_state(groupPtr),
378+
GroupType =
379+
Librdkafka.ConsumerGroupDescription_type(groupPtr),
365380
Coordinator = coordinator,
366381
Members = members,
367382
AuthorizedOperations = authorizedOperations,

src/Confluent.Kafka/Impl/LibRdKafka.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ static bool SetDelegates(Type nativeMethodsClass)
424424
_ConsumerGroupDescription_is_simple_consumer_group = (_ConsumerGroupDescription_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupDescription_is_simple_consumer_group_delegate));
425425
_ConsumerGroupDescription_partition_assignor = (_ConsumerGroupDescription_partition_assignor_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_partition_assignor").CreateDelegate(typeof (_ConsumerGroupDescription_partition_assignor_delegate));
426426
_ConsumerGroupDescription_state = (_ConsumerGroupDescription_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_state").CreateDelegate(typeof (_ConsumerGroupDescription_state_delegate));
427+
_ConsumerGroupDescription_type = (_ConsumerGroupDescription_type_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_type").CreateDelegate(typeof (_ConsumerGroupDescription_type_delegate));
427428
_ConsumerGroupDescription_coordinator = (_ConsumerGroupDescription_coordinator_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_coordinator").CreateDelegate(typeof (_ConsumerGroupDescription_coordinator_delegate));
428429
_ConsumerGroupDescription_member_count = (_ConsumerGroupDescription_member_count_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member_count").CreateDelegate(typeof (_ConsumerGroupDescription_member_count_delegate));
429430
_ConsumerGroupDescription_authorized_operations = (_ConsumerGroupDescription_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_authorized_operations").CreateDelegate(typeof (_ConsumerGroupDescription_authorized_operations_delegate));
@@ -434,6 +435,7 @@ static bool SetDelegates(Type nativeMethodsClass)
434435
_MemberDescription_host = (_MemberDescription_host_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_host").CreateDelegate(typeof (_MemberDescription_host_delegate));
435436
_MemberDescription_assignment = (_MemberDescription_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_assignment").CreateDelegate(typeof (_MemberDescription_assignment_delegate));
436437
_MemberAssignment_partitions = (_MemberAssignment_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_partitions").CreateDelegate(typeof (_MemberAssignment_partitions_delegate));
438+
_MemberDescription_target_assignment = (_MemberDescription_target_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_target_assignment").CreateDelegate(typeof (_MemberDescription_target_assignment_delegate));
437439
_Node_id = (_Node_id_delegate)methods.Single(m => m.Name == "rd_kafka_Node_id").CreateDelegate(typeof (_Node_id_delegate));
438440
_Node_host = (_Node_host_delegate)methods.Single(m => m.Name == "rd_kafka_Node_host").CreateDelegate(typeof (_Node_host_delegate));
439441
_Node_port = (_Node_port_delegate)methods.Single(m => m.Name == "rd_kafka_Node_port").CreateDelegate(typeof (_Node_port_delegate));
@@ -1953,6 +1955,12 @@ internal static ConsumerGroupState ConsumerGroupDescription_state(IntPtr grpdes
19531955
return _ConsumerGroupDescription_state(grpdesc);
19541956
}
19551957

1958+
private delegate ConsumerGroupType _ConsumerGroupDescription_type_delegate(IntPtr grpdesc);
1959+
private static _ConsumerGroupDescription_type_delegate _ConsumerGroupDescription_type;
1960+
1961+
internal static ConsumerGroupType ConsumerGroupDescription_type(IntPtr grpdesc)
1962+
=> _ConsumerGroupDescription_type(grpdesc);
1963+
19561964
private delegate IntPtr _ConsumerGroupDescription_coordinator_delegate(IntPtr grpdesc);
19571965
private static _ConsumerGroupDescription_coordinator_delegate _ConsumerGroupDescription_coordinator;
19581966
internal static IntPtr ConsumerGroupDescription_coordinator(IntPtr grpdesc)
@@ -2003,6 +2011,11 @@ internal static IntPtr MemberDescription_assignment(IntPtr member)
20032011
internal static IntPtr MemberAssignment_topic_partitions(IntPtr assignment)
20042012
=> _MemberAssignment_partitions(assignment);
20052013

2014+
private delegate IntPtr _MemberDescription_target_assignment_delegate(IntPtr member);
2015+
private static _MemberDescription_target_assignment_delegate _MemberDescription_target_assignment;
2016+
internal static IntPtr MemberDescription_target_assignment(IntPtr member)
2017+
=> _MemberDescription_target_assignment(member);
2018+
20062019
private delegate IntPtr _Node_id_delegate(IntPtr node);
20072020
private static _Node_id_delegate _Node_id;
20082021
internal static IntPtr Node_id(IntPtr node) => _Node_id(node);

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,6 +1073,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
10731073
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10741074
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc);
10751075

1076+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1077+
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc);
1078+
10761079
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10771080
internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc);
10781081

@@ -1103,6 +1106,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
11031106
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
11041107
internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment);
11051108

1109+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1110+
internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member);
1111+
11061112
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
11071113
internal static extern IntPtr rd_kafka_Node_id(IntPtr node);
11081114

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
10771077
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10781078
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc);
10791079

1080+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1081+
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc);
1082+
10801083
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10811084
internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc);
10821085

@@ -1107,6 +1110,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
11071110
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
11081111
internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment);
11091112

1113+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1114+
internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member);
1115+
11101116
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
11111117
internal static extern IntPtr rd_kafka_Node_id(IntPtr node);
11121118

src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,6 +1077,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
10771077
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10781078
internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc);
10791079

1080+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1081+
internal static extern ConsumerGroupType rd_kafka_ConsumerGroupDescription_type(IntPtr grpdesc);
1082+
10801083
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
10811084
internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc);
10821085

@@ -1107,6 +1110,9 @@ internal static extern void rd_kafka_DescribeConsumerGroups(
11071110
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
11081111
internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment);
11091112

1113+
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
1114+
internal static extern IntPtr rd_kafka_MemberDescription_target_assignment(IntPtr member);
1115+
11101116
[DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
11111117
internal static extern IntPtr rd_kafka_Node_id(IntPtr node);
11121118

test/Confluent.Kafka.UnitTests/Admin/ConsumerGroupDescription.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public void StringRepresentation()
3434
IsSimpleConsumerGroup = true,
3535
PartitionAssignor = "testAssignor",
3636
State = ConsumerGroupState.PreparingRebalance,
37+
GroupType = ConsumerGroupType.Classic,
3738
Coordinator = null,
3839
Members = new List<MemberDescription>
3940
{
@@ -59,7 +60,7 @@ public void StringRepresentation()
5960
};
6061
Assert.Equal(
6162
@"{""GroupId"": ""test"", ""Error"": ""NoError"", ""IsSimpleConsumerGroup"": true"+
62-
@", ""PartitionAssignor"": ""testAssignor"", ""State"": ""PreparingRebalance"", ""Coordinator"": null" +
63+
@", ""PartitionAssignor"": ""testAssignor"", ""State"": ""PreparingRebalance"", ""GroupType"": ""Classic"", ""Coordinator"": null" +
6364
@", ""Members"": [{""ClientId"": ""client1"", ""GroupInstanceId"": null" +
6465
@", ""ConsumerId"": ""consumer1"", ""Host"": ""localhost"", ""Assignment"": [{""Topic"": ""test1"", ""Partition"": 0}," +
6566
@"{""Topic"": ""test1"", ""Partition"": 1}]}], ""AuthorizedOperations"": [""Create""]}",
@@ -73,6 +74,7 @@ public void StringRepresentation()
7374
IsSimpleConsumerGroup = true,
7475
PartitionAssignor = "testAssignor",
7576
State = ConsumerGroupState.PreparingRebalance,
77+
GroupType = ConsumerGroupType.Classic,
7678
Coordinator = new Node
7779
{
7880
Host = "localhost",
@@ -99,7 +101,7 @@ public void StringRepresentation()
99101
};
100102
Assert.Equal(
101103
@"{""GroupId"": ""test"", ""Error"": ""NoError"", ""IsSimpleConsumerGroup"": true"+
102-
@", ""PartitionAssignor"": ""testAssignor"", ""State"": ""PreparingRebalance"", ""Coordinator"": " +
104+
@", ""PartitionAssignor"": ""testAssignor"", ""State"": ""PreparingRebalance"", ""GroupType"": ""Classic"", ""Coordinator"": " +
103105
@"{""Id"": 1, ""Host"": ""localhost"", ""Port"": 9092, ""Rack"": null}" +
104106
@", ""Members"": [{""ClientId"": ""client1"", ""GroupInstanceId"": ""groupInstanceId1""" +
105107
@", ""ConsumerId"": ""consumer1"", ""Host"": ""localhost"", ""Assignment"": [" +

0 commit comments

Comments
 (0)