Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,33 +205,37 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon

err := pc.grabConn()
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
pc.nackTracker.Close()
return nil, err
}
pc.log.Info("Created consumer")
pc.setConsumerState(consumerReady)

if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
pc.nackTracker.Close()
errMsg := err.Error()
if !strings.Contains(errMsg, errConnectError) && !strings.Contains(errMsg, errLookupError) {
// when topic is deleted, we should give up reconnection.
pc.log.WithError(err).Error("Failed to create consumer")
return nil, err
}
if msgID.entryID != noMessageEntry {
pc.startMessageID = msgID
pc.log.WithError(err).Error("Failed to create consumer, it will be retried later!")
pc.nackTracker.Close()
} else {
pc.log.Info("Created consumer")
pc.setConsumerState(consumerReady)

// use the WithoutClear version because the dispatcher is not started yet
err = pc.requestSeekWithoutClear(msgID.messageID)
if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
pc.nackTracker.Close()
return nil, err
}
if msgID.entryID != noMessageEntry {
pc.startMessageID = msgID

// use the WithoutClear version because the dispatcher is not started yet
err = pc.requestSeekWithoutClear(msgID.messageID)
if err != nil {
pc.nackTracker.Close()
return nil, err
}
}
}
go pc.dispatcher()
}

go pc.dispatcher()

go pc.runEventsLoop()

return pc, nil
Expand Down Expand Up @@ -1004,7 +1008,6 @@ func (pc *partitionConsumer) reconnectToBroker() {
if strings.Contains(errMsg, errTopicNotFount) {
// when topic is deleted, we should give up reconnection.
pc.log.Warn("Topic Not Found.")
break
}

if maxRetry > 0 {
Expand All @@ -1016,7 +1019,8 @@ func (pc *partitionConsumer) reconnectToBroker() {
func (pc *partitionConsumer) grabConn() error {
lr, err := pc.client.lookupService.Lookup(pc.topic)
if err != nil {
pc.log.WithError(err).Warn("Failed to lookup topic")
pc.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!")
pc.connectClosedCh <- connectionClosed{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the Lookup request fails, it is inconsistent with the connectionClosed error. At this time, we don’t need to write back the semaphore to the closed channel.

return err
}
pc.log.Debugf("Lookup result: %+v", lr)
Expand Down Expand Up @@ -1079,7 +1083,8 @@ func (pc *partitionConsumer) grabConn() error {
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)

if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
pc.log.WithError(err).Error("Failed to create consumer, it may be retried later when connection error!")
pc.connectClosedCh <- connectionClosed{}
return err
}

Expand Down
44 changes: 27 additions & 17 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -61,6 +62,10 @@ var (

var errTopicNotFount = "TopicNotFound"

var errConnectError = "connection error"

var errLookupError = "lookup error"

type partitionProducer struct {
state ua.Int32
client *client
Expand Down Expand Up @@ -160,22 +165,26 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions

err := p.grabCnx()
if err != nil {
logger.WithError(err).Error("Failed to create producer")
return nil, err
}

p.log = p.log.SubLogger(log.Fields{
"producer_name": p.producerName,
"producerID": p.producerID,
})
errMsg := err.Error()
if !strings.Contains(errMsg, errConnectError) && !strings.Contains(errMsg, errLookupError) {
// when topic is deleted, we should give up reconnection.
logger.WithError(err).Error("Failed to create producer")
return nil, err
}
logger.WithError(err).Error("Failed to create producer, it will be retried later!")
} else {
p.log = p.log.SubLogger(log.Fields{
"producer_name": p.producerName,
"producerID": p.producerID,
})

p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.setProducerState(producerReady)
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.setProducerState(producerReady)

if p.options.SendTimeout > 0 {
go p.failTimeoutMessages()
if p.options.SendTimeout > 0 {
go p.failTimeoutMessages()
}
}

go p.runEventsLoop()

return p, nil
Expand All @@ -184,8 +193,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
func (p *partitionProducer) grabCnx() error {
lr, err := p.client.lookupService.Lookup(p.topic)
if err != nil {
p.log.WithError(err).Warn("Failed to lookup topic")
return err
p.log.WithError(err).Warn("Failed to lookup topic, it will be retried later!")
p.connectClosedCh <- connectionClosed{}
return errors.New(errLookupError)
}

p.log.Debug("Lookup result: ", lr)
Expand Down Expand Up @@ -227,7 +237,8 @@ func (p *partitionProducer) grabCnx() error {
}
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer")
p.log.WithError(err).Error("Failed to create producer, it may be retried later when connection error!")
p.connectClosedCh <- connectionClosed{}
return err
}

Expand Down Expand Up @@ -359,7 +370,6 @@ func (p *partitionProducer) reconnectToBroker() {
if strings.Contains(errMsg, errTopicNotFount) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic Not Found.")
break
}

if maxRetry > 0 {
Expand Down
6 changes: 3 additions & 3 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ func TestReaderConnectError(t *testing.T) {
})

// Expect error in creating consumer
assert.Nil(t, reader)
assert.NotNil(t, err)
assert.NotNil(t, reader)
assert.Nil(t, err)

assert.Equal(t, err.Error(), "connection error")
//assert.Equal(t, err.Error(), "connection error")
}

func TestReaderOnSpecificMessage(t *testing.T) {
Expand Down