Skip to content

Commit fa09397

Browse files
committed
group: resolves partitions once per topic.
before we would have resolved them once per topic * group.
1 parent 46b39de commit fa09397

File tree

1 file changed

+18
-12
lines changed

1 file changed

+18
-12
lines changed

group.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func (cmd *groupCmd) run(args []string) {
6060
}
6161

6262
brokers := cmd.client.Brokers()
63+
fmt.Fprintf(os.Stderr, "found %v brokers\n", len(brokers))
64+
6365
groups := []string{cmd.group}
6466
if cmd.group == "" {
6567
groups = []string{}
@@ -93,30 +95,34 @@ func (cmd *groupCmd) run(args []string) {
9395
return
9496
}
9597

98+
topicPartitions := map[string][]int32{}
99+
for _, topic := range topics {
100+
parts := cmd.partitions
101+
if len(parts) == 0 {
102+
parts = cmd.fetchPartitions(topic)
103+
fmt.Fprintf(os.Stderr, "found partitions=%v for topic=%v\n", parts, topic)
104+
}
105+
topicPartitions[topic] = parts
106+
}
107+
96108
wg := &sync.WaitGroup{}
97109
wg.Add(len(groups) * len(topics))
98110
for _, grp := range groups {
99-
for _, top := range topics {
100-
go func(grp, topic string) {
101-
cmd.printGroupTopicOffset(out, grp, topic)
111+
for top, parts := range topicPartitions {
112+
go func(grp, topic string, partitions []int32) {
113+
cmd.printGroupTopicOffset(out, grp, topic, partitions)
102114
wg.Done()
103-
}(grp, top)
115+
}(grp, top, parts)
104116
}
105117
}
106118
wg.Wait()
107119
}
108120

109-
func (cmd *groupCmd) printGroupTopicOffset(out chan printContext, grp, top string) {
121+
func (cmd *groupCmd) printGroupTopicOffset(out chan printContext, grp, top string, parts []int32) {
110122
target := group{Name: grp, Topic: top, Offsets: []groupOffset{}}
111123
results := make(chan groupOffset)
112124
done := make(chan struct{})
113-
parts := cmd.partitions
114-
if len(cmd.partitions) == 0 {
115-
parts = cmd.fetchPartitions(top)
116-
if cmd.verbose {
117-
fmt.Fprintf(os.Stderr, "resolved partitions for topic=%v to %v\n", top, parts)
118-
}
119-
}
125+
120126
wg := &sync.WaitGroup{}
121127
wg.Add(len(parts))
122128
for _, part := range parts {

0 commit comments

Comments
 (0)