Skip to content

Commit 87aa52f

Browse files
authored
refactor(syncing): remove retry from da retriever (#2711)
## Overview The DA height is never bumped if a blob has not been fetched. It is bumped on successful blob retrieval (see syncer): an empty blob or valid blobs bump it. Additionally, we already have a backoff in the syncer for heights in the future. This backoff has now been applied to all errors, which paces retries by waiting for the DA block time. We should remove the retry logic within the retriever, since we still want to check p2p before retrying; the next loop iteration retries automatically - https://github.com/evstack/ev-node/blob/v1.0.0-beta.5/block/internal/syncing/syncer.go#L281-L283 - https://github.com/evstack/ev-node/blob/v1.0.0-beta.5/block/internal/syncing/syncer.go#L302-L303
1 parent fa9a03f commit 87aa52f

File tree

6 files changed

+633
-316
lines changed

6 files changed

+633
-316
lines changed

block/internal/syncing/da_retriever.go

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@ import (
2020
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
2121
)
2222

23-
const (
24-
dAFetcherTimeout = 30 * time.Second
25-
dAFetcherRetries = 10
26-
)
23+
const dAFetcherTimeout = 10 * time.Second
2724

2825
// DARetriever handles DA retrieval operations for syncing
2926
type DARetriever struct {
@@ -70,49 +67,34 @@ func NewDARetriever(
7067
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events
7168
func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
7269
r.logger.Debug().Uint64("da_height", daHeight).Msg("retrieving from DA")
70+
ctx, cancel := context.WithTimeout(ctx, dAFetcherTimeout)
71+
defer cancel()
7372

74-
var err error
75-
for retry := 0; retry < dAFetcherRetries; retry++ {
76-
select {
77-
case <-ctx.Done():
78-
return nil, ctx.Err()
79-
default:
80-
}
81-
82-
blobsResp, fetchErr := r.fetchBlobs(ctx, daHeight)
83-
if fetchErr == nil {
84-
if blobsResp.Code == coreda.StatusNotFound {
85-
r.logger.Debug().Uint64("da_height", daHeight).Msg("no blob data found")
86-
return nil, coreda.ErrBlobNotFound
87-
}
88-
89-
r.logger.Debug().Int("blobs", len(blobsResp.Data)).Uint64("da_height", daHeight).Msg("retrieved blob data")
90-
events := r.processBlobs(ctx, blobsResp.Data, daHeight)
91-
return events, nil
92-
}
93-
94-
if strings.Contains(fetchErr.Error(), coreda.ErrHeightFromFuture.Error()) {
73+
blobsResp, err := r.fetchBlobs(ctx, daHeight)
74+
if err != nil {
75+
if strings.Contains(err.Error(), coreda.ErrHeightFromFuture.Error()) {
9576
return nil, fmt.Errorf("%w: height from future", coreda.ErrHeightFromFuture)
9677
}
9778

98-
err = errors.Join(err, fetchErr)
79+
return nil, err
80+
}
9981

100-
// Delay before retrying
101-
select {
102-
case <-ctx.Done():
103-
return nil, err
104-
case <-time.After(100 * time.Millisecond):
105-
}
82+
// Check for context cancellation upfront
83+
if err := ctx.Err(); err != nil {
84+
return nil, err
85+
}
86+
87+
if blobsResp.Code == coreda.StatusNotFound {
88+
r.logger.Debug().Uint64("da_height", daHeight).Msg("no blob data found")
89+
return nil, coreda.ErrBlobNotFound
10690
}
10791

108-
return nil, err
92+
r.logger.Debug().Int("blobs", len(blobsResp.Data)).Uint64("da_height", daHeight).Msg("retrieved blob data")
93+
return r.processBlobs(ctx, blobsResp.Data, daHeight), nil
10994
}
11095

11196
// fetchBlobs retrieves blobs from the DA layer
11297
func (r *DARetriever) fetchBlobs(ctx context.Context, daHeight uint64) (coreda.ResultRetrieve, error) {
113-
ctx, cancel := context.WithTimeout(ctx, dAFetcherTimeout)
114-
defer cancel()
115-
11698
// Retrieve from both namespaces
11799
headerRes := types.RetrieveWithHelpers(ctx, r.da, r.logger, daHeight, r.namespaceBz)
118100

block/internal/syncing/da_retriever_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,61 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) {
105105
assert.Nil(t, events)
106106
}
107107

108+
func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) {
109+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
110+
st := store.New(ds)
111+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
112+
require.NoError(t, err)
113+
114+
mockDA := testmocks.NewMockDA(t)
115+
116+
// Mock GetIDs to hang longer than the timeout
117+
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
118+
Run(func(ctx context.Context, height uint64, namespace []byte) {
119+
<-ctx.Done()
120+
}).
121+
Return(nil, context.DeadlineExceeded).Maybe()
122+
123+
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
124+
125+
start := time.Now()
126+
events, err := r.RetrieveFromDA(context.Background(), 42)
127+
duration := time.Since(start)
128+
129+
// Verify error is returned and contains deadline exceeded information
130+
require.Error(t, err)
131+
assert.Contains(t, err.Error(), "DA retrieval failed")
132+
assert.Contains(t, err.Error(), "context deadline exceeded")
133+
assert.Len(t, events, 0)
134+
135+
// Verify timeout occurred approximately at expected time (with some tolerance)
136+
assert.Greater(t, duration, 9*time.Second, "should timeout after approximately 10 seconds")
137+
assert.Less(t, duration, 12*time.Second, "should not take much longer than timeout")
138+
}
139+
140+
func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) {
141+
ds := dssync.MutexWrap(datastore.NewMapDatastore())
142+
st := store.New(ds)
143+
cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
144+
require.NoError(t, err)
145+
146+
mockDA := testmocks.NewMockDA(t)
147+
148+
// Mock GetIDs to immediately return context deadline exceeded
149+
mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything).
150+
Return(nil, context.DeadlineExceeded).Maybe()
151+
152+
r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
153+
154+
events, err := r.RetrieveFromDA(context.Background(), 42)
155+
156+
// Verify error is returned and contains deadline exceeded information
157+
require.Error(t, err)
158+
assert.Contains(t, err.Error(), "DA retrieval failed")
159+
assert.Contains(t, err.Error(), "context deadline exceeded")
160+
assert.Len(t, events, 0)
161+
}
162+
108163
func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
109164
ds := dssync.MutexWrap(datastore.NewMapDatastore())
110165
st := store.New(ds)

block/internal/syncing/syncer.go

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"strings"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112

1213
goheader "github.com/celestiaorg/go-header"
@@ -30,6 +31,9 @@ type p2pHandler interface {
3031
ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
3132
}
3233

34+
// maxRetriesBeforeHalt is the maximum number of retries against the execution client before halting the syncer.
35+
const maxRetriesBeforeHalt = 3
36+
3337
// Syncer handles block synchronization from DA and P2P sources.
3438
type Syncer struct {
3539
// Core components
@@ -51,8 +55,7 @@ type Syncer struct {
5155
lastStateMtx *sync.RWMutex
5256

5357
// DA state
54-
daHeight uint64
55-
daStateMtx *sync.RWMutex
58+
daHeight uint64
5659

5760
// P2P stores
5861
headerStore goheader.Store[*types.SignedHeader]
@@ -70,9 +73,10 @@ type Syncer struct {
7073
logger zerolog.Logger
7174

7275
// Lifecycle
73-
ctx context.Context
74-
cancel context.CancelFunc
75-
wg sync.WaitGroup
76+
ctx context.Context
77+
cancel context.CancelFunc
78+
wg sync.WaitGroup
79+
retriesBeforeHalt map[uint64]uint64
7680
}
7781

7882
// NewSyncer creates a new block syncer
@@ -91,21 +95,21 @@ func NewSyncer(
9195
errorCh chan<- error,
9296
) *Syncer {
9397
return &Syncer{
94-
store: store,
95-
exec: exec,
96-
da: da,
97-
cache: cache,
98-
metrics: metrics,
99-
config: config,
100-
genesis: genesis,
101-
options: options,
102-
headerStore: headerStore,
103-
dataStore: dataStore,
104-
lastStateMtx: &sync.RWMutex{},
105-
daStateMtx: &sync.RWMutex{},
106-
heightInCh: make(chan common.DAHeightEvent, 10_000),
107-
errorCh: errorCh,
108-
logger: logger.With().Str("component", "syncer").Logger(),
98+
store: store,
99+
exec: exec,
100+
da: da,
101+
cache: cache,
102+
metrics: metrics,
103+
config: config,
104+
genesis: genesis,
105+
options: options,
106+
headerStore: headerStore,
107+
dataStore: dataStore,
108+
lastStateMtx: &sync.RWMutex{},
109+
heightInCh: make(chan common.DAHeightEvent, 10_000),
110+
errorCh: errorCh,
111+
logger: logger.With().Str("component", "syncer").Logger(),
112+
retriesBeforeHalt: make(map[uint64]uint64),
109113
}
110114
}
111115

@@ -166,16 +170,12 @@ func (s *Syncer) SetLastState(state types.State) {
166170

167171
// GetDAHeight returns the current DA height
168172
func (s *Syncer) GetDAHeight() uint64 {
169-
s.daStateMtx.RLock()
170-
defer s.daStateMtx.RUnlock()
171-
return s.daHeight
173+
return atomic.LoadUint64(&s.daHeight)
172174
}
173175

174176
// SetDAHeight updates the DA height
175177
func (s *Syncer) SetDAHeight(height uint64) {
176-
s.daStateMtx.Lock()
177-
defer s.daStateMtx.Unlock()
178-
s.daHeight = height
178+
atomic.StoreUint64(&s.daHeight, height)
179179
}
180180

181181
// initializeState loads the current sync state
@@ -246,15 +246,13 @@ func (s *Syncer) syncLoop() {
246246
lastHeaderHeight := initialHeight
247247
lastDataHeight := initialHeight
248248

249-
// Backoff control when DA replies with height-from-future
249+
// Backoff control when DA replies with errors
250250
var hffDelay time.Duration
251251
var nextDARequestAt time.Time
252252

253253
blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration)
254254
defer blockTicker.Stop()
255255

256-
// TODO: we should request to see what the head of the chain is at
257-
// then we know if we are falling behind or in sync mode
258256
for {
259257
select {
260258
case <-s.ctx.Done():
@@ -265,26 +263,34 @@ func (s *Syncer) syncLoop() {
265263
s.processPendingEvents()
266264

267265
now := time.Now()
266+
daHeight := s.GetDAHeight()
267+
268268
// Respect backoff window if set
269269
if nextDARequestAt.IsZero() || now.After(nextDARequestAt) || now.Equal(nextDARequestAt) {
270270
// Retrieve from DA as fast as possible (unless throttled by HFF)
271-
events, err := s.daRetriever.RetrieveFromDA(s.ctx, s.GetDAHeight())
271+
// DaHeight is only increased on successful retrieval, it will retry on failure at the next iteration
272+
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
272273
if err != nil {
273-
if s.isHeightFromFutureError(err) {
274+
if errors.Is(err, coreda.ErrBlobNotFound) {
275+
// no data at this height, increase DA height
276+
// we do still want to check p2p
277+
s.SetDAHeight(daHeight + 1)
278+
279+
// Reset backoff on success
280+
nextDARequestAt = time.Time{}
281+
} else {
274282
// Back off exactly by DA block time to avoid overloading
275283
hffDelay = s.config.DA.BlockTime.Duration
276284
if hffDelay <= 0 {
277285
hffDelay = 2 * time.Second
278286
}
279-
s.logger.Debug().Dur("delay", hffDelay).Uint64("da_height", s.GetDAHeight()).Msg("height from future; backing off DA requests")
280287
nextDARequestAt = now.Add(hffDelay)
281-
} else if errors.Is(err, coreda.ErrBlobNotFound) {
282-
// no data at this height, increase DA height
283-
s.SetDAHeight(s.GetDAHeight() + 1)
284-
} else {
285-
// Non-HFF errors: do not backoff artificially
286-
nextDARequestAt = time.Time{}
287-
s.logger.Error().Err(err).Msg("failed to retrieve from DA")
288+
289+
if s.isHeightFromFutureError(err) {
290+
s.logger.Debug().Dur("delay", hffDelay).Uint64("da_height", daHeight).Msg("height from future; backing off DA requests")
291+
} else {
292+
s.logger.Error().Err(err).Dur("delay", hffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests")
293+
}
288294
}
289295
} else {
290296
// Reset backoff on success
@@ -300,8 +306,8 @@ func (s *Syncer) syncLoop() {
300306
}
301307

302308
// increment DA height on successful retrieval and continue immediately
303-
s.SetDAHeight(s.GetDAHeight() + 1)
304-
continue
309+
s.SetDAHeight(daHeight + 1)
310+
continue // event sent, no need to check p2p
305311
}
306312
}
307313

@@ -469,9 +475,15 @@ func (s *Syncer) applyBlock(header types.Header, data *types.Data, currentState
469475
newAppHash, _, err := s.exec.ExecuteTxs(ctx, rawTxs, header.Height(),
470476
header.Time(), currentState.AppHash)
471477
if err != nil {
472-
s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
473-
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
478+
s.retriesBeforeHalt[header.Height()]++
479+
if s.retriesBeforeHalt[header.Height()] > maxRetriesBeforeHalt {
480+
s.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
481+
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
482+
}
483+
484+
return types.State{}, fmt.Errorf("failed to execute transactions (retry %d / %d): %w", s.retriesBeforeHalt[header.Height()], maxRetriesBeforeHalt, err)
474485
}
486+
delete(s.retriesBeforeHalt, header.Height())
475487

476488
// Create new state
477489
newState, err := currentState.NextState(header, newAppHash)

0 commit comments

Comments
 (0)