Skip to content

Commit 1917bd7

Browse files
authored
refactor(block): add retries on SetFinal (#2721)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview Add retries on SetFinal. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent bc6a0b1 commit 1917bd7

File tree

7 files changed

+256
-40
lines changed

7 files changed

+256
-40
lines changed

apps/evm/single/cmd/rollback.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,12 @@ func NewRollbackCmd() *cobra.Command {
9393
}
9494

9595
if err := headerStore.Start(goCtx); err != nil {
96-
return err
96+
return fmt.Errorf("failed to start header store: %w", err)
9797
}
9898
defer headerStore.Stop(goCtx)
9999

100100
if err := dataStore.Start(goCtx); err != nil {
101-
return err
101+
return fmt.Errorf("failed to start data store: %w", err)
102102
}
103103
defer dataStore.Stop(goCtx)
104104

File renamed without changes.

block/internal/common/retry.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package common
2+
3+
import "time"
4+
5+
// MaxRetriesBeforeHalt is the maximum number of retries before halting.
6+
const MaxRetriesBeforeHalt = 3
7+
8+
// MaxRetriesTimeout is the maximum time to wait before halting.
9+
const MaxRetriesTimeout = 10 * time.Second

block/internal/submitting/submitter.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (s *Submitter) processDAInclusionLoop() {
218218
}
219219

220220
// Set final height in executor
221-
if err := s.exec.SetFinal(s.ctx, nextHeight); err != nil {
221+
if err := s.setFinalWithRetry(nextHeight); err != nil {
222222
s.sendCriticalError(fmt.Errorf("failed to set final height: %w", err))
223223
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("failed to set final height")
224224
break
@@ -239,6 +239,34 @@ func (s *Submitter) processDAInclusionLoop() {
239239
}
240240
}
241241

242+
// setFinalWithRetry sets the final height in executor with retry logic
243+
func (s *Submitter) setFinalWithRetry(nextHeight uint64) error {
244+
for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ {
245+
if err := s.exec.SetFinal(s.ctx, nextHeight); err != nil {
246+
if attempt == common.MaxRetriesBeforeHalt {
247+
return fmt.Errorf("failed to set final height after %d attempts: %w", attempt, err)
248+
}
249+
250+
s.logger.Error().Err(err).
251+
Int("attempt", attempt).
252+
Int("max_attempts", common.MaxRetriesBeforeHalt).
253+
Uint64("da_height", nextHeight).
254+
Msg("failed to set final height, retrying")
255+
256+
select {
257+
case <-time.After(common.MaxRetriesTimeout):
258+
continue
259+
case <-s.ctx.Done():
260+
return fmt.Errorf("context cancelled during retry: %w", s.ctx.Err())
261+
}
262+
}
263+
264+
return nil
265+
}
266+
267+
return nil
268+
}
269+
242270
// GetDAIncludedHeight returns the DA included height
243271
func (s *Submitter) GetDAIncludedHeight() uint64 {
244272
s.daStateMtx.RLock()

block/internal/submitting/submitter_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package submitting
33
import (
44
"context"
55
"encoding/binary"
6+
"errors"
67
"fmt"
78
"sync"
89
"testing"
@@ -27,6 +28,82 @@ import (
2728
"github.com/libp2p/go-libp2p/core/crypto"
2829
)
2930

31+
func TestSubmitter_setFinalWithRetry(t *testing.T) {
32+
t.Parallel()
33+
34+
tests := []struct {
35+
name string
36+
setupMock func(*testmocks.MockExecutor)
37+
expectSuccess bool
38+
expectAttempts int
39+
expectError string
40+
}{
41+
{
42+
name: "success on first attempt",
43+
setupMock: func(exec *testmocks.MockExecutor) {
44+
exec.On("SetFinal", mock.Anything, uint64(100)).Return(nil).Once()
45+
},
46+
expectSuccess: true,
47+
expectAttempts: 1,
48+
},
49+
{
50+
name: "success on second attempt",
51+
setupMock: func(exec *testmocks.MockExecutor) {
52+
exec.On("SetFinal", mock.Anything, uint64(100)).Return(errors.New("temporary failure")).Once()
53+
exec.On("SetFinal", mock.Anything, uint64(100)).Return(nil).Once()
54+
},
55+
expectSuccess: true,
56+
expectAttempts: 2,
57+
},
58+
{
59+
name: "success on third attempt",
60+
setupMock: func(exec *testmocks.MockExecutor) {
61+
exec.On("SetFinal", mock.Anything, uint64(100)).Return(errors.New("temporary failure")).Times(2)
62+
exec.On("SetFinal", mock.Anything, uint64(100)).Return(nil).Once()
63+
},
64+
expectSuccess: true,
65+
expectAttempts: 3,
66+
},
67+
{
68+
name: "failure after max retries",
69+
setupMock: func(exec *testmocks.MockExecutor) {
70+
exec.On("SetFinal", mock.Anything, uint64(100)).Return(errors.New("persistent failure")).Times(common.MaxRetriesBeforeHalt)
71+
},
72+
expectSuccess: false,
73+
expectError: "failed to set final height after 3 attempts",
74+
},
75+
}
76+
77+
for _, tt := range tests {
78+
t.Run(tt.name, func(t *testing.T) {
79+
t.Parallel()
80+
81+
ctx := context.Background()
82+
exec := testmocks.NewMockExecutor(t)
83+
tt.setupMock(exec)
84+
85+
s := &Submitter{
86+
exec: exec,
87+
ctx: ctx,
88+
logger: zerolog.Nop(),
89+
}
90+
91+
err := s.setFinalWithRetry(100)
92+
93+
if tt.expectSuccess {
94+
require.NoError(t, err)
95+
} else {
96+
require.Error(t, err)
97+
if tt.expectError != "" {
98+
assert.Contains(t, err.Error(), tt.expectError)
99+
}
100+
}
101+
102+
exec.AssertExpectations(t)
103+
})
104+
}
105+
}
106+
30107
func TestSubmitter_IsHeightDAIncluded(t *testing.T) {
31108
t.Parallel()
32109

block/internal/syncing/syncer.go

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,6 @@ type p2pHandler interface {
3030
ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
3131
}
3232

33-
const (
34-
// maxRetriesBeforeHalt is the maximum number of retries against the execution client before halting the syncer.
35-
maxRetriesBeforeHalt = 3
36-
// maxRetriesTimeout is the maximum time to wait for a retry before halting the syncer.
37-
maxRetriesTimeout = time.Second * 10
38-
)
39-
4033
// Syncer handles block synchronization from DA and P2P sources.
4134
type Syncer struct {
4235
// Core components
@@ -76,10 +69,9 @@ type Syncer struct {
7669
logger zerolog.Logger
7770

7871
// Lifecycle
79-
ctx context.Context
80-
cancel context.CancelFunc
81-
wg sync.WaitGroup
82-
retriesBeforeHalt map[uint64]uint64
72+
ctx context.Context
73+
cancel context.CancelFunc
74+
wg sync.WaitGroup
8375
}
8476

8577
// NewSyncer creates a new block syncer
@@ -98,21 +90,20 @@ func NewSyncer(
9890
errorCh chan<- error,
9991
) *Syncer {
10092
return &Syncer{
101-
store: store,
102-
exec: exec,
103-
da: da,
104-
cache: cache,
105-
metrics: metrics,
106-
config: config,
107-
genesis: genesis,
108-
options: options,
109-
headerStore: headerStore,
110-
dataStore: dataStore,
111-
lastStateMtx: &sync.RWMutex{},
112-
heightInCh: make(chan common.DAHeightEvent, 10_000),
113-
errorCh: errorCh,
114-
logger: logger.With().Str("component", "syncer").Logger(),
115-
retriesBeforeHalt: make(map[uint64]uint64),
93+
store: store,
94+
exec: exec,
95+
da: da,
96+
cache: cache,
97+
metrics: metrics,
98+
config: config,
99+
genesis: genesis,
100+
options: options,
101+
headerStore: headerStore,
102+
dataStore: dataStore,
103+
lastStateMtx: &sync.RWMutex{},
104+
heightInCh: make(chan common.DAHeightEvent, 10_000),
105+
errorCh: errorCh,
106+
logger: logger.With().Str("component", "syncer").Logger(),
116107
}
117108
}
118109

@@ -489,19 +480,11 @@ func (s *Syncer) applyBlock(header types.Header, data *types.Data, currentState
489480

490481
// Execute transactions
491482
ctx := context.WithValue(s.ctx, types.HeaderContextKey, header)
492-
newAppHash, _, err := s.exec.ExecuteTxs(ctx, rawTxs, header.Height(),
493-
header.Time(), currentState.AppHash)
483+
newAppHash, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState)
494484
if err != nil {
495-
s.retriesBeforeHalt[header.Height()]++
496-
if s.retriesBeforeHalt[header.Height()] > maxRetriesBeforeHalt {
497-
s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
498-
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
499-
}
500-
501-
time.Sleep(maxRetriesTimeout) // sleep before retrying
502-
return types.State{}, fmt.Errorf("failed to execute transactions (retry %d / %d): %w", s.retriesBeforeHalt[header.Height()], maxRetriesBeforeHalt, err)
485+
s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
486+
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
503487
}
504-
delete(s.retriesBeforeHalt, header.Height())
505488

506489
// Create new state
507490
newState, err := currentState.NextState(header, newAppHash)
@@ -512,6 +495,35 @@ func (s *Syncer) applyBlock(header types.Header, data *types.Data, currentState
512495
return newState, nil
513496
}
514497

498+
// executeTxsWithRetry executes transactions with retry logic
499+
func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, header types.Header, currentState types.State) ([]byte, error) {
500+
for attempt := 1; attempt <= common.MaxRetriesBeforeHalt; attempt++ {
501+
newAppHash, _, err := s.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), currentState.AppHash)
502+
if err != nil {
503+
if attempt == common.MaxRetriesBeforeHalt {
504+
return nil, fmt.Errorf("failed to execute transactions: %w", err)
505+
}
506+
507+
s.logger.Error().Err(err).
508+
Int("attempt", attempt).
509+
Int("max_attempts", common.MaxRetriesBeforeHalt).
510+
Uint64("height", header.Height()).
511+
Msg("failed to execute transactions, retrying")
512+
513+
select {
514+
case <-time.After(common.MaxRetriesTimeout):
515+
continue
516+
case <-s.ctx.Done():
517+
return nil, fmt.Errorf("context cancelled during retry: %w", s.ctx.Err())
518+
}
519+
}
520+
521+
return newAppHash, nil
522+
}
523+
524+
return nil, nil
525+
}
526+
515527
// validateBlock validates a synced block
516528
func (s *Syncer) validateBlock(
517529
lastState types.State,

block/internal/syncing/syncer_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package syncing
33
import (
44
"context"
55
crand "crypto/rand"
6+
"errors"
67
"testing"
78
"time"
89

@@ -376,3 +377,92 @@ func TestSyncLoopPersistState(t *testing.T) {
376377

377378
t.Log("syncLoop exited")
378379
}
380+
381+
func TestSyncer_executeTxsWithRetry(t *testing.T) {
382+
t.Parallel()
383+
384+
tests := []struct {
385+
name string
386+
setupMock func(*testmocks.MockExecutor)
387+
expectSuccess bool
388+
expectHash []byte
389+
expectError string
390+
}{
391+
{
392+
name: "success on first attempt",
393+
setupMock: func(exec *testmocks.MockExecutor) {
394+
exec.On("ExecuteTxs", mock.Anything, mock.Anything, uint64(100), mock.Anything, mock.Anything).
395+
Return([]byte("new-hash"), uint64(0), nil).Once()
396+
},
397+
expectSuccess: true,
398+
expectHash: []byte("new-hash"),
399+
},
400+
{
401+
name: "success on second attempt",
402+
setupMock: func(exec *testmocks.MockExecutor) {
403+
exec.On("ExecuteTxs", mock.Anything, mock.Anything, uint64(100), mock.Anything, mock.Anything).
404+
Return([]byte(nil), uint64(0), errors.New("temporary failure")).Once()
405+
exec.On("ExecuteTxs", mock.Anything, mock.Anything, uint64(100), mock.Anything, mock.Anything).
406+
Return([]byte("new-hash"), uint64(0), nil).Once()
407+
},
408+
expectSuccess: true,
409+
expectHash: []byte("new-hash"),
410+
},
411+
{
412+
name: "success on third attempt",
413+
setupMock: func(exec *testmocks.MockExecutor) {
414+
exec.On("ExecuteTxs", mock.Anything, mock.Anything, uint64(100), mock.Anything, mock.Anything).
415+
Return([]byte(nil), uint64(0), errors.New("temporary failure")).Times(2)
416+
exec.On("ExecuteTxs", mock.Anything, mock.Anything, uint64(100), mock.Anything, mock.Anything).
417+
Return([]byte("new-hash"), uint64(0), nil).Once()
418+
},
419+
expectSuccess: true,
420+
expectHash: []byte("new-hash"),
421+
},
422+
{
423+
name: "failure after max retries",
424+
setupMock: func(exec *testmocks.MockExecutor) {
425+
exec.On("ExecuteTxs", mock.Anything, mock.Anything, uint64(100), mock.Anything, mock.Anything).
426+
Return([]byte(nil), uint64(0), errors.New("persistent failure")).Times(common.MaxRetriesBeforeHalt)
427+
},
428+
expectSuccess: false,
429+
expectError: "failed to execute transactions",
430+
},
431+
}
432+
433+
for _, tt := range tests {
434+
t.Run(tt.name, func(t *testing.T) {
435+
t.Parallel()
436+
437+
ctx := context.Background()
438+
exec := testmocks.NewMockExecutor(t)
439+
tt.setupMock(exec)
440+
441+
s := &Syncer{
442+
exec: exec,
443+
ctx: ctx,
444+
logger: zerolog.Nop(),
445+
}
446+
447+
rawTxs := [][]byte{[]byte("tx1"), []byte("tx2")}
448+
header := types.Header{
449+
BaseHeader: types.BaseHeader{Height: 100, Time: uint64(time.Now().UnixNano())},
450+
}
451+
currentState := types.State{AppHash: []byte("current-hash")}
452+
453+
result, err := s.executeTxsWithRetry(ctx, rawTxs, header, currentState)
454+
455+
if tt.expectSuccess {
456+
require.NoError(t, err)
457+
assert.Equal(t, tt.expectHash, result)
458+
} else {
459+
require.Error(t, err)
460+
if tt.expectError != "" {
461+
assert.Contains(t, err.Error(), tt.expectError)
462+
}
463+
}
464+
465+
exec.AssertExpectations(t)
466+
})
467+
}
468+
}

0 commit comments

Comments
 (0)