1818
1919import org .apache .kafka .clients .CommonClientConfigs ;
2020import org .apache .kafka .clients .admin .Admin ;
21+ import org .apache .kafka .clients .admin .DescribeStreamsGroupsResult ;
2122import org .apache .kafka .clients .admin .GroupListing ;
23+ import org .apache .kafka .clients .admin .ListConsumerGroupOffsetsSpec ;
2224import org .apache .kafka .clients .admin .ListGroupsOptions ;
2325import org .apache .kafka .clients .admin .ListGroupsResult ;
26+ import org .apache .kafka .clients .admin .ListOffsetsResult ;
27+ import org .apache .kafka .clients .admin .OffsetSpec ;
28+ import org .apache .kafka .clients .admin .StreamsGroupDescription ;
29+ import org .apache .kafka .clients .admin .StreamsGroupMemberAssignment ;
30+ import org .apache .kafka .clients .admin .StreamsGroupMemberDescription ;
31+ import org .apache .kafka .clients .admin .StreamsGroupSubtopologyDescription ;
32+ import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
2433import org .apache .kafka .common .GroupState ;
2534import org .apache .kafka .common .GroupType ;
35+ import org .apache .kafka .common .TopicPartition ;
2636import org .apache .kafka .common .utils .Utils ;
2737import org .apache .kafka .server .util .CommandLineUtils ;
2838
2939import java .io .IOException ;
3040import java .util .ArrayList ;
3141import java .util .Arrays ;
3242import java .util .Collection ;
43+ import java .util .HashMap ;
44+ import java .util .HashSet ;
3345import java .util .List ;
3446import java .util .Map ;
3547import java .util .Optional ;
@@ -49,9 +61,9 @@ public static void main(String[] args) {
4961 opts .checkArgs ();
5062
5163 // should have exactly one action
52- long numberOfActions = Stream .of (opts .listOpt ).filter (opts .options ::has ).count ();
64+ long numberOfActions = Stream .of (opts .listOpt , opts . describeOpt ).filter (opts .options ::has ).count ();
5365 if (numberOfActions != 1 )
54- CommandLineUtils .printUsageAndExit (opts .parser , "Command must include exactly one action: --list." );
66+ CommandLineUtils .printUsageAndExit (opts .parser , "Command must include exactly one action: --list, or --describe ." );
5567
5668 run (opts );
5769 } catch (OptionException e ) {
@@ -63,6 +75,8 @@ public static void run(StreamsGroupCommandOptions opts) {
6375 try (StreamsGroupService streamsGroupService = new StreamsGroupService (opts , Map .of ())) {
6476 if (opts .options .has (opts .listOpt )) {
6577 streamsGroupService .listGroups ();
78+ } else if (opts .options .has (opts .describeOpt )) {
79+ streamsGroupService .describeGroups ();
6680 } else {
6781 throw new IllegalArgumentException ("Unknown action!" );
6882 }
@@ -79,7 +93,7 @@ static Set<GroupState> groupStatesFromString(String input) {
7993 Set <GroupState > validStates = GroupState .groupStatesForType (GroupType .STREAMS );
8094 if (!validStates .containsAll (parsedStates )) {
8195 throw new IllegalArgumentException ("Invalid state list '" + input + "'. Valid states are: " +
82- validStates .stream ().map (GroupState ::toString ).collect (Collectors .joining (", " )));
96+ validStates .stream ().map (GroupState ::toString ).collect (Collectors .joining (", " )));
8397 }
8498 return parsedStates ;
8599 }
@@ -154,6 +168,224 @@ private void printGroupInfo(List<GroupListing> groups) {
154168 }
155169 }
156170
171+ public void describeGroups () throws ExecutionException , InterruptedException {
172+ List <String > groups = listStreamsGroups ();
173+ if (!groups .isEmpty ()) {
174+ StreamsGroupDescription description = getDescribeGroup (groups .get (0 ));
175+ if (description == null )
176+ return ;
177+ boolean verbose = opts .options .has (opts .verboseOpt );
178+ if (opts .options .has (opts .membersOpt )) {
179+ printMembers (description , verbose );
180+ } else if (opts .options .has (opts .stateOpt )) {
181+ printStates (description , verbose );
182+ } else {
183+ printOffsets (description , verbose );
184+ }
185+ }
186+ }
187+
188+ StreamsGroupDescription getDescribeGroup (String group ) throws ExecutionException , InterruptedException {
189+ DescribeStreamsGroupsResult result = adminClient .describeStreamsGroups (List .of (group ));
190+ Map <String , StreamsGroupDescription > descriptionMap = result .all ().get ();
191+ return descriptionMap .get (group );
192+ }
193+
194+ private void printMembers (StreamsGroupDescription description , boolean verbose ) {
195+ final int groupLen = Math .max (15 , description .groupId ().length ());
196+ int maxMemberIdLen = 15 , maxHostLen = 15 , maxClientIdLen = 15 ;
197+ Collection <StreamsGroupMemberDescription > members = description .members ();
198+ if (isGroupStateValid (description .groupState (), description .members ().size ())) {
199+ maybePrintEmptyGroupState (description .groupId (), description .groupState ());
200+ for (StreamsGroupMemberDescription member : members ) {
201+ maxMemberIdLen = Math .max (maxMemberIdLen , member .memberId ().length ());
202+ maxHostLen = Math .max (maxHostLen , member .processId ().length ());
203+ maxClientIdLen = Math .max (maxClientIdLen , member .clientId ().length ());
204+ }
205+
206+ if (!verbose ) {
207+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n " ;
208+ System .out .printf (fmt , "GROUP" , "MEMBER" , "PROCESS" , "CLIENT-ID" , "ASSIGNMENTS" );
209+ for (StreamsGroupMemberDescription member : members ) {
210+ System .out .printf (fmt , description .groupId (), member .memberId (), member .processId (), member .clientId (),
211+ getTasksForPrinting (member .assignment (), Optional .empty ()));
212+ }
213+ } else {
214+ final int targetAssignmentEpochLen = 25 , topologyEpochLen = 15 , memberProtocolLen = 15 , memberEpochLen = 15 ;
215+ String fmt = "%" + -groupLen + "s %" + -targetAssignmentEpochLen + "s %" + -topologyEpochLen + "s%" + -maxMemberIdLen
216+ + "s %" + -memberProtocolLen + "s %" + -memberEpochLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n " ;
217+ System .out .printf (fmt , "GROUP" , "TARGET-ASSIGNMENT-EPOCH" , "TOPOLOGY-EPOCH" , "MEMBER" , "MEMBER-PROTOCOL" , "MEMBER-EPOCH" , "PROCESS" , "CLIENT-ID" , "ASSIGNMENTS" );
218+ for (StreamsGroupMemberDescription member : members ) {
219+ System .out .printf (fmt , description .groupId (), description .targetAssignmentEpoch (), description .topologyEpoch (), member .memberId (),
220+ member .isClassic () ? "classic" : "streams" , member .memberEpoch (), member .processId (), member .clientId (), getTasksForPrinting (member .assignment (), Optional .of (member .targetAssignment ())));
221+ }
222+ }
223+ }
224+ }
225+
226+ private String prepareTaskType (List <StreamsGroupMemberAssignment .TaskIds > tasks , String taskType ) {
227+ if (tasks .isEmpty ()) {
228+ return "" ;
229+ }
230+ StringBuilder builder = new StringBuilder (taskType ).append (": " );
231+ for (StreamsGroupMemberAssignment .TaskIds taskIds : tasks ) {
232+ builder .append (taskIds .subtopologyId ()).append (":[" );
233+ builder .append (taskIds .partitions ().stream ().map (String ::valueOf ).collect (Collectors .joining ("," )));
234+ builder .append ("]; " );
235+ }
236+ return builder .toString ();
237+ }
238+
239+ private String getTasksForPrinting (StreamsGroupMemberAssignment assignment , Optional <StreamsGroupMemberAssignment > targetAssignment ) {
240+ StringBuilder builder = new StringBuilder ();
241+ builder .append (prepareTaskType (assignment .activeTasks (), "ACTIVE" ))
242+ .append (prepareTaskType (assignment .standbyTasks (), "STANDBY" ))
243+ .append (prepareTaskType (assignment .warmupTasks (), "WARMUP" ));
244+ targetAssignment .ifPresent (target -> builder .append (prepareTaskType (target .activeTasks (), "TARGET-ACTIVE" ))
245+ .append (prepareTaskType (target .standbyTasks (), "TARGET-STANDBY" ))
246+ .append (prepareTaskType (target .warmupTasks (), "TARGET-WARMUP" )));
247+ return builder .toString ();
248+ }
249+
250+ private void printStates (StreamsGroupDescription description , boolean verbose ) {
251+ maybePrintEmptyGroupState (description .groupId (), description .groupState ());
252+
253+ final int groupLen = Math .max (15 , description .groupId ().length ());
254+ String coordinator = description .coordinator ().host () + ":" + description .coordinator ().port () + " (" + description .coordinator ().idString () + ")" ;
255+
256+ final int coordinatorLen = Math .max (25 , coordinator .length ());
257+ final int stateLen = 25 ;
258+ if (!verbose ) {
259+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %" + -stateLen + "s %s\n " ;
260+ System .out .printf (fmt , "GROUP" , "COORDINATOR (ID)" , "STATE" , "#MEMBERS" );
261+ System .out .printf (fmt , description .groupId (), coordinator , description .groupState ().toString (), description .members ().size ());
262+ } else {
263+ final int groupEpochLen = 15 , targetAssignmentEpochLen = 25 ;
264+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %" + -stateLen + "s %" + -groupEpochLen + "s %" + -targetAssignmentEpochLen + "s %s\n " ;
265+ System .out .printf (fmt , "GROUP" , "COORDINATOR (ID)" , "STATE" , "GROUP-EPOCH" , "TARGET-ASSIGNMENT-EPOCH" , "#MEMBERS" );
266+ System .out .printf (fmt , description .groupId (), coordinator , description .groupState ().toString (), description .groupEpoch (), description .targetAssignmentEpoch (), description .members ().size ());
267+ }
268+ }
269+
270+ private void printOffsets (StreamsGroupDescription description , boolean verbose ) throws ExecutionException , InterruptedException {
271+ Map <TopicPartition , OffsetsInfo > offsets = getOffsets (description );
272+ if (isGroupStateValid (description .groupState (), description .members ().size ())) {
273+ maybePrintEmptyGroupState (description .groupId (), description .groupState ());
274+ final int groupLen = Math .max (15 , description .groupId ().length ());
275+ int maxTopicLen = 15 ;
276+ for (TopicPartition topicPartition : offsets .keySet ()) {
277+ maxTopicLen = Math .max (maxTopicLen , topicPartition .topic ().length ());
278+ }
279+ final int maxPartitionLen = 10 ;
280+ if (!verbose ) {
281+ String fmt = "%" + -groupLen + "s %" + -maxTopicLen + "s %" + -maxPartitionLen + "s %s\n " ;
282+ System .out .printf (fmt , "GROUP" , "TOPIC" , "PARTITION" , "OFFSET-LAG" );
283+ for (Map .Entry <TopicPartition , OffsetsInfo > offset : offsets .entrySet ()) {
284+ System .out .printf (fmt , description .groupId (), offset .getKey ().topic (), offset .getKey ().partition (), offset .getValue ().lag );
285+ }
286+ } else {
287+ String fmt = "%" + (-groupLen ) + "s %" + (-maxTopicLen ) + "s %-10s %-15s %-15s %-15s %-15s%n" ;
288+ System .out .printf (fmt , "GROUP" , "TOPIC" , "PARTITION" , "CURRENT-OFFSET" , "LEADER-EPOCH" , "LOG-END-OFFSET" , "OFFSET-LAG" );
289+ for (Map .Entry <TopicPartition , OffsetsInfo > offset : offsets .entrySet ()) {
290+ System .out .printf (fmt , description .groupId (), offset .getKey ().topic (), offset .getKey ().partition (),
291+ offset .getValue ().currentOffset .map (Object ::toString ).orElse ("-" ), offset .getValue ().leaderEpoch .map (Object ::toString ).orElse ("-" ),
292+ offset .getValue ().logEndOffset , offset .getValue ().lag );
293+ }
294+ }
295+ }
296+ }
297+
298+ Map <TopicPartition , OffsetsInfo > getOffsets (StreamsGroupDescription description ) throws ExecutionException , InterruptedException {
299+ final Collection <StreamsGroupMemberDescription > members = description .members ();
300+ Set <TopicPartition > allTp = new HashSet <>();
301+ for (StreamsGroupMemberDescription memberDescription : members ) {
302+ allTp .addAll (getTopicPartitions (memberDescription .assignment ().activeTasks (), description ));
303+ }
304+ // fetch latest and earliest offsets
305+ Map <TopicPartition , OffsetSpec > earliest = new HashMap <>();
306+ Map <TopicPartition , OffsetSpec > latest = new HashMap <>();
307+
308+ for (TopicPartition tp : allTp ) {
309+ earliest .put (tp , OffsetSpec .earliest ());
310+ latest .put (tp , OffsetSpec .latest ());
311+ }
312+ Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > earliestResult = adminClient .listOffsets (earliest ).all ().get ();
313+ Map <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > latestResult = adminClient .listOffsets (latest ).all ().get ();
314+ Map <TopicPartition , OffsetAndMetadata > committedOffsets = getCommittedOffsets (description .groupId ());
315+
316+ Map <TopicPartition , OffsetsInfo > output = new HashMap <>();
317+ for (Map .Entry <TopicPartition , ListOffsetsResult .ListOffsetsResultInfo > tp : earliestResult .entrySet ()) {
318+ final Optional <Long > currentOffset = committedOffsets .containsKey (tp .getKey ()) ? Optional .of (committedOffsets .get (tp .getKey ()).offset ()) : Optional .empty ();
319+ final Optional <Integer > leaderEpoch = committedOffsets .containsKey (tp .getKey ()) ? committedOffsets .get (tp .getKey ()).leaderEpoch () : Optional .empty ();
320+ final long lag = currentOffset .map (current -> latestResult .get (tp .getKey ()).offset () - current ).orElseGet (() -> latestResult .get (tp .getKey ()).offset () - earliestResult .get (tp .getKey ()).offset ());
321+ output .put (tp .getKey (),
322+ new OffsetsInfo (
323+ currentOffset ,
324+ leaderEpoch ,
325+ latestResult .get (tp .getKey ()).offset (),
326+ lag ));
327+ }
328+ return output ;
329+ }
330+
331+ Map <TopicPartition , OffsetAndMetadata > getCommittedOffsets (String groupId ) {
332+ try {
333+ return adminClient .listConsumerGroupOffsets (
334+ Map .of (groupId , new ListConsumerGroupOffsetsSpec ())).partitionsToOffsetAndMetadata (groupId ).get ();
335+ } catch (InterruptedException | ExecutionException e ) {
336+ throw new RuntimeException (e );
337+ }
338+ }
339+
340+ /**
341+ * Prints an error message if the group state indicates that the group is either dead or empty.
342+ *
343+ * @param group The ID of the group being checked.
344+ * @param state The current state of the group, represented as a `GroupState` object.
345+ * Possible values include `DEAD` (indicating the group does not exist)
346+ * and `EMPTY` (indicating the group has no active members).
347+ */
348+ private static void maybePrintEmptyGroupState (String group , GroupState state ) {
349+ if (state == GroupState .DEAD ) {
350+ printError ("Streams group '" + group + "' does not exist." , Optional .empty ());
351+ } else if (state == GroupState .EMPTY ) {
352+ printError ("Streams group '" + group + "' has no active members." , Optional .empty ());
353+ }
354+ }
355+
356+ /**
357+ * Checks if the group state is valid based on its state and the number of rows.
358+ *
359+ * @param state The current state of the group, represented as a `GroupState` object.
360+ * @param numRows The number of rows associated with the group.
361+ * @return `true` if the group state is not `DEAD` and the number of rows is greater than 0; otherwise, `false`.
362+ */
363+ // Visibility for testing
364+ static boolean isGroupStateValid (GroupState state , int numRows ) {
365+ return !state .equals (GroupState .DEAD ) && numRows > 0 ;
366+ }
367+
368+ private static Set <TopicPartition > getTopicPartitions (List <StreamsGroupMemberAssignment .TaskIds > taskIds , StreamsGroupDescription description ) {
369+ Map <String , List <String >> allSourceTopics = new HashMap <>();
370+ for (StreamsGroupSubtopologyDescription subtopologyDescription : description .subtopologies ()) {
371+ allSourceTopics .put (subtopologyDescription .subtopologyId (), subtopologyDescription .sourceTopics ());
372+ }
373+ Set <TopicPartition > topicPartitions = new HashSet <>();
374+
375+ for (StreamsGroupMemberAssignment .TaskIds task : taskIds ) {
376+ List <String > sourceTopics = allSourceTopics .get (task .subtopologyId ());
377+ if (sourceTopics == null ) {
378+ throw new IllegalArgumentException ("Subtopology " + task .subtopologyId () + " not found in group description!" );
379+ }
380+ for (String topic : sourceTopics ) {
381+ for (Integer partition : task .partitions ()) {
382+ topicPartitions .add (new TopicPartition (topic , partition ));
383+ }
384+ }
385+ }
386+ return topicPartitions ;
387+ }
388+
157389 public void close () {
158390 adminClient .close ();
159391 }
@@ -165,4 +397,7 @@ protected Admin createAdminClient(Map<String, String> configOverrides) throws IO
165397 return Admin .create (props );
166398 }
167399 }
400+
401+ public record OffsetsInfo (Optional <Long > currentOffset , Optional <Integer > leaderEpoch , Long logEndOffset , Long lag ) {
402+ }
168403}
0 commit comments