Skip to content
Open
9 changes: 9 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type partitionProducer struct {
topic string
log log.Logger
cnx internal.Connection
err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hello @cckellogg , It seems that some of our Error messages are not fully exposed, such as sending failures after reaching the Backlog threshold. It seems that @yorkhellen is encapsulating their business based on the Go SDK, and may need to process specific business logic based on these specific errors. It seems that it is OK to expose the error field to the user here. This way the business can be more flexible in error handling.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide some examples on the use case and or what the business logic will do with these errors? I'm just trying to understand better. Also, I think this should not be part of the 0.7.0 release so we can think about it more. If we are going to bubble some more errors up we should come up with a good api.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will provide a code example later to illustrate this matter.

Yes, we always include this change in 0.7.0


options *ProducerOptions
producerName string
Expand Down Expand Up @@ -350,6 +351,10 @@ func (p *partitionProducer) reconnectToBroker() {
time.Sleep(d)
atomic.AddUint64(&p.epoch, 1)
err := p.grabCnx()
// In reconnection logic, grabCnx maybe return err, but we did not return the error.
// So in partitionProducer struct, we define an err object to make it easier for users to
// determine what caused the grabCnx error.
p.err = err
if err == nil {
// Successfully reconnected
p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
Expand Down Expand Up @@ -742,6 +747,10 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
callback(nil, msg, errProducerClosed)
return
}
if p.err != nil {
Copy link
Contributor

@cckellogg cckellogg Nov 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now changes the current behavior. Before if there was room the message would be added to the pending queue and sent when a new connection is established. Now the message will not be added. I'm not sure changing the default behavior is the correct approach here. Also, there could be a race condition checking the error and sending it since the the error is set in another go routine?

How do you envision your application handling this error?

callback(nil, msg, p.err)
return
}

sr := &sendRequest{
ctx: ctx,
Expand Down