Skip to content

Commit 6ad013f

Browse files
feat: list all groups (#6)
1 parent e59537c commit 6ad013f

File tree

4 files changed

+127
-4
lines changed

4 files changed

+127
-4
lines changed

README.md

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ Updated config file /private/tmp/sncloud.ini
6363

6464
**NOTE**: The commands below assumes `snctl-cpp` is in the `PATH`.
6565

66-
### Create a topic
66+
### Topics
67+
68+
#### Create a topic
6769

6870
```bash
6971
$ snctl-cpp topics create tp0
@@ -72,7 +74,7 @@ $ snctl-cpp topics create tp1 -p 5
7274
Created topic "tp1" with 5 partitions
7375
```
7476

75-
### Delete a topic
77+
#### Delete a topic
7678

7779
```bash
7880
$ snctl-cpp topics delete tp
@@ -81,7 +83,7 @@ $ snctl-cpp topics delete tp0
8183
Deleted topic "tp0"
8284
```
8385

84-
### Describe a topic
86+
#### Describe a topic
8587

8688
Query the owner brokers for all partitions:
8789

@@ -105,7 +107,7 @@ Partition[15] leader: {"id": 644587507, url: "pb2-<xxx>:9093"}
105107
106108
As you can see, when a client specifies `use1-az1` as its zone, only brokers in the same zone (`pb2` and `pb5`) will serve the requests from that client.
107109
108-
### List topics
110+
#### List topics
109111
110112
List all topics and print the number of partitions for each topic:
111113
@@ -116,6 +118,18 @@ topic count: 2
116118
[1] "my-topic-1" with 10 partitions
117119
```
118120
121+
## Consumer Groups
122+
123+
### List all consumer groups
124+
125+
```bash
126+
$ snctl-cpp groups list
127+
There are 1 group
128+
[0] my-group Stable
129+
```
130+
131+
The format of each line is `[index] <group-id> <state>`.
132+
119133
## Logging
120134
121135
By default, rdkafka will generate logs to the standard output. `snctl-cpp` can redirect the logs to a file. For example, with the following configs in `sncloud.ini`:

include/snctl-cpp/groups.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright 2025 Yunze Xu
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "snctl-cpp/groups/list_groups.h"
19+
#include "snctl-cpp/subcommand.h"
20+
21+
#include <argparse/argparse.hpp>
22+
#include <librdkafka/rdkafka.h>
23+
24+
class Groups : public SubCommand {
25+
public:
26+
explicit Groups(argparse::ArgumentParser &parent) : SubCommand("groups") {
27+
list_command_.add_description("List all consumer groups");
28+
29+
add_child(list_command_);
30+
attach_parent(parent);
31+
}
32+
33+
void run(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
34+
if (is_subcommand_used(list_command_)) {
35+
list_groups(rk, rkqu);
36+
} else {
37+
fail();
38+
}
39+
}
40+
41+
private:
42+
argparse::ArgumentParser list_command_{"list"};
43+
};
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright 2025 Yunze Xu
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "snctl-cpp/rk_event_wrapper.h"
19+
#include <cassert>
20+
#include <iostream>
21+
#include <librdkafka/rdkafka.h>
22+
#include <stdexcept>
23+
24+
inline void list_groups(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
25+
rd_kafka_ListConsumerGroups(rk, nullptr, rkqu);
26+
27+
try {
28+
auto event = RdKafkaEvent::poll(rkqu);
29+
const auto *result =
30+
rd_kafka_event_ListConsumerGroups_result(event.handle());
31+
assert(result != nullptr);
32+
33+
size_t count;
34+
const auto *errors =
35+
rd_kafka_ListConsumerGroups_result_errors(result, &count);
36+
if (errors != nullptr) {
37+
for (size_t i = 0; i < count; ++i) {
38+
const auto *error = errors[i];
39+
std::cerr << i << " error: " << rd_kafka_error_string(error)
40+
<< std::endl;
41+
}
42+
return;
43+
}
44+
45+
const auto *groups =
46+
rd_kafka_ListConsumerGroups_result_valid(result, &count);
47+
assert(groups != nullptr);
48+
std::cout << "There are " << count << " group" << (count == 1 ? "" : "s")
49+
<< std::endl;
50+
for (size_t i = 0; i < count; i++) {
51+
const auto *group = groups[i];
52+
std::cout << "[" << i << "] "
53+
<< rd_kafka_ConsumerGroupListing_group_id(group) << " "
54+
<< rd_kafka_consumer_group_state_name(
55+
rd_kafka_ConsumerGroupListing_state(group))
56+
<< std::endl;
57+
}
58+
59+
} catch (const std::runtime_error &e) {
60+
std::cerr << "Failed to list consumer groups: " << e.what() << std::endl;
61+
}
62+
}

src/main.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <vector>
2929

3030
#include "snctl-cpp/configs.h"
31+
#include "snctl-cpp/groups.h"
3132
#include "snctl-cpp/raii_helper.h"
3233
#include "snctl-cpp/topics.h"
3334

@@ -55,6 +56,7 @@ int main(int argc, char *argv[]) noexcept(false) {
5556

5657
Topics topics{program};
5758
Configs configs{program};
59+
Groups groups{program};
5860
try {
5961
program.parse_args(argc, argv);
6062
} catch (const std::exception &err) {
@@ -128,6 +130,8 @@ int main(int argc, char *argv[]) noexcept(false) {
128130
topics.run(rk, rkqu);
129131
} else if (configs.used_by_parent(program)) {
130132
configs.run();
133+
} else if (groups.used_by_parent(program)) {
134+
groups.run(rk, rkqu);
131135
} else {
132136
if (program["--get-config"] == true) {
133137
if (const auto &config_file = configs.config_file();

0 commit comments

Comments
 (0)