Skip to content

Commit ef0ba67

Browse files
authored
fix: double check before producer reconnect (#1131)
Co-authored-by: gunli <[email protected]>
1 parent c91a800 commit ef0ba67

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

pulsar/producer_partition.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,14 @@ func (p *partitionProducer) reconnectToBroker() {
424424
}
425425
p.log.Info("Reconnecting to broker in ", delayReconnectTime)
426426
time.Sleep(delayReconnectTime)
427+
428+
// double check
429+
if p.getProducerState() != producerReady {
430+
// Producer is already closing
431+
p.log.Info("producer state not ready, exit reconnect")
432+
return
433+
}
434+
427435
atomic.AddUint64(&p.epoch, 1)
428436
err := p.grabCnx()
429437
if err == nil {

0 commit comments

Comments
 (0)