diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e38..f139ff17a1 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -197,7 +197,49 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } else { p.userProvidedProducerName = false } - err := p.grabCnx() + // retry to create producer when failed with maxRetry + var maxRetry int + if p.options.MaxReconnectToBroker == nil { + maxRetry = -1 + } else { + maxRetry = int(*p.options.MaxReconnectToBroker) + } + + var delayReconnectTime time.Duration + defaultBackoff := internal.DefaultBackoff{} + + var err error + for maxRetry != 0 { + if p.options.BackoffPolicy == nil { + delayReconnectTime = defaultBackoff.Next() + } else { + delayReconnectTime = p.options.BackoffPolicy.Next() + } + + atomic.AddUint64(&p.epoch, 1) + err = p.grabCnx() + if err == nil { + break + } + p.log.WithError(err).Error("Failed to create producer at newPartitionProducer") + if errors.Is(err, ErrTopicNotfound) { + // when topic is not found, do not attempt to reconnect + p.log.Warn("Failed to create producer due to Topic Not Found") + break + } + + if errors.Is(err, ErrTopicTerminated) { + p.log.Info("Topic was terminated, failing pending messages, will not create producer") + break + } + + if maxRetry > 0 { + maxRetry-- + } + logger.WithError(err). + Error("Failed to create producer at newPartitionProducer retry to create producer") + time.Sleep(delayReconnectTime) + } if err != nil { p.batchFlushTicker.Stop() logger.WithError(err).Error("Failed to create producer at newPartitionProducer")