Skip to content

Commit aac6b57

Browse files
committed
retry producer creation upon error
1 parent 9347690 commit aac6b57

File tree

1 file changed

+43
-1
lines changed

1 file changed

+43
-1
lines changed

pulsar/producer_partition.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,49 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
197197
} else {
198198
p.userProvidedProducerName = false
199199
}
200-
err := p.grabCnx()
200+
// retry to create producer when failed with maxRetry
201+
var maxRetry int
202+
if p.options.MaxReconnectToBroker == nil {
203+
maxRetry = -1
204+
} else {
205+
maxRetry = int(*p.options.MaxReconnectToBroker)
206+
}
207+
208+
var delayReconnectTime time.Duration
209+
defaultBackoff := internal.DefaultBackoff{}
210+
211+
var err error
212+
for maxRetry != 0 {
213+
if p.options.BackoffPolicy == nil {
214+
delayReconnectTime = defaultBackoff.Next()
215+
} else {
216+
delayReconnectTime = p.options.BackoffPolicy.Next()
217+
}
218+
219+
atomic.AddUint64(&p.epoch, 1)
220+
err = p.grabCnx()
221+
if err == nil {
222+
break
223+
}
224+
p.log.WithError(err).Error("Failed to create producer at newPartitionProducer")
225+
errMsg := err.Error()
226+
if strings.Contains(errMsg, errTopicNotFount) {
227+
// when topic is not found, do not attempt to reconnect
228+
p.log.Warn("Failed to create producer due to Topic Not Found")
229+
break
230+
}
231+
232+
if strings.Contains(errMsg, "TopicTerminatedError") {
233+
p.log.Info("Topic was terminated, failing pending messages, will not create producer")
234+
break
235+
}
236+
237+
if maxRetry > 0 {
238+
maxRetry--
239+
}
240+
logger.WithError(err).Error("Failed to create producer at newPartitionProducer retry to create producer", delayReconnectTime)
241+
time.Sleep(delayReconnectTime)
242+
}
201243
if err != nil {
202244
p.batchFlushTicker.Stop()
203245
logger.WithError(err).Error("Failed to create producer at newPartitionProducer")

0 commit comments

Comments
 (0)