diff --git a/go.mod b/go.mod index 30a2eae5..11fc2595 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/stackup-wallet/stackup-bundler go 1.19 require ( + github.com/cenkalti/backoff/v4 v4.2.1 github.com/deckarep/golang-set/v2 v2.3.0 github.com/dgraph-io/badger/v3 v3.2103.5 github.com/ethereum/go-ethereum v1.11.5 @@ -29,6 +30,7 @@ require ( go.opentelemetry.io/otel/sdk v1.16.0 go.opentelemetry.io/otel/sdk/metric v0.39.0 go.opentelemetry.io/otel/trace v1.16.0 + go.uber.org/multierr v1.8.0 golang.org/x/sync v0.1.0 golang.org/x/text v0.9.0 google.golang.org/grpc v1.55.0 @@ -39,7 +41,6 @@ require ( github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect github.com/bytedance/sonic v1.8.0 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect @@ -91,6 +92,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect + go.uber.org/atomic v1.9.0 // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/crypto v0.7.0 // indirect golang.org/x/net v0.10.0 // indirect diff --git a/go.sum b/go.sum index 21bdf2f1..428874c5 100644 --- a/go.sum +++ b/go.sum @@ -445,7 +445,12 @@ go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLk go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/pkg/bundler/bundler.go b/pkg/bundler/bundler.go index 13e04958..8a7b02f1 100644 --- a/pkg/bundler/bundler.go +++ b/pkg/bundler/bundler.go @@ -6,6 +6,7 @@ import ( "math/big" "time" + backoff "github.com/cenkalti/backoff/v4" "github.com/ethereum/go-ethereum/common" "github.com/go-logr/logr" "github.com/stackup-wallet/stackup-bundler/internal/logger" @@ -16,6 +17,7 @@ import ( "github.com/stackup-wallet/stackup-bundler/pkg/userop" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" + "go.uber.org/multierr" ) // Bundler controls the end to end process of creating a batch of UserOperations from the mempool and sending @@ -196,20 +198,32 @@ func (i *Bundler) Run() error { return nil } - ticker := time.NewTicker(1 * time.Second) + // Construct an exponential backoff to avoid overwhelming the system if error + // happens during the ticking. + bo := backoff.NewExponentialBackOff() + { + bo.InitialInterval = 5 * time.Second + bo.Multiplier = 2 + bo.MaxElapsedTime = 0 + bo.Reset() + } + + ticker := time.NewTicker(bo.InitialInterval) go func(i *Bundler) { for { select { case <-i.done: return case <-ticker.C: - for _, ep := range i.supportedEntryPoints { - _, err := i.Process(ep) - if err != nil { - // Already logged. - continue - } + if err := i.runOnce(); err != nil { + // Use exponential backoff. + ticker.Reset(bo.NextBackOff()) + continue } + + // Reset back to normal ticking. + ticker.Reset(bo.InitialInterval) + bo.Reset() } } }(i) @@ -219,6 +233,20 @@ func (i *Bundler) Run() error { return nil } +func (i *Bundler) runOnce() error { + var tickErrs []error + for _, ep := range i.supportedEntryPoints { + _, err := i.Process(ep) + if err != nil { + // Already logged. + tickErrs = append(tickErrs, err) + continue + } + } + + return multierr.Combine(tickErrs...) +} + // Stop signals the bundler to stop continuously processing batches from the mempool. func (i *Bundler) Stop() { if !i.isRunning {