Skip to content

Commit c0a21a0

Browse files
authored
Fix Kafka consumer (dapr#3297)
Signed-off-by: yaron2 <[email protected]>
1 parent 56579c6 commit c0a21a0

File tree

1 file changed

+78
-53
lines changed

1 file changed

+78
-53
lines changed

common/component/kafka/consumer.go

Lines changed: 78 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,15 @@ import (
2929
)
3030

3131
type consumer struct {
32-
k *Kafka
33-
ready chan bool
34-
running chan struct{}
35-
stopped atomic.Bool
36-
once sync.Once
37-
mutex sync.Mutex
32+
k *Kafka
33+
ready chan bool
34+
running chan struct{}
35+
stopped atomic.Bool
36+
once sync.Once
37+
mutex sync.Mutex
38+
skipConsume bool
39+
consumeCtx context.Context
40+
consumeCancel context.CancelFunc
3841
}
3942

4043
func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
@@ -261,71 +264,93 @@ func (k *Kafka) Subscribe(ctx context.Context) error {
261264
k.subscribeLock.Lock()
262265
defer k.subscribeLock.Unlock()
263266

264-
// Close resources and reset synchronization primitives
265-
k.closeSubscriptionResources()
266-
267267
topics := k.subscribeTopics.TopicList()
268268
if len(topics) == 0 {
269269
// Nothing to subscribe to
270270
return nil
271271
}
272+
k.consumer.skipConsume = true
272273

273-
cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
274-
if err != nil {
275-
return err
276-
}
274+
ctxCreateFn := func() {
275+
consumeCtx, cancel := context.WithCancel(context.Background())
277276

278-
k.cg = cg
277+
k.consumer.consumeCtx = consumeCtx
278+
k.consumer.consumeCancel = cancel
279279

280-
ready := make(chan bool)
281-
k.consumer = consumer{
282-
k: k,
283-
ready: ready,
284-
running: make(chan struct{}),
280+
k.consumer.skipConsume = false
285281
}
286282

287-
go func() {
288-
k.logger.Debugf("Subscribed and listening to topics: %s", topics)
283+
if k.cg == nil {
284+
cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config)
285+
if err != nil {
286+
return err
287+
}
289288

290-
for {
291-
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
292-
// us out of the consume loop
293-
if ctx.Err() != nil {
294-
break
295-
}
289+
k.cg = cg
290+
291+
ready := make(chan bool)
292+
k.consumer = consumer{
293+
k: k,
294+
ready: ready,
295+
running: make(chan struct{}),
296+
}
296297

297-
k.logger.Debugf("Starting loop to consume.")
298+
ctxCreateFn()
298299

299-
// Consume the requested topics
300-
bo := backoff.WithContext(backoff.NewConstantBackOff(k.consumeRetryInterval), ctx)
301-
innerErr := retry.NotifyRecover(func() error {
302-
if ctxErr := ctx.Err(); ctxErr != nil {
303-
return backoff.Permanent(ctxErr)
300+
go func() {
301+
k.logger.Debugf("Subscribed and listening to topics: %s", topics)
302+
303+
for {
304+
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
305+
// us out of the consume loop
306+
if ctx.Err() != nil {
307+
k.logger.Info("Consume context cancelled")
308+
break
309+
}
310+
311+
k.logger.Debugf("Starting loop to consume.")
312+
313+
if k.consumer.skipConsume {
314+
continue
315+
}
316+
317+
topics = k.subscribeTopics.TopicList()
318+
319+
// Consume the requested topics
320+
bo := backoff.WithContext(backoff.NewConstantBackOff(k.consumeRetryInterval), ctx)
321+
innerErr := retry.NotifyRecover(func() error {
322+
if ctxErr := ctx.Err(); ctxErr != nil {
323+
return backoff.Permanent(ctxErr)
324+
}
325+
return k.cg.Consume(k.consumer.consumeCtx, topics, &(k.consumer))
326+
}, bo, func(err error, t time.Duration) {
327+
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
328+
}, func() {
329+
k.logger.Infof("Recovered consuming %v", topics)
330+
})
331+
if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
332+
k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
304333
}
305-
return k.cg.Consume(ctx, topics, &(k.consumer))
306-
}, bo, func(err error, t time.Duration) {
307-
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
308-
}, func() {
309-
k.logger.Infof("Recovered consuming %v", topics)
310-
})
311-
if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
312-
k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
313334
}
314-
}
315335

316-
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
317-
err := k.cg.Close()
318-
if err != nil {
319-
k.logger.Errorf("Error closing consumer group: %v", err)
320-
}
336+
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
337+
err := k.cg.Close()
338+
if err != nil {
339+
k.logger.Errorf("Error closing consumer group: %v", err)
340+
}
321341

322-
// Ensure running channel is only closed once.
323-
if k.consumer.stopped.CompareAndSwap(false, true) {
324-
close(k.consumer.running)
325-
}
326-
}()
342+
// Ensure running channel is only closed once.
343+
if k.consumer.stopped.CompareAndSwap(false, true) {
344+
close(k.consumer.running)
345+
}
346+
}()
327347

328-
<-ready
348+
<-ready
349+
} else {
350+
// The consumer group is already created and consuming topics. This means a new subscription is being added
351+
k.consumer.consumeCancel()
352+
ctxCreateFn()
353+
}
329354

330355
return nil
331356
}

0 commit comments

Comments
 (0)