From 5548abb4b35edf1c0acf539387d4005d06f16377 Mon Sep 17 00:00:00 2001 From: idehong Date: Tue, 13 May 2025 11:00:23 +0800 Subject: [PATCH 1/2] fixbug: don't repeated call cb when producer is closed --- pulsar/producer_partition.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a4c2e3f8c4..198ee0978e 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1475,6 +1475,13 @@ func (p *partitionProducer) failPendingMessages(err error) { if viewSize <= 0 { return } + + //fixbug: don't repeated call cb when producer is closed + state := p.getProducerState() + if state == producerClosing || state == producerClosed { + return + } + p.log.Infof("Failing %d messages on closing producer", viewSize) lastViewItem := curViewItems[viewSize-1].(*pendingItem) From 5b3674479bfaed4eacca95cdb44c650c09280da7 Mon Sep 17 00:00:00 2001 From: idehong Date: Tue, 13 May 2025 11:04:07 +0800 Subject: [PATCH 2/2] fixbug: don't repeated call cb when producer is closed --- pulsar/producer_partition.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 198ee0978e..0b7dcaf0c5 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1476,7 +1476,8 @@ func (p *partitionProducer) failPendingMessages(err error) { return } - //fixbug: don't repeated call cb when producer is closed + // fixbug: don't repeated call cb when producer is closed + // the cb funciton is calling by pi.sendRequests.done state := p.getProducerState() if state == producerClosing || state == producerClosed { return