Skip to content

Commit 27a6b02

Browse files
committed
x
1 parent bd85910 commit 27a6b02

File tree

11 files changed

+812
-115
lines changed

11 files changed

+812
-115
lines changed

block/internal/syncing/syncer.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,28 @@ func (s *Syncer) Start(ctx context.Context) error {
151151

152152
// Stop shuts down the syncing component
153153
func (s *Syncer) Stop() error {
154-
if s.cancel != nil {
155-
s.cancel()
154+
if s.cancel == nil {
155+
return nil
156156
}
157+
s.cancel()
157158
s.wg.Wait()
158159
s.logger.Info().Msg("syncer stopped")
160+
close(s.heightInCh)
161+
s.cancel = nil
159162
return nil
160163
}
161164

165+
var alwaysTickCh chan time.Time
166+
167+
func init() {
168+
alwaysTickCh = make(chan time.Time)
169+
close(alwaysTickCh) // always picked in select
170+
}
171+
172+
func (s *Syncer) HasUnprocessedEvents() bool {
173+
return len(s.heightInCh) != 0 || s.tryFetchFromP2P(&s.lastState.LastBlockHeight, &s.lastState.LastBlockHeight, alwaysTickCh)
174+
}
175+
162176
// GetLastState returns the current state
163177
func (s *Syncer) GetLastState() types.State {
164178
s.lastStateMtx.RLock()
@@ -222,8 +236,10 @@ func (s *Syncer) processLoop() {
222236
select {
223237
case <-s.ctx.Done():
224238
return
225-
case heightEvent := <-s.heightInCh:
226-
s.processHeightEvent(&heightEvent)
239+
case heightEvent, ok := <-s.heightInCh:
240+
if ok {
241+
s.processHeightEvent(&heightEvent)
242+
}
227243
}
228244
}
229245
}
@@ -409,8 +425,7 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
409425
}
410426

411427
// If this is not the next block in sequence, store as pending event
412-
// This check is crucial as try
413-
//SyncNextBlock simply attempts to sync the next block
428+
// This check is crucial as trySyncNextBlock simply attempts to sync the next block
414429
if height != currentHeight+1 {
415430
s.cache.SetPendingEvent(height, event)
416431
s.logger.Debug().Uint64("height", height).Uint64("current_height", currentHeight).Msg("stored as pending event")

node/failover.go

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
package node
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/http"
8+
"time"
9+
10+
"github.com/evstack/ev-node/block"
11+
coreda "github.com/evstack/ev-node/core/da"
12+
coreexecutor "github.com/evstack/ev-node/core/execution"
13+
coresequencer "github.com/evstack/ev-node/core/sequencer"
14+
"github.com/evstack/ev-node/pkg/config"
15+
genesispkg "github.com/evstack/ev-node/pkg/genesis"
16+
"github.com/evstack/ev-node/pkg/p2p"
17+
"github.com/evstack/ev-node/pkg/p2p/key"
18+
rpcserver "github.com/evstack/ev-node/pkg/rpc/server"
19+
"github.com/evstack/ev-node/pkg/signer"
20+
"github.com/evstack/ev-node/pkg/store"
21+
evsync "github.com/evstack/ev-node/pkg/sync"
22+
ds "github.com/ipfs/go-datastore"
23+
"github.com/rs/zerolog"
24+
)
25+
26+
// failoverState collect the components to reset when switching modes.
27+
type failoverState struct {
28+
logger zerolog.Logger
29+
30+
p2pClient *p2p.Client
31+
headerSyncService *evsync.HeaderSyncService
32+
dataSyncService *evsync.DataSyncService
33+
rpcServer *http.Server
34+
bc *block.Components
35+
}
36+
37+
func newSyncMode(
38+
nodeConfig config.Config,
39+
nodeKey *key.NodeKey,
40+
genesis genesispkg.Genesis,
41+
database ds.Batching,
42+
exec coreexecutor.Executor,
43+
da coreda.DA,
44+
logger zerolog.Logger,
45+
rktStore store.Store,
46+
mainKV ds.Batching,
47+
blockMetrics *block.Metrics,
48+
nodeOpts NodeOptions,
49+
raftNode block.RaftNode,
50+
) (*failoverState, error) {
51+
blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) {
52+
return block.NewSyncComponents(
53+
nodeConfig,
54+
genesis,
55+
rktStore,
56+
exec,
57+
da,
58+
headerSyncService.Store(),
59+
dataSyncService.Store(),
60+
logger,
61+
blockMetrics,
62+
nodeOpts.BlockOptions,
63+
raftNode,
64+
)
65+
}
66+
return setupFailoverState(nodeConfig, nodeKey, database, genesis, logger, mainKV, rktStore, blockComponentsFn)
67+
}
68+
func newAggregatorMode(
69+
nodeConfig config.Config,
70+
nodeKey *key.NodeKey,
71+
signer signer.Signer,
72+
genesis genesispkg.Genesis,
73+
database ds.Batching,
74+
exec coreexecutor.Executor,
75+
sequencer coresequencer.Sequencer,
76+
da coreda.DA,
77+
logger zerolog.Logger,
78+
rktStore store.Store,
79+
mainKV ds.Batching,
80+
blockMetrics *block.Metrics,
81+
nodeOpts NodeOptions,
82+
raftNode block.RaftNode,
83+
) (*failoverState, error) {
84+
85+
blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) {
86+
return block.NewAggregatorComponents(
87+
nodeConfig,
88+
genesis,
89+
rktStore,
90+
exec,
91+
sequencer,
92+
da,
93+
signer,
94+
headerSyncService,
95+
dataSyncService,
96+
logger,
97+
blockMetrics,
98+
nodeOpts.BlockOptions,
99+
raftNode,
100+
)
101+
}
102+
103+
return setupFailoverState(nodeConfig, nodeKey, database, genesis, logger, mainKV, rktStore, blockComponentsFn)
104+
}
105+
106+
func setupFailoverState(
107+
nodeConfig config.Config,
108+
nodeKey *key.NodeKey,
109+
database ds.Batching,
110+
genesis genesispkg.Genesis,
111+
logger zerolog.Logger,
112+
mainKV ds.Batching,
113+
rktStore store.Store,
114+
buildComponentsFn func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error),
115+
) (*failoverState, error) {
116+
p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, database, genesis.ChainID, logger, nil)
117+
if err != nil {
118+
return nil, err
119+
}
120+
121+
headerSyncService, err := evsync.NewHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger())
122+
if err != nil {
123+
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
124+
}
125+
126+
dataSyncService, err := evsync.NewDataSyncService(mainKV, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger())
127+
if err != nil {
128+
return nil, fmt.Errorf("error while initializing DataSyncService: %w", err)
129+
}
130+
131+
bestKnownHeightProvider := func() uint64 {
132+
hHeight := headerSyncService.Store().Height()
133+
dHeight := dataSyncService.Store().Height()
134+
return min(hHeight, dHeight)
135+
}
136+
handler, err := rpcserver.NewServiceHandler(rktStore, p2pClient, genesis.ProposerAddress, logger, nodeConfig, bestKnownHeightProvider)
137+
if err != nil {
138+
return nil, fmt.Errorf("error creating RPC handler: %w", err)
139+
}
140+
141+
rpcServer := &http.Server{
142+
Addr: nodeConfig.RPC.Address,
143+
Handler: handler,
144+
ReadTimeout: 10 * time.Second,
145+
WriteTimeout: 10 * time.Second,
146+
IdleTimeout: 120 * time.Second,
147+
}
148+
bc, err := buildComponentsFn(headerSyncService, dataSyncService)
149+
if err != nil {
150+
return nil, fmt.Errorf("build follower components: %w", err)
151+
}
152+
153+
return &failoverState{
154+
logger: logger,
155+
p2pClient: p2pClient,
156+
headerSyncService: headerSyncService,
157+
dataSyncService: dataSyncService,
158+
rpcServer: rpcServer,
159+
bc: bc,
160+
}, nil
161+
}
162+
163+
func (f *failoverState) Run(ctx context.Context) (multiErr error) {
164+
defer f.rpcServer.Shutdown(context.Background()) // nolint: errcheck
165+
166+
if err := f.p2pClient.Start(ctx); err != nil {
167+
return err
168+
}
169+
defer f.p2pClient.Close() // nolint: errcheck
170+
171+
if err := f.headerSyncService.Start(ctx); err != nil {
172+
return fmt.Errorf("error while starting header sync service: %w", err)
173+
}
174+
defer func() {
175+
if err := f.headerSyncService.Stop(context.Background()); err != nil && !errors.Is(err, context.Canceled) {
176+
multiErr = errors.Join(multiErr, fmt.Errorf("stopping header sync: %w", err))
177+
}
178+
}()
179+
180+
if err := f.dataSyncService.Start(ctx); err != nil {
181+
return fmt.Errorf("error while starting data sync service: %w", err)
182+
}
183+
defer func() {
184+
if err := f.dataSyncService.Stop(context.Background()); err != nil && !errors.Is(err, context.Canceled) {
185+
multiErr = errors.Join(multiErr, fmt.Errorf("stopping data sync: %w", err))
186+
}
187+
}()
188+
189+
defer func() {
190+
if f.bc.Syncer != nil {
191+
for f.bc.Syncer.HasUnprocessedEvents() {
192+
// give it some time to gracefully complete
193+
time.Sleep(100 * time.Millisecond)
194+
}
195+
}
196+
if err := f.bc.Stop(); err != nil && !errors.Is(err, context.Canceled) {
197+
multiErr = errors.Join(multiErr, fmt.Errorf("stopping block components: %w", err))
198+
}
199+
}()
200+
201+
errChan := make(chan error)
202+
go func() {
203+
f.logger.Info().Str("addr", f.rpcServer.Addr).Msg("started RPC server")
204+
if err := f.rpcServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
205+
select {
206+
case errChan <- fmt.Errorf("RPC server error: %w", err):
207+
default:
208+
f.logger.Error().Err(err).Msg("RPC server error")
209+
}
210+
}
211+
}()
212+
213+
go func() {
214+
if err := f.bc.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
215+
select {
216+
case errChan <- fmt.Errorf("components started with error: %w", err):
217+
default:
218+
f.logger.Error().Err(err).Msg("Components start error")
219+
}
220+
return
221+
}
222+
select {
223+
case errChan <- nil:
224+
default:
225+
}
226+
}()
227+
228+
return <-errChan
229+
}

0 commit comments

Comments
 (0)