1+ #include < stdexcept>
2+ #include < iostream>
3+ #include < csignal>
4+ #include < boost/program_options.hpp>
5+ #include " cppkafka/consumer.h"
6+ #include " cppkafka/configuration.h"
7+ #include " cppkafka/metadata.h"
8+ #include " cppkafka/topic.h"
9+
10+ using std::string;
11+ using std::exception;
12+ using std::cout;
13+ using std::endl;
14+
15+ using cppkafka::Consumer;
16+ using cppkafka::Exception;
17+ using cppkafka::Configuration;
18+ using cppkafka::Topic;
19+ using cppkafka::Metadata;
20+ using cppkafka::TopicMetadata;
21+ using cppkafka::BrokerMetadata;
22+
23+ namespace po = boost::program_options;
24+
25+ bool running = true ;
26+
27+ int main (int argc, char * argv[]) {
28+ string brokers;
29+ string group_id;
30+
31+ po::options_description options (" Options" );
32+ options.add_options ()
33+ (" help,h" , " produce this help message" )
34+ (" brokers,b" , po::value<string>(&brokers)->required (),
35+ " the kafka broker list" )
36+ (" group-id,g" , po::value<string>(&group_id)->required (),
37+ " the consumer group id" )
38+ ;
39+
40+ po::variables_map vm;
41+
42+ try {
43+ po::store (po::command_line_parser (argc, argv).options (options).run (), vm);
44+ po::notify (vm);
45+ }
46+ catch (exception& ex) {
47+ cout << " Error parsing options: " << ex.what () << endl;
48+ cout << endl;
49+ cout << options << endl;
50+ return 1 ;
51+ }
52+
53+ // Stop processing on SIGINT
54+ signal (SIGINT, [](int ) { running = false ; });
55+
56+ // Construct the configuration
57+ Configuration config = {
58+ { " metadata.broker.list" , brokers },
59+ { " group.id" , group_id },
60+ // Disable auto commit
61+ { " enable.auto.commit" , false }
62+ };
63+
64+ try {
65+ // Construct a consumer
66+ Consumer consumer (config);
67+
68+ // Fetch the metadata
69+ Metadata metadata = consumer.get_metadata ();
70+
71+ // Iterate over brokers
72+ cout << " Found the following brokers: " << endl;
73+ for (const BrokerMetadata& broker : metadata.get_brokers ()) {
74+ cout << " * " << broker.get_host () << endl;
75+ }
76+ cout << endl;
77+
78+ // Iterate over topics
79+ cout << " Found the following topics: " << endl;
80+ for (const TopicMetadata& topic : metadata.get_topics ()) {
81+ cout << " * " << topic.get_name () << " : " << topic.get_partitions ().size ()
82+ << " partitions" << endl;
83+ }
84+ }
85+ catch (const Exception& ex) {
86+ cout << " Error fetching metadata: " << ex.what () << endl;
87+ }
88+ }
0 commit comments