Skip to content

Commit f2f975c

Browse files
feat: support describe group (#7)
1 parent 6ad013f commit f2f975c

File tree

3 files changed

+143
-1
lines changed

3 files changed

+143
-1
lines changed

README.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,21 @@ topic count: 2
125125
```bash
126126
$ snctl-cpp groups list
127127
There are 1 group
128-
[0] my-group Stable
128+
[0] sub Stable
129+
```
130+
131+
### Describe a specific consumer group
132+
133+
```bash
134+
$ snctl-cpp groups describe sub
135+
Group ID: sub
136+
Assignor: range
137+
State: Stable
138+
Type: 2
139+
There are 2 members:
140+
| index | client id | consumer id | host | assignments |
141+
| 0 | consumer-sub-1 | consumer-sub-1-b97d2b45-86cf-4352-8e82-9ebdfd6fbff6 | /127.0.0.1:54214 | [test-0, test-1] |
142+
| 1 | consumer-sub-2 | consumer-sub-2-63b7c688-3007-4650-91eb-404284dfd837 | /127.0.0.1:54213 | [test-2, test-3] |
129143
```
130144
131145
The format of each line is `[index] <group-id> <state>`.

include/snctl-cpp/groups.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
#pragma once
1717

18+
#include "snctl-cpp/groups/describe_group.h"
1819
#include "snctl-cpp/groups/list_groups.h"
1920
#include "snctl-cpp/subcommand.h"
2021

@@ -25,19 +26,29 @@ class Groups : public SubCommand {
2526
public:
2627
explicit Groups(argparse::ArgumentParser &parent) : SubCommand("groups") {
2728
list_command_.add_description("List all consumer groups");
29+
describe_command_.add_description("Describe a specific consumer group")
30+
.add_argument("group")
31+
.help("The group id")
32+
.required();
2833

2934
add_child(list_command_);
35+
add_child(describe_command_);
36+
3037
attach_parent(parent);
3138
}
3239

3340
void run(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
3441
if (is_subcommand_used(list_command_)) {
3542
list_groups(rk, rkqu);
43+
} else if (is_subcommand_used(describe_command_)) {
44+
auto group = describe_command_.get("group");
45+
describe_group(rk, rkqu, group);
3646
} else {
3747
fail();
3848
}
3949
}
4050

4151
private:
4252
argparse::ArgumentParser list_command_{"list"};
53+
argparse::ArgumentParser describe_command_{"describe"};
4354
};
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2025 Yunze Xu
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "snctl-cpp/rk_event_wrapper.h"
19+
#include <cassert>
20+
#include <cstddef>
21+
#include <iostream>
22+
#include <librdkafka/rdkafka.h>
23+
#include <ostream>
24+
#include <string>
25+
26+
static std::ostream &operator<<(std::ostream &os, const rd_kafka_Node_t *node) {
27+
os << rd_kafka_Node_host(node) << ":" << rd_kafka_Node_port(node);
28+
if (const auto *rack = rd_kafka_Node_rack(node); rack != nullptr) {
29+
os << " (rack: " << rack << ")";
30+
}
31+
return os;
32+
}
33+
34+
static std::ostream &operator<<(std::ostream &os,
35+
const rd_kafka_topic_partition_t &partition) {
36+
os << partition.topic << "-" << partition.partition;
37+
return os;
38+
}
39+
40+
inline void describe_group(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
41+
const std::string &group) {
42+
const char *groups[1] = {group.c_str()};
43+
rd_kafka_DescribeConsumerGroups(rk, groups, 1, nullptr, rkqu);
44+
45+
try {
46+
auto event = RdKafkaEvent::poll(rkqu);
47+
const auto *result =
48+
rd_kafka_event_DescribeConsumerGroups_result(event.handle());
49+
assert(result != nullptr);
50+
51+
size_t group_count;
52+
const auto *groups =
53+
rd_kafka_DescribeConsumerGroups_result_groups(result, &group_count);
54+
assert(groups);
55+
if (group_count != 1) {
56+
std::cerr << "Expected exactly one group, but got " << group_count
57+
<< std::endl;
58+
return;
59+
}
60+
61+
const auto *group = groups[0];
62+
const auto *group_id = rd_kafka_ConsumerGroupDescription_group_id(group);
63+
const auto *error = rd_kafka_ConsumerGroupDescription_error(group);
64+
if (error != nullptr) {
65+
std::cerr << "Error describing group '" << group_id
66+
<< "': " << rd_kafka_error_string(error) << std::endl;
67+
return;
68+
}
69+
70+
const auto *assignor =
71+
rd_kafka_ConsumerGroupDescription_partition_assignor(group);
72+
const auto state = rd_kafka_ConsumerGroupDescription_state(group);
73+
const auto *state_name = rd_kafka_consumer_group_state_name(state);
74+
const auto *node = rd_kafka_ConsumerGroupDescription_coordinator(group);
75+
const auto type = rd_kafka_ConsumerGroupDescription_type(group);
76+
77+
std::cout << "Group ID: " << group_id << std::endl;
78+
std::cout << "Assignor: " << assignor << std::endl;
79+
std::cout << "State: " << state_name << std::endl;
80+
std::cout << "Type: " << type << std::endl;
81+
std::cout << "Coordinator: " << node << std::endl;
82+
const auto member_count =
83+
rd_kafka_ConsumerGroupDescription_member_count(group);
84+
85+
if (member_count > 0) {
86+
std::cout << "There are " << member_count << " members:" << std::endl;
87+
std::cout << "| index | client id | consumer id | host | assignments |"
88+
<< std::endl;
89+
} else {
90+
std::cout << "No members" << std::endl;
91+
}
92+
for (size_t i = 0; i < member_count; i++) {
93+
const auto *member = rd_kafka_ConsumerGroupDescription_member(group, i);
94+
assert(member != nullptr);
95+
const auto *assignment = rd_kafka_MemberDescription_assignment(member);
96+
assert(assignment != nullptr);
97+
std::cout << "| " << i << " | "
98+
<< rd_kafka_MemberDescription_client_id(member) << " | "
99+
<< rd_kafka_MemberDescription_consumer_id(member) << " | "
100+
<< rd_kafka_MemberDescription_host(member);
101+
const auto *partitions = rd_kafka_MemberAssignment_partitions(assignment);
102+
assert(partitions != nullptr);
103+
std::cout << " | [";
104+
for (int j = 0; j < partitions->cnt; j++) {
105+
if (j > 0) {
106+
std::cout << ", ";
107+
}
108+
std::cout << partitions->elems[j];
109+
}
110+
std::cout << "] |" << std::endl;
111+
}
112+
113+
} catch (const std::runtime_error &e) {
114+
std::cerr << "Failed to describe group '" << group << "': " << e.what()
115+
<< std::endl;
116+
}
117+
}

0 commit comments

Comments
 (0)