1+ #include < stdexcept>
2+ #include < iostream>
3+ #include < boost/program_options.hpp>
4+ #include " cppkafka/producer.h"
5+ #include " cppkafka/configuration.h"
6+ #include " cppkafka/group_information.h"
7+ #include " cppkafka/topic.h"
8+
9+ using std::string;
10+ using std::exception;
11+ using std::vector;
12+ using std::cout;
13+ using std::endl;
14+
15+ using cppkafka::Producer;
16+ using cppkafka::Exception;
17+ using cppkafka::Configuration;
18+ using cppkafka::Topic;
19+ using cppkafka::GroupInformation;
20+ using cppkafka::GroupMemberInformation;
21+
22+ namespace po = boost::program_options;
23+
24+ int main (int argc, char * argv[]) {
25+ string brokers;
26+ string group_id;
27+
28+ po::options_description options (" Options" );
29+ options.add_options ()
30+ (" help,h" , " produce this help message" )
31+ (" brokers,b" , po::value<string>(&brokers)->required (),
32+ " the kafka broker list" )
33+ (" group-id,g" , po::value<string>(&group_id),
34+ " only fetch consumer group information for the specified one" )
35+ ;
36+
37+ po::variables_map vm;
38+
39+ try {
40+ po::store (po::command_line_parser (argc, argv).options (options).run (), vm);
41+ po::notify (vm);
42+ }
43+ catch (exception& ex) {
44+ cout << " Error parsing options: " << ex.what () << endl;
45+ cout << endl;
46+ cout << options << endl;
47+ return 1 ;
48+ }
49+
50+ // Construct the configuration
51+ Configuration config = {
52+ { " metadata.broker.list" , brokers },
53+ // Disable auto commit
54+ { " enable.auto.commit" , false }
55+ };
56+
57+ try {
58+ // Construct a producer
59+ Producer producer (config);
60+
61+ // Fetch the group information
62+ vector<GroupInformation> groups = [&]() {
63+ if (!group_id.empty ()) {
64+ return vector<GroupInformation>{producer.get_consumer_group (group_id)};
65+ }
66+ else {
67+ return producer.get_consumer_groups ();
68+ }
69+ }();
70+
71+ if (groups.empty ()) {
72+ cout << " Found no consumers" << endl;
73+ return 0 ;
74+ }
75+ cout << " Found the following consumers: " << endl;
76+ for (const GroupInformation& group : groups) {
77+ cout << " * \" " << group.get_name () << " \" having the following members: " << endl;
78+ for (const GroupMemberInformation& info : group.get_members ()) {
79+ cout << " - " << info.get_member_id () << " @ " << info.get_client_host () << endl;
80+ }
81+ cout << endl;
82+ }
83+ }
84+ catch (const Exception& ex) {
85+ cout << " Error fetching group information: " << ex.what () << endl;
86+ }
87+ }
0 commit comments