We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 1c39b66 commit 0dce2a5Copy full SHA for 0dce2a5
pulsar/consumer_partition.go
@@ -1138,6 +1138,15 @@ func (pc *partitionConsumer) internalAckList(request *ackListRequest) {
1138
}
1139
1140
func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
1141
+ defer func() {
1142
+ if r := recover(); r != nil {
1143
+ if err, ok := r.(error); ok && err.Error() == "send on closed channel" {
1144
+ pc.log.WithField("panic", r).Debug("panic recovered in MessageReceived")
1145
+ return
1146
+ }
1147
+ panic(r)
1148
1149
+ }()
1150
pbMsgID := response.GetMessageId()
1151
1152
reader := internal.NewMessageReader(headersAndPayload)
0 commit comments