Skip to content
This repository was archived by the owner on Jun 4, 2021. It is now read-only.

Commit 1c66947

Browse files
don't use session context to send messages (#179) (#1664)
1 parent 2c7d0d0 commit 1c66947

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

kafka/common/pkg/kafka/consumer_handler.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
)
2727

2828
type KafkaConsumerHandler interface {
29-
// When this function returns true, the consumer group offset is committed.
29+
// When this function returns true, the consumer group offset is marked as consumed.
3030
// The returned error is enqueued in errors channel.
3131
Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error)
3232
}
@@ -77,7 +77,9 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
7777
consumer.logger.Debugw("Message claimed", zap.String("topic", message.Topic), zap.Binary("value", message.Value))
7878
}
7979

80-
mustMark, err := consumer.handler.Handle(session.Context(), message)
80+
// Don't use the session context since it is closed before messages are drained.
81+
// Handle must finish before the session timeout.
82+
mustMark, err := consumer.handler.Handle(context.Background(), message)
8183

8284
if err != nil {
8385
consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))

0 commit comments

Comments
 (0)