Skip to content

Commit 8cf62a3

Browse files
add retry on failed ack for consume endpoint
1 parent 7334e41 commit 8cf62a3

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

handlers/consumer.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
100100
memphis.FetchBatchSize(reqBody.BatchSize),
101101
memphis.FetchConsumerGroup(reqBody.ConsumerGroup),
102102
memphis.FetchBatchMaxWaitTime(time.Duration(reqBody.BatchMaxWaitTimeMs)*time.Millisecond),
103-
memphis.FetchMaxMsgDeliveries(3)) // for cases of broker crash before sending the messages to the client
103+
memphis.FetchMaxMsgDeliveries(1)) // for cases of broker crash before sending the messages to the client
104104

105105
if err != nil && !strings.Contains(err.Error(), "fetch timed out") {
106106
log.Errorf("ConsumeHandleMessage - fetch messages: %s", err.Error())
@@ -120,7 +120,12 @@ func ConsumeHandleMessage() func(*fiber.Ctx) error {
120120
for _, msg := range msgs {
121121
err := msg.Ack()
122122
if err != nil {
123-
log.Errorf("ConsumeHandleMessage - acknowledge message: %s", err)
123+
time.AfterFunc(5*time.Second, func() { // retry after 5 seconds for cases of broker crash
124+
err := msg.Ack()
125+
if err != nil {
126+
log.Errorf("ConsumeHandleMessage - acknowledge message: %s", err)
127+
}
128+
})
124129
}
125130
messages = append(messages, message{
126131
Message: string(msg.Data()),

0 commit comments

Comments
 (0)