@@ -21,6 +21,7 @@ import (
2121 "context"
2222 "errors"
2323 "fmt"
24+ "log/slog"
2425 "net/http"
2526 "os"
2627 "strconv"
@@ -2952,3 +2953,136 @@ func TestPartitionUpdateFailed(t *testing.T) {
29522953 time .Sleep (time .Second * 1 )
29532954 }
29542955}
2956+
2957+ type testReconnectBackoffPolicy struct {
2958+ curBackoff , minBackoff , maxBackoff time.Duration
2959+ retryTime int
2960+ lock sync.Mutex
2961+ }
2962+
2963+ func newTestReconnectBackoffPolicy (minBackoff , maxBackoff time.Duration ) * testReconnectBackoffPolicy {
2964+ return & testReconnectBackoffPolicy {
2965+ curBackoff : 0 ,
2966+ minBackoff : minBackoff ,
2967+ maxBackoff : maxBackoff ,
2968+ }
2969+ }
2970+
2971+ func (b * testReconnectBackoffPolicy ) Next () time.Duration {
2972+ b .lock .Lock ()
2973+ defer b .lock .Unlock ()
2974+
2975+ // Double the delay each time
2976+ b .curBackoff += b .curBackoff
2977+ if b .curBackoff .Nanoseconds () < b .minBackoff .Nanoseconds () {
2978+ b .curBackoff = b .minBackoff
2979+ } else if b .curBackoff .Nanoseconds () > b .maxBackoff .Nanoseconds () {
2980+ b .curBackoff = b .maxBackoff
2981+ }
2982+ b .retryTime ++
2983+ return b .curBackoff
2984+ }
2985+ func (b * testReconnectBackoffPolicy ) IsMaxBackoffReached () bool {
2986+ b .lock .Lock ()
2987+ defer b .lock .Unlock ()
2988+ return b .curBackoff >= b .maxBackoff
2989+ }
2990+
2991+ func (b * testReconnectBackoffPolicy ) Reset () {
2992+ }
2993+
2994+ func (b * testReconnectBackoffPolicy ) IsExpectedIntervalFrom () bool {
2995+ return true
2996+ }
2997+
2998+ func TestProducerReconnectWhenBacklogQuotaExceed (t * testing.T ) {
2999+ logger := slog .New (slog .NewJSONHandler (os .Stdout , & slog.HandlerOptions {Level : slog .LevelInfo }))
3000+ slog .SetDefault (logger )
3001+ client , err := NewClient (ClientOptions {
3002+ URL : serviceURL ,
3003+ Logger : plog .NewLoggerWithSlog (logger ),
3004+ })
3005+ defer client .Close ()
3006+ namespace := "public/" + generateRandomName ()
3007+ assert .NoError (t , err )
3008+ admin , err := pulsaradmin .NewClient (& config.Config {
3009+ WebServiceURL : adminURL ,
3010+ })
3011+ assert .NoError (t , err )
3012+ // Step 1: Create namespace and configure 512KB backlog quota with producer_exception policy
3013+ // When subscription backlog stats refresh and reach the limit, producer will encounter BlockQuotaExceed exception
3014+ err = admin .Namespaces ().CreateNamespace (namespace )
3015+ assert .NoError (t , err )
3016+ err = admin .Namespaces ().SetBacklogQuota (
3017+ namespace ,
3018+ utils .NewBacklogQuota (10 * 1024 , - 1 , utils .ProducerException ),
3019+ utils .DestinationStorage ,
3020+ )
3021+ assert .NoError (t , err )
3022+
3023+ // Verify backlog quota configuration
3024+ quotaMap , err := admin .Namespaces ().GetBacklogQuotaMap (namespace )
3025+ assert .NoError (t , err )
3026+ logger .Info (fmt .Sprintf ("quotaMap = %v" , quotaMap ))
3027+
3028+ // Create test topic
3029+ topicName := namespace + "/test-topic"
3030+ tn , err := utils .GetTopicName (topicName )
3031+ assert .NoError (t , err )
3032+ err = admin .Topics ().Create (* tn , 1 )
3033+ assert .NoError (t , err )
3034+
3035+ // Step 2: Create consumer with small receiver queue size and earliest subscription position
3036+ // This ensures that once 512KB message is sent, producer will quickly reach backlog quota limit
3037+ _consumer , err := client .Subscribe (ConsumerOptions {
3038+ Topic : topicName ,
3039+ SubscriptionName : "my-sub" ,
3040+ Type : Exclusive ,
3041+ ReceiverQueueSize : 1 ,
3042+ SubscriptionInitialPosition : SubscriptionPositionEarliest ,
3043+ })
3044+ assert .Nil (t , err )
3045+ defer _consumer .Close ()
3046+
3047+ // Step 3: Create producer with custom backoff policy to reduce retry interval
3048+ bo := newTestReconnectBackoffPolicy (100 * time .Millisecond , 1 * time .Second )
3049+ _producer , err := client .CreateProducer (ProducerOptions {
3050+ Topic : topicName ,
3051+ DisableBatching : true ,
3052+ SendTimeout : 5 * time .Minute ,
3053+ BackOffPolicyFunc : func () backoff.Policy {
3054+ return bo
3055+ },
3056+ })
3057+ assert .NoError (t , err )
3058+ defer _producer .Close ()
3059+
3060+ // Step 4: Send 512KB messages and monitor statistics
3061+ // Limit to 10 iterations to avoid infinite loop in test
3062+ isReachMaxBackoff := false
3063+ for i := 0 ; i < 10 ; i ++ {
3064+ _producer .SendAsync (context .Background (), & ProducerMessage {
3065+ Payload : make ([]byte , 512 * 1024 ),
3066+ }, func (msgId MessageID , _ * ProducerMessage , err error ) {
3067+ if err != nil {
3068+ logger .Error ("sendAsync fail" , "time" , time .Now ().String (), "err" , err .Error ())
3069+ return
3070+ }
3071+ logger .Info ("sendAsync success" , "msgId" , msgId .String (), "time" , time .Now ().String ())
3072+ })
3073+
3074+ // Get topic statistics for debugging
3075+ stats , err := admin .Topics ().GetPartitionedStats (* tn , false )
3076+ assert .NoError (t , err )
3077+ logger .Info ("current backlogSize" , "backlogSize" , stats .Subscriptions ["my-sub" ].BacklogSize )
3078+ if bo .IsMaxBackoffReached () {
3079+ isReachMaxBackoff = true
3080+ break
3081+ }
3082+ time .Sleep (10 * time .Second )
3083+ }
3084+
3085+ // Step 5: Verify that backoff mechanism reaches maximum retry limit
3086+ // This indicates that producer successfully detected backlog quota limit and triggered reconnection mechanism
3087+ assert .True (t , isReachMaxBackoff )
3088+ }
0 commit comments