diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a4c2e3f8c..0b7dcaf0c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1475,6 +1475,14 @@ func (p *partitionProducer) failPendingMessages(err error) { if viewSize <= 0 { return } + + // fixbug: don't repeated call cb when producer is closed + // the cb funciton is calling by pi.sendRequests.done + state := p.getProducerState() + if state == producerClosing || state == producerClosed { + return + } + p.log.Infof("Failing %d messages on closing producer", viewSize) lastViewItem := curViewItems[viewSize-1].(*pendingItem)