Skip to content

Commit 0d3f398

Browse files
authored
fix: solve kafka incorrect when close channel. (#232)
* feat: implement circuitbreaker and ratelimit. * fix: solve kafka incorrect when close channel.
1 parent 48f7886 commit 0d3f398

File tree

1 file changed

+23
-23
lines changed

1 file changed

+23
-23
lines changed

mq/kafka/mq_consumer_group.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func NewMConsumerGroupV2(ctx context.Context, conf *Config, groupID string, topi
3030
msg: make(chan *consumerMessage, 64),
3131
}
3232
mcg.ctx, mcg.cancel = context.WithCancel(ctx)
33-
mcg.loopConsume()
33+
go mcg.loopConsume()
3434
return mcg, nil
3535
}
3636

@@ -56,38 +56,33 @@ func (*mqConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
5656
}
5757

5858
func (x *mqConsumerGroup) closeMsgChan() {
59-
select {
60-
case <-x.ctx.Done():
61-
x.once.Do(func() {
62-
close(x.msg)
63-
})
64-
default:
65-
}
59+
x.once.Do(func() {
60+
x.cancel()
61+
close(x.msg)
62+
})
6663
}
6764

6865
func (x *mqConsumerGroup) loopConsume() {
69-
go func() {
70-
defer x.closeMsgChan()
71-
ctx := mcontext.SetOperationID(x.ctx, fmt.Sprintf("consumer_group_%s_%s_%d", strings.Join(x.topics, "_"), x.groupID, rand.Uint32()))
72-
for {
73-
if err := x.consumer.Consume(x.ctx, x.topics, x); err != nil {
74-
switch {
75-
case errors.Is(err, context.Canceled):
76-
return
77-
case errors.Is(err, sarama.ErrClosedConsumerGroup):
78-
return
79-
}
80-
log.ZWarn(ctx, "consume err", err, "topic", x.topics, "groupID", x.groupID)
66+
defer x.closeMsgChan()
67+
ctx := mcontext.SetOperationID(x.ctx, fmt.Sprintf("consumer_group_%s_%s_%d", strings.Join(x.topics, "_"), x.groupID, rand.Uint32()))
68+
for {
69+
if err := x.consumer.Consume(x.ctx, x.topics, x); err != nil {
70+
switch {
71+
case errors.Is(err, context.Canceled):
72+
return
73+
case errors.Is(err, sarama.ErrClosedConsumerGroup):
74+
return
8175
}
76+
log.ZWarn(ctx, "consume err", err, "topic", x.topics, "groupID", x.groupID)
8277
}
83-
}()
78+
}
8479
}
8580

8681
func (x *mqConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
8782
defer func() {
8883
_ = recover()
89-
x.closeMsgChan()
9084
}()
85+
9186
msg := claim.Messages()
9287
for {
9388
select {
@@ -97,7 +92,12 @@ func (x *mqConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, clai
9792
if !ok {
9893
return nil
9994
}
100-
x.msg <- &consumerMessage{Msg: val, Session: session}
95+
96+
select {
97+
case <-x.ctx.Done():
98+
return context.Canceled
99+
case x.msg <- &consumerMessage{Msg: val, Session: session}:
100+
}
101101
}
102102
}
103103
}

0 commit comments

Comments
 (0)