Skip to content

Commit 85d32c3

Browse files
committed
some small fixes to offset command.
1 parent f3f3db3 commit 85d32c3

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

offset.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"regexp"
1111
"strconv"
1212
"strings"
13-
"time"
1413

1514
"github.com/Shopify/sarama"
1615
)
@@ -147,11 +146,11 @@ func (cmd *offsetCmd) connect() {
147146
if cmd.broker, err = cmd.client.Coordinator(cmd.group); err != nil {
148147
failf("failed to create broker err=%v", err)
149148
}
150-
defer logClose("broker", cmd.broker)
151149
}
152150
}
153151

154152
func (cmd *offsetCmd) run(as []string, q chan struct{}) {
153+
cmd.parseArgs(as)
155154
if cmd.verbose {
156155
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
157156
}
@@ -260,7 +259,7 @@ func (cmd *offsetCmd) join(top string) (string, int32) {
260259

261260
joinGroupReq := &sarama.JoinGroupRequest{
262261
GroupId: cmd.group,
263-
SessionTimeout: int32((30 * time.Second) / time.Millisecond),
262+
SessionTimeout: 30 * 1000,
264263
ProtocolType: "consumer",
265264
}
266265

@@ -277,8 +276,12 @@ func (cmd *offsetCmd) join(top string) (string, int32) {
277276
failf("failed to add meta data err=%v", err)
278277
}
279278

280-
if res, err := cmd.broker.JoinGroup(joinGroupReq); err != nil || res.Err != sarama.ErrNoError {
281-
failf("failed to join consumer group err=%v responseErr=%v", err, res.Err)
279+
if res, err = cmd.broker.JoinGroup(joinGroupReq); err != nil {
280+
failf("failed to join consumer group err=%v", err)
281+
}
282+
283+
if res.Err != sarama.ErrNoError {
284+
failf("failed to join consumer group responseErr=%v", res.Err)
282285
}
283286

284287
return res.MemberId, res.GenerationId
@@ -311,10 +314,10 @@ func (cmd *offsetCmd) commit(top string, prt int32, offset int64, generationID i
311314
failf("failed to commit offsets err=%v", err)
312315
}
313316

314-
for topic, perrs := range ocr.Errors {
315-
for partition, kerr := range perrs {
317+
for top, perrs := range ocr.Errors {
318+
for prt, kerr := range perrs {
316319
if kerr != sarama.ErrNoError {
317-
failf("failed to commit offsets topic=%s, partition=%s. err=%v", topic, partition, err)
320+
failf("failed to commit offsets topic=%s, partition=%v err=%v kerr=%v", top, prt, err, kerr)
318321
}
319322
}
320323
}

0 commit comments

Comments
 (0)