Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 67 additions & 8 deletions pkg/debounce/debounce.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,57 @@
// Package debounce provides a mechanism for rate-limiting and retrying RPC requests
// in a controlled manner. It includes support for handling rate limit errors by
// dynamically adjusting retry intervals and limiting burst rates. The package uses
// a rate limiter to ensure that requests stay within allowed limits, and provides
// detailed handling of HTTP and JSON-RPC error codes to decide whether retries
// should be attempted or the error is unrecoverable.
package debounce

import (
"context"
"strings"
"sync/atomic"

"github.com/ipfs/go-log/v2"
"golang.org/x/time/rate"
)

const (
// rateLimit is the maximum number of requests per second.
// As of spring 2024, Infura enables 10 req/s rate limit for free plan.
rateLimit = 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to add an option to pass custom rateLimit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think paid Infura accounts allow more than 10 rps

// rateLimitDebounceSeconds is the upper limit of seconds to wait before retrying the request.
// Wait time is increased slowly until it matches this value.
rateLimitDebounceSeconds = 60
maxBurst = rateLimit * rateLimitDebounceSeconds
// burstDecrementAfter "debounces" the burst decrement.
//
// It is a number of consecutive successful requests
// after which the burst is decremented.
// This is to find the right tempo (request rate)
// if the RPC is under heavy load or is unstable.
//
// Consider the following case: if request Nth-2 was rate-limited and Nth-1 was successful,
// it does not mean that Nth request will be successful.
// Maybe the rate limit is right on the edge,
// then it does not make any sense to decrease the burst after Nth-1.
burstDecrementAfter = 5
// burstBias is to avoid rate limiting on the first few requests.
// The client will eventually work its way up to the actual rate limit.
burstBias = 3
)

var (
// rpcRateLimiter limits the number of requests to the RPC provider for DEX drivers.
// As of spring 2024, Infura enables 10 req/s rate limit for free plan.
// A lower limit of 5 req/s is used here just to be safe.
rpcRateLimiter = rate.NewLimiter(5, 1)
httpRpcErrors = map[int]rpcError{
rpcRateLimiter = rate.NewLimiter(rate.Limit(rateLimit), maxBurst)

// burst is a shared counter that is used to adjust the rate limiter's burst.
burst = atomic.Int64{}
// burstSuccessCount is a shared counter
// that is used to track the number of successful requests
// after which the burst is decremented.
burstSuccessCount = atomic.Int64{}

httpRpcErrors = map[int]rpcError{
400: {
Recoverable: false,
Patterns: []string{"400"},
Expand Down Expand Up @@ -52,6 +90,7 @@ var (
"project ID request rate exceeded",
"daily request count exceeded",
"request rate limited",
"Too Many Requests",
},
Message: "Too Many Requests: The daily request total or request per second are higher than your plan allows. Refer to the Avoid rate limiting topic for more information.",
},
Expand Down Expand Up @@ -133,6 +172,10 @@ var (
}
)

func init() {
burst.Store(rateLimit * burstBias) // to avoid rate limiting on the first few requests
}

type rpcError struct {
Message string
Patterns []string
Expand All @@ -146,16 +189,25 @@ func Debounce(
logger *log.ZapEventLogger,
f func(context.Context) error,
) error {
outer:
for {
if err := rpcRateLimiter.Wait(ctx); err != nil {
currBurst := int(burst.Load())
if err := rpcRateLimiter.WaitN(ctx, int(currBurst)); err != nil {
if logger != nil {
logger.Warnf("failed to acquire rate limiter: %s", err)
logger.Warnw("failed to acquire rate limiter", "error", err)
}
return err
}

err := f(ctx)
if err == nil {
successCount := burstSuccessCount.Add(1)
if currBurst > 1 && successCount >= burstDecrementAfter {
burstSuccessCount.Store(0) // reset counter
burst.Add(-1) // decrease burst on success
logger.Debugw("decremented burst", "new_burst", currBurst-1)
}

return nil
}

Expand All @@ -164,11 +216,18 @@ func Debounce(
for _, pattern := range httpRpcError.Patterns {
if strings.Contains(err.Error(), pattern) && httpRpcError.Recoverable {
if logger != nil {
logger.Warn("recoverable error",
logger.Warnw("recoverable error",
"burst", currBurst,
"message", httpRpcError.Message,
"error", err)
}
continue // retry the request after a while

// Adjust burst to wait a bit longer
if currBurst < maxBurst {
burst.Add(1)
}

continue outer // retry the request after a while
}
}
}
Expand Down