Skip to content

Commit f3f3db3

Browse files
committed
fixes timeout handling when consuming.
1 parent d1fb3c7 commit f3f3db3

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

consume.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type consumeCmd struct {
2828
client sarama.Client
2929
consumer sarama.Consumer
3030

31-
closer chan struct{}
31+
q chan struct{}
3232
}
3333

3434
type offset struct {
@@ -183,7 +183,7 @@ func (cmd *consumeCmd) failStartup(msg string) {
183183
func (cmd *consumeCmd) parseArgs(as []string) {
184184
var (
185185
err error
186-
args = cmd.read(as)
186+
args = cmd.parseFlags(as)
187187
)
188188

189189
envTopic := os.Getenv("KT_TOPIC")
@@ -194,6 +194,7 @@ func (cmd *consumeCmd) parseArgs(as []string) {
194194
args.topic = envTopic
195195
}
196196
cmd.topic = args.topic
197+
cmd.timeout = args.timeout
197198
cmd.verbose = args.verbose
198199
cmd.version = kafkaVersion(args.version)
199200

@@ -218,7 +219,7 @@ func (cmd *consumeCmd) parseArgs(as []string) {
218219
}
219220
}
220221

221-
func (cmd *consumeCmd) read(as []string) consumeArgs {
222+
func (cmd *consumeCmd) parseFlags(as []string) consumeArgs {
222223
var args consumeArgs
223224
flags := flag.NewFlagSet("consume", flag.ExitOnError)
224225
flags.StringVar(&args.topic, "topic", "", "Topic to consume (required).")
@@ -259,11 +260,11 @@ func (cmd *consumeCmd) setupClient() {
259260
}
260261
}
261262

262-
func (cmd *consumeCmd) run(args []string, closer chan struct{}) {
263+
func (cmd *consumeCmd) run(args []string, q chan struct{}) {
263264
var err error
264265

265266
cmd.parseArgs(args)
266-
cmd.closer = closer
267+
cmd.q = q
267268

268269
if cmd.verbose {
269270
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
@@ -368,10 +369,10 @@ func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionC
368369

369370
select {
370371
case <-timeout:
371-
fmt.Fprintf(os.Stderr, "consuming from partition %v timed out after %s.", p, cmd.timeout)
372+
fmt.Fprintf(os.Stderr, "consuming from partition %v timed out after %s\n", p, cmd.timeout)
372373
return
373-
case <-cmd.closer:
374-
fmt.Fprintf(os.Stderr, "shuttin down partition consumer for partition %v\n", p)
374+
case <-cmd.q:
375+
fmt.Fprintf(os.Stderr, "shutting down partition consumer for partition %v\n", p)
375376
return
376377
case msg, ok := <-pc.Messages():
377378
var (
@@ -389,7 +390,7 @@ func (cmd *consumeCmd) partitionLoop(out chan printContext, pc sarama.PartitionC
389390
m := consumedMessage{msg.Partition, msg.Offset, &k, &v}
390391
if buf, err = json.Marshal(m); err != nil {
391392
fmt.Fprintf(os.Stderr, "Quitting due to unexpected error during marshal: %v\n", err)
392-
close(cmd.closer)
393+
close(cmd.q)
393394
return
394395
}
395396

0 commit comments

Comments
 (0)