diff --git a/pkg/debounce/debounce.go b/pkg/debounce/debounce.go index c72174ee..84391b6e 100644 --- a/pkg/debounce/debounce.go +++ b/pkg/debounce/debounce.go @@ -1,19 +1,57 @@ +// Package debounce provides a mechanism for rate-limiting and retrying RPC requests +// in a controlled manner. It includes support for handling rate limit errors by +// dynamically adjusting retry intervals and limiting burst rates. The package uses +// a rate limiter to ensure that requests stay within allowed limits, and provides +// detailed handling of HTTP and JSON-RPC error codes to decide whether retries +// should be attempted or the error is unrecoverable. package debounce import ( "context" "strings" + "sync/atomic" "github.com/ipfs/go-log/v2" "golang.org/x/time/rate" ) +const ( + // rateLimit is the maximum number of requests per second. + // As of spring 2024, Infura enables 10 req/s rate limit for free plan. + rateLimit = 10 + // rateLimitDebounceSeconds is the upper limit of seconds to wait before retrying the request. + // Wait time is increased slowly until it matches this value. + rateLimitDebounceSeconds = 60 + maxBurst = rateLimit * rateLimitDebounceSeconds + // burstDecrementAfter "debounces" the burst decrement. + // + // It is a number of consecutive successful requests + // after which the burst is decremented. + // This is to find the right tempo (request rate) + // if the RPC is under heavy load or is unstable. + // + // Consider the following case: if request Nth-2 was rate-limited and Nth-1 was successful, + // it does not mean that Nth request will be successful. + // Maybe the rate limit is right on the edge, + // then it does not make any sense to decrease the burst after Nth-1. + burstDecrementAfter = 5 + // burstBias is to avoid rate limiting on the first few requests. + // The client will eventually work its way up to the actual rate limit. + burstBias = 3 +) + var ( // rpcRateLimiter limits the number of requests to the RPC provider for DEX drivers. - // As of spring 2024, Infura enables 10 req/s rate limit for free plan. - // A lower limit of 5 req/s is used here just to be safe. - rpcRateLimiter = rate.NewLimiter(5, 1) - httpRpcErrors = map[int]rpcError{ + rpcRateLimiter = rate.NewLimiter(rate.Limit(rateLimit), maxBurst) + + // burst is a shared counter that is used to adjust the rate limiter's burst. + burst = atomic.Int64{} + // burstSuccessCount is a shared counter + // that is used to track the number of successful requests + // after which the burst is decremented. + burstSuccessCount = atomic.Int64{} + + httpRpcErrors = map[int]rpcError{ 400: { Recoverable: false, Patterns: []string{"400"}, @@ -52,6 +90,7 @@ var ( "project ID request rate exceeded", "daily request count exceeded", "request rate limited", + "Too Many Requests", }, Message: "Too Many Requests: The daily request total or request per second are higher than your plan allows. Refer to the Avoid rate limiting topic for more information.", }, @@ -133,6 +172,10 @@ var ( } ) +func init() { + burst.Store(rateLimit * burstBias) // to avoid rate limiting on the first few requests +} + type rpcError struct { Message string Patterns []string @@ -146,16 +189,25 @@ func Debounce( logger *log.ZapEventLogger, f func(context.Context) error, ) error { +outer: for { - if err := rpcRateLimiter.Wait(ctx); err != nil { + currBurst := int(burst.Load()) + if err := rpcRateLimiter.WaitN(ctx, int(currBurst)); err != nil { if logger != nil { - logger.Warnf("failed to acquire rate limiter: %s", err) + logger.Warnw("failed to acquire rate limiter", "error", err) } return err } err := f(ctx) if err == nil { + successCount := burstSuccessCount.Add(1) + if currBurst > 1 && successCount >= burstDecrementAfter { + burstSuccessCount.Store(0) // reset counter + burst.Add(-1) // decrease burst on success + logger.Debugw("decremented burst", "new_burst", currBurst-1) + } + return nil } @@ -164,11 +216,18 @@ func Debounce( for _, pattern := range httpRpcError.Patterns { if strings.Contains(err.Error(), pattern) && httpRpcError.Recoverable { if logger != nil { - logger.Warn("recoverable error", + logger.Warnw("recoverable error", + "burst", currBurst, "message", httpRpcError.Message, "error", err) } - continue // retry the request after a while + + // Adjust burst to wait a bit longer + if currBurst < maxBurst { + burst.Add(1) + } + + continue outer // retry the request after a while } } }