Skip to content

Commit 91f05cb

Browse files
authored
Merge pull request #111 from scribd/laynax/SERF-1677/rec-mw-kafka
[SERF1677] Add sentry recovery to Kafka consumer
2 parents 26b27df + 208a634 commit 91f05cb

File tree

1 file changed

+10
-0
lines changed

1 file changed

+10
-0
lines changed

pkg/pubsub/kafka/partitionconsumer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package kafka
22

33
import (
44
"context"
5+
"time"
56

7+
"github.com/getsentry/sentry-go"
68
"github.com/twmb/franz-go/pkg/kgo"
79

810
sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
@@ -18,6 +20,14 @@ type pconsumer struct {
1820
}
1921

2022
func (pc *pconsumer) consume(cl *kgo.Client, logger sdklogger.Logger, shouldCommit bool, handler func(*kgo.Record)) {
23+
defer func() {
24+
if rec := recover(); rec != nil {
25+
sentry.CurrentHub().Recover(rec)
26+
sentry.Flush(time.Second * 5)
27+
logger.Fatalf("kafka consumer: panic error: %v", rec)
28+
}
29+
}()
30+
2131
defer close(pc.done)
2232

2333
for {

0 commit comments

Comments
 (0)