Skip to content

Commit 1c5c317

Browse files
committed
feat: regard ProducerBlockedQuotaExceededException as retryable exception to continue to reconnect
1 parent 203dcf1 commit 1c5c317

File tree

3 files changed

+140
-4
lines changed

3 files changed

+140
-4
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ lint: bin/golangci-lint
4444
bin/golangci-lint run
4545

4646
bin/golangci-lint:
47-
GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0
47+
GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.2
4848

4949
# an alternative to above `make lint` command
5050
# use golangCi-lint docker to avoid local golang env issues
5151
# https://golangci-lint.run/welcome/install/
5252
lint-docker:
53-
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.61.0 golangci-lint run -v
53+
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.64.2 golangci-lint run -v
5454

5555
container:
5656
docker build -t ${IMAGE_NAME} \

pulsar/producer_partition.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,9 +523,10 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
523523
}
524524

525525
if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
526-
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
526+
// ProducerBlockedQuotaExceededException is a retryable exception,
527+
// we only fail pending messages but continue trying to reconnect
528+
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, will retry reconnecting")
527529
p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
528-
return struct{}{}, nil
529530
}
530531

531532
if strings.Contains(errMsg, errMsgProducerFenced) {

pulsar/producer_test.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"log/slog"
2425
"net/http"
2526
"os"
2627
"strconv"
@@ -2952,3 +2953,137 @@ func TestPartitionUpdateFailed(t *testing.T) {
29522953
time.Sleep(time.Second * 1)
29532954
}
29542955
}
2956+
2957+
type testReconnectBackoffPolicy struct {
2958+
curBackoff, minBackoff, maxBackoff time.Duration
2959+
retryTime int
2960+
lock sync.Mutex
2961+
}
2962+
2963+
func newTestReconnectBackoffPolicy(minBackoff, maxBackoff time.Duration) *testReconnectBackoffPolicy {
2964+
return &testReconnectBackoffPolicy{
2965+
curBackoff: 0,
2966+
minBackoff: minBackoff,
2967+
maxBackoff: maxBackoff,
2968+
}
2969+
}
2970+
2971+
func (b *testReconnectBackoffPolicy) Next() time.Duration {
2972+
b.lock.Lock()
2973+
defer b.lock.Unlock()
2974+
2975+
// Double the delay each time
2976+
b.curBackoff += b.curBackoff
2977+
if b.curBackoff.Nanoseconds() < b.minBackoff.Nanoseconds() {
2978+
b.curBackoff = b.minBackoff
2979+
} else if b.curBackoff.Nanoseconds() > b.maxBackoff.Nanoseconds() {
2980+
b.curBackoff = b.maxBackoff
2981+
}
2982+
b.retryTime++
2983+
return b.curBackoff
2984+
}
2985+
func (b *testReconnectBackoffPolicy) IsMaxBackoffReached() bool {
2986+
b.lock.Lock()
2987+
defer b.lock.Unlock()
2988+
return b.curBackoff >= b.maxBackoff
2989+
}
2990+
2991+
func (b *testReconnectBackoffPolicy) Reset() {
2992+
}
2993+
2994+
func (b *testReconnectBackoffPolicy) IsExpectedIntervalFrom() bool {
2995+
return true
2996+
}
2997+
2998+
func TestProducerReconnectWhenBacklogQuotaExceed(t *testing.T) {
2999+
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
3000+
slog.SetDefault(logger)
3001+
client, err := NewClient(ClientOptions{
3002+
URL: serviceURL,
3003+
Logger: plog.NewLoggerWithSlog(logger),
3004+
})
3005+
defer client.Close()
3006+
namespace := "public/" + generateRandomName()
3007+
assert.NoError(t, err)
3008+
admin, err := pulsaradmin.NewClient(&config.Config{
3009+
WebServiceURL: adminURL,
3010+
})
3011+
assert.NoError(t, err)
3012+
// Step 1: Create namespace and configure 10KB backlog quota with producer_exception policy
3013+
// When subscription backlog stats refresh and reach the limit, producer will encounter BlockQuotaExceed exception
3014+
err = admin.Namespaces().CreateNamespace(namespace)
3015+
assert.NoError(t, err)
3016+
err = admin.Namespaces().SetBacklogQuota(
3017+
namespace,
3018+
utils.NewBacklogQuota(10*1024, -1, utils.ProducerException),
3019+
utils.DestinationStorage,
3020+
)
3021+
assert.NoError(t, err)
3022+
3023+
// Verify backlog quota configuration
3024+
quotaMap, err := admin.Namespaces().GetBacklogQuotaMap(namespace)
3025+
assert.NoError(t, err)
3026+
logger.Info(fmt.Sprintf("quotaMap = %v", quotaMap))
3027+
3028+
// Create test topic
3029+
topicName := namespace + "/test-topic"
3030+
tn, err := utils.GetTopicName(topicName)
3031+
assert.NoError(t, err)
3032+
err = admin.Topics().Create(*tn, 1)
3033+
assert.NoError(t, err)
3034+
3035+
// Step 2: Create consumer with small receiver queue size and earliest subscription position
3036+
// This ensures that by sending a 512KB message (much larger than the 10KB backlog quota),
3037+
// the producer will quickly reach the backlog quota limit
3038+
_consumer, err := client.Subscribe(ConsumerOptions{
3039+
Topic: topicName,
3040+
SubscriptionName: "my-sub",
3041+
Type: Exclusive,
3042+
ReceiverQueueSize: 1,
3043+
SubscriptionInitialPosition: SubscriptionPositionEarliest,
3044+
})
3045+
assert.Nil(t, err)
3046+
defer _consumer.Close()
3047+
3048+
// Step 3: Create producer with custom backoff policy to reduce retry interval
3049+
bo := newTestReconnectBackoffPolicy(100*time.Millisecond, 1*time.Second)
3050+
_producer, err := client.CreateProducer(ProducerOptions{
3051+
Topic: topicName,
3052+
DisableBatching: true,
3053+
SendTimeout: 5 * time.Minute,
3054+
BackOffPolicyFunc: func() backoff.Policy {
3055+
return bo
3056+
},
3057+
})
3058+
assert.NoError(t, err)
3059+
defer _producer.Close()
3060+
3061+
// Step 4: Send 512KB messages and monitor statistics
3062+
// Limit to 10 iterations to avoid infinite loop in test
3063+
isReachMaxBackoff := false
3064+
for i := 0; i < 10; i++ {
3065+
_producer.SendAsync(context.Background(), &ProducerMessage{
3066+
Payload: make([]byte, 512*1024),
3067+
}, func(msgId MessageID, _ *ProducerMessage, err error) {
3068+
if err != nil {
3069+
logger.Error("sendAsync fail", "time", time.Now().String(), "err", err.Error())
3070+
return
3071+
}
3072+
logger.Info("sendAsync success", "msgId", msgId.String(), "time", time.Now().String())
3073+
})
3074+
3075+
// Get topic statistics for debugging
3076+
stats, err := admin.Topics().GetPartitionedStats(*tn, false)
3077+
assert.NoError(t, err)
3078+
logger.Info("current backlogSize", "backlogSize", stats.Subscriptions["my-sub"].BacklogSize)
3079+
if bo.IsMaxBackoffReached() {
3080+
isReachMaxBackoff = true
3081+
break
3082+
}
3083+
time.Sleep(10 * time.Second)
3084+
}
3085+
3086+
// Step 5: Verify that backoff mechanism reaches maximum retry limit
3087+
// This indicates that producer successfully detected backlog quota limit and triggered reconnection mechanism
3088+
assert.True(t, isReachMaxBackoff)
3089+
}

0 commit comments

Comments
 (0)