@@ -19,8 +19,14 @@ import (
1919 "github.com/evstack/ev-node/pkg/store"
2020)
2121
22- // DefaultInterval is the default reaper interval
23- const DefaultInterval = 1 * time .Second
22+ const (
23+ // DefaultInterval is the default reaper interval
24+ DefaultInterval = 1 * time .Second
25+ // MaxBackoffInterval is the maximum backoff interval for retries
26+ MaxBackoffInterval = 30 * time .Second
27+ // BackoffMultiplier is the multiplier for exponential backoff
28+ BackoffMultiplier = 2
29+ )
2430
2531// Reaper is responsible for periodically retrieving transactions from the executor,
2632// filtering out already seen transactions, and submitting new transactions to the sequencer.
@@ -76,7 +82,7 @@ func NewReaper(
7682func (r * Reaper ) Start (ctx context.Context ) error {
7783 r .ctx , r .cancel = context .WithCancel (ctx )
7884
79- // Start repear loop
85+ // Start reaper loop
8086 r .wg .Add (1 )
8187 go func () {
8288 defer r .wg .Done ()
@@ -91,12 +97,35 @@ func (r *Reaper) reaperLoop() {
9197 ticker := time .NewTicker (r .interval )
9298 defer ticker .Stop ()
9399
100+ consecutiveFailures := 0
101+
94102 for {
95103 select {
96104 case <- r .ctx .Done ():
97105 return
98106 case <- ticker .C :
99- r .SubmitTxs ()
107+ err := r .SubmitTxs ()
108+ if err != nil {
109+ // Increment failure counter and apply exponential backoff
110+ consecutiveFailures ++
111+ backoff := r .interval * time .Duration (1 << min (consecutiveFailures , 5 )) // Cap at 2^5 = 32x
112+ backoff = min (backoff , MaxBackoffInterval )
113+ r .logger .Warn ().
114+ Err (err ).
115+ Int ("consecutive_failures" , consecutiveFailures ).
116+ Dur ("next_retry_in" , backoff ).
117+ Msg ("reaper encountered error, applying backoff" )
118+
119+ // Reset ticker with backoff interval
120+ ticker .Reset (backoff )
121+ } else {
122+ // Reset failure counter and backoff on success
123+ if consecutiveFailures > 0 {
124+ r .logger .Info ().Msg ("reaper recovered from errors, resetting backoff" )
125+ consecutiveFailures = 0
126+ ticker .Reset (r .interval )
127+ }
128+ }
100129 }
101130 }
102131}
@@ -113,34 +142,42 @@ func (r *Reaper) Stop() error {
113142}
114143
115144// SubmitTxs retrieves transactions from the executor and submits them to the sequencer.
116- func (r * Reaper ) SubmitTxs () {
145+ // Returns an error if any critical operation fails.
146+ func (r * Reaper ) SubmitTxs () error {
117147 txs , err := r .exec .GetTxs (r .ctx )
118148 if err != nil {
119149 r .logger .Error ().Err (err ).Msg ("failed to get txs from executor" )
120- return
150+ return fmt . Errorf ( "failed to get txs from executor: %w" , err )
121151 }
122152 if len (txs ) == 0 {
123153 r .logger .Debug ().Msg ("no new txs" )
124- return
154+ return nil
125155 }
126156
127157 var newTxs [][]byte
158+ var seenStoreErrors int
128159 for _ , tx := range txs {
129160 txHash := hashTx (tx )
130161 key := ds .NewKey (txHash )
131162 has , err := r .seenStore .Has (r .ctx , key )
132163 if err != nil {
133164 r .logger .Error ().Err (err ).Msg ("failed to check seenStore" )
165+ seenStoreErrors ++
134166 continue
135167 }
136168 if ! has {
137169 newTxs = append (newTxs , tx )
138170 }
139171 }
140172
173+ // If all transactions failed seenStore check, return error
174+ if seenStoreErrors > 0 && len (newTxs ) == 0 {
175+ return fmt .Errorf ("failed to check seenStore for all %d transactions" , seenStoreErrors )
176+ }
177+
141178 if len (newTxs ) == 0 {
142179 r .logger .Debug ().Msg ("no new txs to submit" )
143- return
180+ return nil
144181 }
145182
146183 r .logger .Debug ().Int ("txCount" , len (newTxs )).Msg ("submitting txs to sequencer" )
@@ -150,14 +187,14 @@ func (r *Reaper) SubmitTxs() {
150187 Batch : & coresequencer.Batch {Transactions : newTxs },
151188 })
152189 if err != nil {
153- r .logger .Error ().Err (err ).Msg ("failed to submit txs to sequencer" )
154- return
190+ return fmt .Errorf ("failed to submit txs to sequencer: %w" , err )
155191 }
156192
157193 for _ , tx := range newTxs {
158194 txHash := hashTx (tx )
159195 key := ds .NewKey (txHash )
160196 if err := r .seenStore .Put (r .ctx , key , []byte {1 }); err != nil {
197+ // Log but don't fail on persistence errors
161198 r .logger .Error ().Err (err ).Str ("txHash" , txHash ).Msg ("failed to persist seen tx" )
162199 }
163200 }
@@ -169,6 +206,7 @@ func (r *Reaper) SubmitTxs() {
169206 }
170207
171208 r .logger .Debug ().Msg ("successfully submitted txs" )
209+ return nil
172210}
173211
174212// SeenStore returns the datastore used to track seen transactions.
0 commit comments