1515 */
1616#pragma once
1717
18+ #include " snctl-cpp/raii_helper.h"
1819#include " snctl-cpp/rk_event_wrapper.h"
1920#include < cassert>
2021#include < cstddef>
22+ #include < cstdint>
2123#include < iostream>
2224#include < librdkafka/rdkafka.h>
25+ #include < map>
2326#include < ostream>
27+ #include < sstream>
28+ #include < stdexcept>
2429#include < string>
2530
2631static std::ostream &operator <<(std::ostream &os, const rd_kafka_Node_t *node) {
@@ -37,8 +42,96 @@ static std::ostream &operator<<(std::ostream &os,
3742 return os;
3843}
3944
45+ static std::ostream &operator <<(std::ostream &os,
46+ const rd_kafka_topic_partition_t *partition) {
47+ return os << *partition;
48+ }
49+
50+ // Return a map, whose key's format is "<topic>-<partition>" and value is the
51+ // committed offset of the topic-partition.
52+ inline auto
53+ query_committed_offsets (rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
54+ const std::string &expected_group,
55+ rd_kafka_topic_partition_list_t *rk_topic_partitions) {
56+ auto *group_offset = rd_kafka_ListConsumerGroupOffsets_new (
57+ expected_group.c_str (), rk_topic_partitions);
58+ GUARD (group_offset, rd_kafka_ListConsumerGroupOffsets_destroy);
59+
60+ rd_kafka_ListConsumerGroupOffsets_t *group_offsets[1 ];
61+ group_offsets[0 ] = group_offset;
62+
63+ rd_kafka_ListConsumerGroupOffsets (rk, &group_offsets[0 ], 1 , nullptr , rkqu);
64+ auto event = RdKafkaEvent::poll (rkqu);
65+ const auto *result =
66+ rd_kafka_event_ListConsumerGroupOffsets_result (event.handle ());
67+ assert (result != nullptr );
68+
69+ size_t group_count;
70+ auto *group_result =
71+ rd_kafka_ListConsumerGroupOffsets_result_groups (result, &group_count);
72+ if (group_count != 1 ) {
73+ std::ostringstream oss;
74+ oss << " Expected exactly one group, but got " << group_count
75+ << " in ListConsumerGroupOffsets" ;
76+ throw std::runtime_error (oss.str ());
77+ }
78+
79+ if (expected_group !=
80+ std::string (rd_kafka_group_result_name (group_result[0 ]))) {
81+ std::ostringstream oss;
82+ oss << " Expected group '" << expected_group << " ', but got '"
83+ << rd_kafka_group_result_name (group_result[0 ])
84+ << " ' in ListConsumerGroupOffsets" ;
85+ throw std::runtime_error (oss.str ());
86+ }
87+
88+ std::map<std::string, int64_t > offsets;
89+ const auto *partitions = rd_kafka_group_result_partitions (group_result[0 ]);
90+ if (partitions == nullptr ) {
91+ return offsets;
92+ }
93+ for (size_t i = 0 ; i < partitions->cnt ; i++) {
94+ const auto &partition = partitions->elems [i];
95+ if (partition.offset == RD_KAFKA_OFFSET_INVALID) {
96+ continue ; // Skip invalid offsets
97+ }
98+ std::ostringstream oss;
99+ oss << partition;
100+ offsets[oss.str ()] = partition.offset ;
101+ }
102+ return offsets;
103+ }
104+
105+ // Return a map whose key is "<topic>-<partition>" and value is the end offset
106+ inline auto
107+ query_end_offsets (rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
108+ rd_kafka_topic_partition_list_t *rk_topic_partitions) {
109+ for (int i = 0 ; i < rk_topic_partitions->cnt ; i++) {
110+ rk_topic_partitions->elems [i].offset = RD_KAFKA_OFFSET_SPEC_LATEST;
111+ }
112+
113+ rd_kafka_ListOffsets (rk, rk_topic_partitions, nullptr , rkqu);
114+ auto event = RdKafkaEvent::poll (rkqu);
115+ const auto *result = rd_kafka_event_ListOffsets_result (event.handle ());
116+ assert (result != nullptr );
117+
118+ size_t num_partitions;
119+ const auto *offsets_result =
120+ rd_kafka_ListOffsets_result_infos (result, &num_partitions);
121+
122+ std::map<std::string, int64_t > offsets;
123+ for (size_t i = 0 ; i < num_partitions; i++) {
124+ const auto *rk_topic_partition =
125+ rd_kafka_ListOffsetsResultInfo_topic_partition (offsets_result[i]);
126+ std::ostringstream oss;
127+ oss << rk_topic_partition;
128+ offsets[oss.str ()] = rk_topic_partition->offset ;
129+ }
130+ return offsets;
131+ }
132+
40133inline void describe_group (rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
41- const std::string &group) {
134+ const std::string &group, bool show_lag ) {
42135 const char *groups[1 ] = {group.c_str ()};
43136 rd_kafka_DescribeConsumerGroups (rk, groups, 1 , nullptr , rkqu);
44137
@@ -84,11 +177,16 @@ inline void describe_group(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
84177
85178 if (member_count > 0 ) {
86179 std::cout << " There are " << member_count << " members:" << std::endl;
87- std::cout << " | index | client id | consumer id | host | assignments |"
88- << std::endl;
180+ std::cout << " | index | client id | consumer id | host | assignments |" ;
181+ if (show_lag) {
182+ std::cout << " lag | end offset |" ;
183+ }
184+ std::cout << std::endl;
89185 } else {
90186 std::cout << " No members" << std::endl;
91187 }
188+
189+ std::map<std::string, int > topic_to_partition_cnt;
92190 for (size_t i = 0 ; i < member_count; i++) {
93191 const auto *member = rd_kafka_ConsumerGroupDescription_member (group, i);
94192 assert (member != nullptr );
@@ -105,11 +203,53 @@ inline void describe_group(rd_kafka_t *rk, rd_kafka_queue_t *rkqu,
105203 if (j > 0 ) {
106204 std::cout << " , " ;
107205 }
108- std::cout << partitions->elems [j];
206+ const auto partition = partitions->elems [j];
207+ std::cout << partition;
208+ if (auto it = topic_to_partition_cnt.find (partition.topic );
209+ it != topic_to_partition_cnt.end ()) {
210+ if (it->second <= partition.partition ) {
211+ it->second = partition.partition + 1 ;
212+ }
213+ } else {
214+ topic_to_partition_cnt[partition.topic ] = partition.partition + 1 ;
215+ }
109216 }
110217 std::cout << " ] |" << std::endl;
111218 }
112219
220+ if (show_lag) {
221+ auto *rk_topic_partitions =
222+ rd_kafka_topic_partition_list_new (topic_to_partition_cnt.size ());
223+ GUARD (rk_topic_partitions, rd_kafka_topic_partition_list_destroy);
224+
225+ for (auto &&[topic, partition_cnt] : topic_to_partition_cnt) {
226+ for (int i = 0 ; i < partition_cnt; i++) {
227+ rd_kafka_topic_partition_list_add (rk_topic_partitions, topic.c_str (),
228+ i);
229+ }
230+ }
231+
232+ const auto committed_offsets =
233+ query_committed_offsets (rk, rkqu, group_id, rk_topic_partitions);
234+ const auto end_offsets = query_end_offsets (rk, rkqu, rk_topic_partitions);
235+ std::cout << " Offsets info for group '" << group_id << " ' with "
236+ << committed_offsets.size ()
237+ << " topic-partitions:" << std::endl;
238+ std::cout << " | topic-partition | committed offset | end offset | lag |"
239+ << std::endl;
240+ for (auto &&[topic_partition, committed_offset] : committed_offsets) {
241+ auto end_offset_it = end_offsets.find (topic_partition);
242+ if (end_offset_it == end_offsets.cend ()) {
243+ std::cout << " | " << topic_partition << " | " << committed_offset
244+ << " | N/A | N/A |" << std::endl;
245+ } else {
246+ auto end_offset = end_offset_it->second ;
247+ std::cout << " | " << topic_partition << " | " << committed_offset
248+ << " | " << end_offset << " | "
249+ << (end_offset - committed_offset) << " |" << std::endl;
250+ }
251+ }
252+ }
113253 } catch (const std::runtime_error &e) {
114254 std::cerr << " Failed to describe group '" << group << " ': " << e.what ()
115255 << std::endl;
0 commit comments