Skip to content

Commit bd11581

Browse files
authored
[Fix][Producer] handle TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced when reconnecting (#1134)
Master Issue: #1128 ### Motivation In Java client, when we get TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced, we should failPendingMessages, and close producer. But in Go client, we forget to handle ProducerBlockedQuotaExceededException/ProducerFenced, and in #1128, we just call sr.done(), actually we should call failPendingMessages(). https://github.com/apache/pulsar-client-go/pull/1128/files https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1663 ### Modifications 1. rename `errMsgTopicNotFount` to `errMsgTopicNotFound` 2. handle TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced, call `failPendingMessages()`; --------- Co-authored-by: gunli <[email protected]>
1 parent 72aed95 commit bd11581

File tree

4 files changed

+60
-35
lines changed

4 files changed

+60
-35
lines changed

pulsar/consumer_partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
16591659
}
16601660
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
16611661
errMsg := err.Error()
1662-
if strings.Contains(errMsg, errTopicNotFount) {
1662+
if strings.Contains(errMsg, errMsgTopicNotFound) {
16631663
// when topic is deleted, we should give up reconnection.
16641664
pc.log.Warn("Topic Not Found.")
16651665
break

pulsar/error.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ const (
114114
TransactionNoFoundError
115115
// ClientMemoryBufferIsFull client limit buffer is full
116116
ClientMemoryBufferIsFull
117+
// ProducerFenced When a producer asks and fail to get exclusive producer access,
118+
// or loses the exclusive status after a reconnection, the broker will
119+
// use this error to indicate that this producer is now permanently
120+
// fenced. Applications are now supposed to close it and create a
121+
// new producer
122+
ProducerFenced
117123
)
118124

119125
// Error implement error interface, composed of two parts: msg and result.

pulsar/producer_partition.go

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ var (
6464
sendRequestPool *sync.Pool
6565
)
6666

67-
var errTopicNotFount = "TopicNotFound"
67+
const (
68+
errMsgTopicNotFound = "TopicNotFound"
69+
errMsgTopicTerminated = "TopicTerminatedError"
70+
errMsgProducerBlockedQuotaExceededException = "ProducerBlockedQuotaExceededException"
71+
errMsgProducerFenced = "ProducerFenced"
72+
)
6873

6974
func init() {
7075
sendRequestPool = &sync.Pool{
@@ -441,30 +446,28 @@ func (p *partitionProducer) reconnectToBroker() {
441446
}
442447
p.log.WithError(err).Error("Failed to create producer at reconnect")
443448
errMsg := err.Error()
444-
if strings.Contains(errMsg, errTopicNotFount) {
449+
if strings.Contains(errMsg, errMsgTopicNotFound) {
445450
// when topic is deleted, we should give up reconnection.
446-
p.log.Warn("Topic Not Found.")
451+
p.log.Warn("Topic not found, stop reconnecting, close the producer")
452+
p.doClose(newError(TopicNotFound, err.Error()))
447453
break
448454
}
449455

450-
if strings.Contains(errMsg, "TopicTerminatedError") {
451-
p.log.Info("Topic was terminated, failing pending messages, will not reconnect")
452-
pendingItems := p.pendingQueue.ReadableSlice()
453-
for _, item := range pendingItems {
454-
pi := item.(*pendingItem)
455-
if pi != nil {
456-
pi.Lock()
457-
requests := pi.sendRequests
458-
for _, req := range requests {
459-
sr := req.(*sendRequest)
460-
if sr != nil {
461-
sr.done(nil, newError(TopicTerminated, err.Error()))
462-
}
463-
}
464-
pi.Unlock()
465-
}
466-
}
467-
p.setProducerState(producerClosing)
456+
if strings.Contains(errMsg, errMsgTopicTerminated) {
457+
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
458+
p.doClose(newError(TopicTerminated, err.Error()))
459+
break
460+
}
461+
462+
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
463+
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
464+
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
465+
break
466+
}
467+
468+
if strings.Contains(errMsg, errMsgProducerFenced) {
469+
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
470+
p.doClose(newError(ProducerFenced, err.Error()))
468471
break
469472
}
470473

@@ -481,10 +484,18 @@ func (p *partitionProducer) reconnectToBroker() {
481484
func (p *partitionProducer) runEventsLoop() {
482485
for {
483486
select {
484-
case data := <-p.dataChan:
487+
case data, ok := <-p.dataChan:
488+
// when doClose() is call, p.dataChan will be closed, data will be nil
489+
if !ok {
490+
return
491+
}
485492
p.internalSend(data)
486-
case i := <-p.cmdChan:
487-
switch v := i.(type) {
493+
case cmd, ok := <-p.cmdChan:
494+
// when doClose() is call, p.dataChan will be closed, cmd will be nil
495+
if !ok {
496+
return
497+
}
498+
switch v := cmd.(type) {
488499
case *flushRequest:
489500
p.internalFlush(v)
490501
case *closeProducer:
@@ -1321,13 +1332,18 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
13211332

13221333
func (p *partitionProducer) internalClose(req *closeProducer) {
13231334
defer close(req.doneCh)
1335+
1336+
p.doClose(errProducerClosed)
1337+
}
1338+
1339+
func (p *partitionProducer) doClose(reason error) {
13241340
if !p.casProducerState(producerReady, producerClosing) {
13251341
return
13261342
}
13271343

1344+
p.log.Info("Closing producer")
13281345
defer close(p.dataChan)
13291346
defer close(p.cmdChan)
1330-
p.log.Info("Closing producer")
13311347

13321348
id := p.client.rpcClient.NewRequestID()
13331349
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
@@ -1340,7 +1356,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
13401356
} else {
13411357
p.log.Info("Closed producer")
13421358
}
1343-
p.failPendingMessages()
1359+
p.failPendingMessages(reason)
13441360

13451361
if p.batchBuilder != nil {
13461362
if err = p.batchBuilder.Close(); err != nil {
@@ -1353,7 +1369,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
13531369
p.batchFlushTicker.Stop()
13541370
}
13551371

1356-
func (p *partitionProducer) failPendingMessages() {
1372+
func (p *partitionProducer) failPendingMessages(err error) {
13571373
curViewItems := p.pendingQueue.ReadableSlice()
13581374
viewSize := len(curViewItems)
13591375
if viewSize <= 0 {
@@ -1378,11 +1394,11 @@ func (p *partitionProducer) failPendingMessages() {
13781394

13791395
for _, i := range pi.sendRequests {
13801396
sr := i.(*sendRequest)
1381-
sr.done(nil, errProducerClosed)
1397+
sr.done(nil, err)
13821398
}
13831399

13841400
// flag the sending has completed with error, flush make no effect
1385-
pi.done(errProducerClosed)
1401+
pi.done(err)
13861402
pi.Unlock()
13871403

13881404
// finally reached the last view item, current iteration ends

pulsar/producer_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@ import (
2929
"testing"
3030
"time"
3131

32-
"github.com/apache/pulsar-client-go/pulsar/internal"
33-
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
3432
"github.com/stretchr/testify/assert"
3533
"google.golang.org/protobuf/proto"
3634

35+
"github.com/apache/pulsar-client-go/pulsar/internal"
36+
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
37+
38+
log "github.com/sirupsen/logrus"
39+
3740
"github.com/apache/pulsar-client-go/pulsar/crypto"
3841
plog "github.com/apache/pulsar-client-go/pulsar/log"
39-
log "github.com/sirupsen/logrus"
4042
)
4143

4244
func TestInvalidURL(t *testing.T) {
@@ -1168,7 +1170,7 @@ func TestTopicTermination(t *testing.T) {
11681170
topicName := newTopicName()
11691171
consumer, err := client.Subscribe(ConsumerOptions{
11701172
Topic: topicName,
1171-
SubscriptionName: "send_timeout_sub",
1173+
SubscriptionName: "topic_terminated_sub",
11721174
})
11731175
assert.Nil(t, err)
11741176
defer consumer.Close() // subscribe but do nothing
@@ -1189,7 +1191,7 @@ func TestTopicTermination(t *testing.T) {
11891191
})
11901192
if err != nil {
11911193
e := err.(*Error)
1192-
if e.result == TopicTerminated {
1194+
if e.result == TopicTerminated || err == errProducerClosed {
11931195
terminatedChan <- true
11941196
} else {
11951197
terminatedChan <- false
@@ -1210,6 +1212,7 @@ func TestTopicTermination(t *testing.T) {
12101212
return
12111213
case <-afterCh:
12121214
assert.Fail(t, "Time is up. Topic should have been terminated by now")
1215+
return
12131216
}
12141217
}
12151218
}

0 commit comments

Comments
 (0)