Skip to content

Commit 860bb20

Browse files
authored
refactor(block): better sync/process loops (#2688)
## Overview Moves processPendingEvents into the producer loop. Minor refactorings <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent 727e591 commit 860bb20

File tree

7 files changed

+455
-46
lines changed

7 files changed

+455
-46
lines changed

.mockery.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,15 @@ packages:
4646
dir: ./test/mocks
4747
pkgname: mocks
4848
filename: external/hstore.go
49+
github.com/evstack/ev-node/block/internal/syncing:
50+
interfaces:
51+
daRetriever:
52+
config:
53+
dir: ./block/internal/syncing
54+
pkgname: syncing
55+
filename: syncer_mock.go
56+
p2pHandler:
57+
config:
58+
dir: ./block/internal/syncing
59+
pkgname: syncing
60+
filename: syncer_mock.go

apps/evm/single/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ require (
277277
github.com/spf13/cast v1.10.0 // indirect
278278
github.com/spf13/pflag v1.0.10 // indirect
279279
github.com/spf13/viper v1.21.0 // indirect
280+
github.com/stretchr/objx v0.5.2 // indirect
280281
github.com/stretchr/testify v1.11.1 // indirect
281282
github.com/subosito/gotenv v1.6.0 // indirect
282283
github.com/supranational/blst v0.3.14 // indirect

apps/grpc/single/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ require (
130130
github.com/spf13/cast v1.10.0 // indirect
131131
github.com/spf13/pflag v1.0.10 // indirect
132132
github.com/spf13/viper v1.21.0 // indirect
133+
github.com/stretchr/objx v0.5.2 // indirect
133134
github.com/stretchr/testify v1.11.1 // indirect
134135
github.com/subosito/gotenv v1.6.0 // indirect
135136
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect

apps/testapp/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ require (
139139
github.com/spf13/cast v1.10.0 // indirect
140140
github.com/spf13/pflag v1.0.10 // indirect
141141
github.com/spf13/viper v1.21.0 // indirect
142+
github.com/stretchr/objx v0.5.2 // indirect
142143
github.com/subosito/gotenv v1.6.0 // indirect
143144
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
144145
github.com/wlynxg/anet v0.0.5 // indirect

block/internal/syncing/syncer.go

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ import (
2222
"github.com/evstack/ev-node/types"
2323
)
2424

25+
type daRetriever interface {
26+
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
27+
}
28+
type p2pHandler interface {
29+
ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
30+
ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
31+
}
32+
2533
// Syncer handles block synchronization from DA and P2P sources.
2634
type Syncer struct {
2735
// Core components
@@ -51,14 +59,12 @@ type Syncer struct {
5159
dataStore goheader.Store[*types.Data]
5260

5361
// Channels for coordination
54-
heightInCh chan common.DAHeightEvent
55-
headerStoreCh chan struct{}
56-
dataStoreCh chan struct{}
57-
errorCh chan<- error // Channel to report critical execution client failures
62+
heightInCh chan common.DAHeightEvent
63+
errorCh chan<- error // Channel to report critical execution client failures
5864

5965
// Handlers
60-
daRetriever *DARetriever
61-
p2pHandler *P2PHandler
66+
daRetriever daRetriever
67+
p2pHandler p2pHandler
6268

6369
// Logging
6470
logger zerolog.Logger
@@ -85,23 +91,21 @@ func NewSyncer(
8591
errorCh chan<- error,
8692
) *Syncer {
8793
return &Syncer{
88-
store: store,
89-
exec: exec,
90-
da: da,
91-
cache: cache,
92-
metrics: metrics,
93-
config: config,
94-
genesis: genesis,
95-
options: options,
96-
headerStore: headerStore,
97-
dataStore: dataStore,
98-
lastStateMtx: &sync.RWMutex{},
99-
daStateMtx: &sync.RWMutex{},
100-
heightInCh: make(chan common.DAHeightEvent, 10000),
101-
headerStoreCh: make(chan struct{}, 1),
102-
dataStoreCh: make(chan struct{}, 1),
103-
errorCh: errorCh,
104-
logger: logger.With().Str("component", "syncer").Logger(),
94+
store: store,
95+
exec: exec,
96+
da: da,
97+
cache: cache,
98+
metrics: metrics,
99+
config: config,
100+
genesis: genesis,
101+
options: options,
102+
headerStore: headerStore,
103+
dataStore: dataStore,
104+
lastStateMtx: &sync.RWMutex{},
105+
daStateMtx: &sync.RWMutex{},
106+
heightInCh: make(chan common.DAHeightEvent, 10_000),
107+
errorCh: errorCh,
108+
logger: logger.With().Str("component", "syncer").Logger(),
105109
}
106110
}
107111

@@ -212,20 +216,10 @@ func (s *Syncer) processLoop() {
212216
s.logger.Info().Msg("starting process loop")
213217
defer s.logger.Info().Msg("process loop stopped")
214218

215-
blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration)
216-
defer blockTicker.Stop()
217-
218219
for {
219-
// Process pending events from cache on every iteration
220-
s.processPendingEvents()
221-
222220
select {
223221
case <-s.ctx.Done():
224222
return
225-
case <-blockTicker.C:
226-
// Signal P2P stores to check for new data
227-
s.sendNonBlockingSignal(s.headerStoreCh, "header_store")
228-
s.sendNonBlockingSignal(s.dataStoreCh, "data_store")
229223
case heightEvent := <-s.heightInCh:
230224
s.processHeightEvent(&heightEvent)
231225
}
@@ -250,15 +244,19 @@ func (s *Syncer) syncLoop() {
250244
var hffDelay time.Duration
251245
var nextDARequestAt time.Time
252246

247+
blockTicker := time.NewTicker(s.config.Node.BlockTime.Duration)
248+
defer blockTicker.Stop()
249+
253250
// TODO: we should request to see what the head of the chain is at
254251
// then we know if we are falling behind or in sync mode
255-
syncLoop:
256252
for {
257253
select {
258254
case <-s.ctx.Done():
259255
return
260256
default:
261257
}
258+
// Process pending events from cache on every iteration
259+
s.processPendingEvents()
262260

263261
now := time.Now()
264262
// Respect backoff window if set
@@ -291,9 +289,7 @@ syncLoop:
291289
select {
292290
case s.heightInCh <- event:
293291
default:
294-
s.logger.Warn().Msg("height channel full, dropping DA event")
295-
time.Sleep(10 * time.Millisecond)
296-
continue syncLoop
292+
s.cache.SetPendingEvent(event.Header.Height(), &event)
297293
}
298294
}
299295

@@ -303,34 +299,34 @@ syncLoop:
303299
}
304300
}
305301

302+
// Opportunistically process any P2P signals
306303
select {
307-
case <-s.headerStoreCh:
304+
case <-blockTicker.C:
308305
newHeaderHeight := s.headerStore.Height()
309306
if newHeaderHeight > lastHeaderHeight {
310307
events := s.p2pHandler.ProcessHeaderRange(s.ctx, lastHeaderHeight+1, newHeaderHeight)
311308
for _, event := range events {
312309
select {
313310
case s.heightInCh <- event:
314311
default:
315-
s.logger.Warn().Msg("height channel full, dropping P2P header event")
316-
time.Sleep(10 * time.Millisecond)
317-
continue syncLoop
312+
s.cache.SetPendingEvent(event.Header.Height(), &event)
318313
}
319314
}
320-
321315
lastHeaderHeight = newHeaderHeight
322316
}
323-
case <-s.dataStoreCh:
317+
324318
newDataHeight := s.dataStore.Height()
319+
if newDataHeight == newHeaderHeight {
320+
lastDataHeight = newDataHeight
321+
continue
322+
}
325323
if newDataHeight > lastDataHeight {
326324
events := s.p2pHandler.ProcessDataRange(s.ctx, lastDataHeight+1, newDataHeight)
327325
for _, event := range events {
328326
select {
329327
case s.heightInCh <- event:
330328
default:
331-
s.logger.Warn().Msg("height channel full, dropping P2P data event")
332-
time.Sleep(10 * time.Millisecond)
333-
continue syncLoop
329+
s.cache.SetPendingEvent(event.Header.Height(), &event)
334330
}
335331
}
336332
lastDataHeight = newDataHeight

0 commit comments

Comments
 (0)