@@ -33,29 +33,6 @@ type consumer struct {
 	mutex sync.Mutex
 }
 
-func notifyRecover(consumer *consumer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, b backoff.BackOff) error {
-	for {
-		if err := retry.NotifyRecover(func() error {
-			return consumer.doCallback(session, message)
-		}, b, func(err error, d time.Duration) {
-			consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
-		}, func() {
-			consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
-		}); err != nil {
-			// If the retry policy got interrupted, it could mean that either
-			// the policy has reached its maximum number of attempts or the context has been cancelled.
-			// There is a weird edge case where the error returned is a 'context canceled' error but the session.Context is not done.
-			// This is a workaround to handle that edge case and reprocess the current message.
-			if err == context.Canceled && session.Context().Err() == nil {
-				consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. The error returned is 'context canceled' but the session context is not done. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
-				continue
-			}
-			return err
-		}
-		return nil
-	}
-}
-
 func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context())
 	isBulkSubscribe := consumer.k.checkBulkSubscribe(claim.Topic())
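
Reviewer note: for reference, here is a minimal runnable sketch of the retry pattern the removed notifyRecover wrapper was built around. The session context, constant backoff, and flaky callback below are stand-ins invented for illustration; only retry.NotifyRecover (github.com/dapr/kit/retry) and the backoff package (github.com/cenkalti/backoff/v4) are the same ones this file uses.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/dapr/kit/retry"
)

func main() {
	// Stand-in for session.Context(): cancelling this context aborts the backoff,
	// and NotifyRecover then returns context.Canceled.
	sessionCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Same shape as backOffConfig.NewBackOffWithContext(session.Context()).
	b := backoff.WithContext(backoff.NewConstantBackOff(50*time.Millisecond), sessionCtx)

	attempts := 0
	err := retry.NotifyRecover(func() error {
		// Stand-in for consumer.doCallback: fails twice, then succeeds.
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}, b, func(err error, d time.Duration) {
		fmt.Printf("retrying in %s after error: %v\n", d, err)
	}, func() {
		fmt.Println("recovered after previous failures")
	})

	// The removed wrapper looped here whenever err was context.Canceled while
	// sessionCtx.Err() was still nil; the replacement below instead checks the
	// session context once, after logging the exhausted retries.
	fmt.Println("final error:", err)
}
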
@@ -107,8 +84,21 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
 		}
 
 		if consumer.k.consumeRetryEnabled {
-			if err := notifyRecover(consumer, message, session, b); err != nil {
+			if err := retry.NotifyRecover(func() error {
+				return consumer.doCallback(session, message)
+			}, b, func(err error, d time.Duration) {
+				consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
+			}, func() {
+				consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
+			}); err != nil {
 				consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
+				if errors.Is(session.Context().Err(), context.Canceled) {
+					// If the context is canceled, we should not attempt to consume any more messages. Exiting the loop.
+					// Otherwise, there is a race condition when this loop keeps processing messages from the claim.Messages() channel
+					// before the session.Context().Done() is closed. If there are other messages that can successfully be processed,
+					// they will be marked as processed and this failing message will be lost.
+					return nil
+				}
 			}
 		} else {
 			err := consumer.doCallback(session, message)
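
Reviewer note: to make the race described in the new comment concrete, here is a hedged sketch of the shape of a sarama claim loop. It is deliberately simplified relative to the real ConsumeClaim, and consumeLoop and handle are hypothetical names, not code from this file; the import path is shown as github.com/IBM/sarama, though older versions of this component imported github.com/Shopify/sarama.

package kafkaexample

import (
	"context"
	"errors"

	"github.com/IBM/sarama" // may be github.com/Shopify/sarama in older versions
)

// consumeLoop is a simplified, hypothetical stand-in for the body of
// ConsumeClaim; handle represents the retry.NotifyRecover block above.
func consumeLoop(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
	handle func(sarama.ConsumerGroupSession, *sarama.ConsumerMessage) error,
) error {
	// claim.Messages() is not closed synchronously with the cancellation of
	// session.Context(), so this loop can keep receiving messages after the
	// session has already been canceled.
	for message := range claim.Messages() {
		if err := handle(session, message); err != nil {
			if errors.Is(session.Context().Err(), context.Canceled) {
				// Return immediately. If the loop kept going, a later message
				// could succeed and be marked, advancing the committed offset
				// past this failed message, which would then never be redelivered.
				return nil
			}
		}
		session.MarkMessage(message, "")
	}
	return nil
}
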