Skip to content
This repository was archived by the owner on Jun 4, 2021. It is now read-only.

Commit 2d44c08

Browse files
fix custom consumer group error channel lifecycle (#263) (#1676)
* fix custom consumer group error channel lifecycle * fix import * remove unused var
1 parent eaa5916 commit 2d44c08

File tree

4 files changed

+18
-14
lines changed

4 files changed

+18
-14
lines changed

kafka/common/pkg/kafka/consumer_factory.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func (c *customConsumerGroup) Errors() <-chan error {
4848

4949
func (c *customConsumerGroup) Close() error {
5050
c.cancel()
51+
close(c.handlerErrorChannel)
5152
return c.ConsumerGroup.Close()
5253
}
5354

@@ -59,18 +60,20 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics
5960
return nil, err
6061
}
6162

62-
consumerHandler := NewConsumerHandler(logger, handler)
63-
63+
errorCh := make(chan error, 10)
6464
ctx, cancel := context.WithCancel(context.Background())
6565

6666
go func() {
6767
for {
68+
consumerHandler := NewConsumerHandler(logger, handler, errorCh)
69+
6870
err := consumerGroup.Consume(context.Background(), topics, &consumerHandler)
71+
6972
if err == sarama.ErrClosedConsumerGroup {
7073
return
7174
}
7275
if err != nil {
73-
consumerHandler.errors <- err
76+
errorCh <- err
7477
}
7578

7679
select {
@@ -81,7 +84,7 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics
8184
}
8285
}()
8386

84-
return &customConsumerGroup{cancel, consumerHandler.errors, consumerGroup}, err
87+
return &customConsumerGroup{cancel, errorCh, consumerGroup}, err
8588
}
8689

8790
func NewConsumerGroupFactory(addrs []string, config *sarama.Config) KafkaConsumerGroupFactory {

kafka/common/pkg/kafka/consumer_factory_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (m *mockConsumerGroup) Consume(ctx context.Context, topics []string, handle
4242
m.generateErrorOnce.Do(func() {
4343
h := handler.(*SaramaConsumerHandler)
4444
h.errors <- errors.New("cgh")
45-
_ = h.Cleanup(nil)
45+
close(h.errors)
4646
})
4747
}()
4848
}

kafka/common/pkg/kafka/consumer_handler.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kafka
1919
import (
2020
"context"
2121
"fmt"
22-
"sync"
2322

2423
"github.com/Shopify/sarama"
2524
"go.uber.org/zap"
@@ -38,29 +37,28 @@ type SaramaConsumerHandler struct {
3837
handler KafkaConsumerHandler
3938

4039
logger *zap.SugaredLogger
40+
4141
// Errors channel
42-
closeErrors sync.Once
43-
errors chan error
42+
errors chan error
4443
}
4544

46-
func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler) SaramaConsumerHandler {
45+
func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error) SaramaConsumerHandler {
4746
return SaramaConsumerHandler{
4847
logger: logger,
4948
handler: handler,
50-
errors: make(chan error, 10), // Some buffering to avoid blocking the message processing
49+
errors: errorsCh,
5150
}
5251
}
5352

5453
// Setup is run at the beginning of a new session, before ConsumeClaim
5554
func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
55+
consumer.logger.Info("setting up handler")
5656
return nil
5757
}
5858

5959
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
6060
func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
61-
consumer.closeErrors.Do(func() {
62-
close(consumer.errors)
63-
})
61+
consumer.logger.Info("cleanup handler")
6462
return nil
6563
}
6664

@@ -85,6 +83,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
8583
consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))
8684
consumer.errors <- err
8785
}
86+
8887
if mustMark {
8988
session.MarkMessage(message, "") // Mark kafka message as processed
9089
if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil {

kafka/common/pkg/kafka/consumer_handler_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ func Test(t *testing.T) {
133133
}
134134
for _, test := range tests {
135135
t.Run(fmt.Sprintf("shouldErr: %v, shouldMark: %v", test.shouldErr, test.shouldMark), func(t *testing.T) {
136-
cgh := NewConsumerHandler(zap.NewNop().Sugar(), test)
136+
errorCh := make(chan error, 1)
137+
cgh := NewConsumerHandler(zap.NewNop().Sugar(), test, errorCh)
137138

138139
session := mockConsumerGroupSession{}
139140
claim := mockConsumerGroupClaim{msg: &mockMessage}
@@ -155,6 +156,7 @@ func Test(t *testing.T) {
155156
}
156157

157158
_ = cgh.Cleanup(&session)
159+
close(errorCh)
158160

159161
})
160162
}

0 commit comments

Comments
 (0)