@@ -18,6 +18,7 @@ import (
18
18
coresequencer "github.com/evstack/ev-node/core/sequencer"
19
19
"github.com/evstack/ev-node/pkg/config"
20
20
"github.com/evstack/ev-node/pkg/genesis"
21
+ "github.com/evstack/ev-node/pkg/raft"
21
22
"github.com/evstack/ev-node/pkg/signer"
22
23
"github.com/evstack/ev-node/pkg/store"
23
24
"github.com/evstack/ev-node/types"
@@ -28,6 +29,13 @@ type broadcaster[T any] interface {
28
29
WriteToStoreAndBroadcast (ctx context.Context , payload T ) error
29
30
}
30
31
32
+ // RaftNode interface for leader election and state replication
33
+ type RaftNode interface {
34
+ IsLeader () bool
35
+ Propose (ctx context.Context , data []byte ) error
36
+ GetStateMachine () interface {}
37
+ }
38
+
31
39
// Executor handles block production, transaction processing, and state management
32
40
type Executor struct {
33
41
// Core components
@@ -64,6 +72,9 @@ type Executor struct {
64
72
ctx context.Context
65
73
cancel context.CancelFunc
66
74
wg sync.WaitGroup
75
+
76
+ // Raft for leader election and state replication
77
+ raftNode RaftNode
67
78
}
68
79
69
80
// NewExecutor creates a new block executor.
@@ -86,6 +97,7 @@ func NewExecutor(
86
97
logger zerolog.Logger ,
87
98
options common.BlockOptions ,
88
99
errorCh chan <- error ,
100
+ raftNode RaftNode ,
89
101
) (* Executor , error ) {
90
102
if signer == nil {
91
103
return nil , errors .New ("signer cannot be nil" )
@@ -116,6 +128,7 @@ func NewExecutor(
116
128
txNotifyCh : make (chan struct {}, 1 ),
117
129
errorCh : errorCh ,
118
130
logger : logger .With ().Str ("component" , "executor" ).Logger (),
131
+ raftNode : raftNode ,
119
132
}, nil
120
133
}
121
134
@@ -291,6 +304,34 @@ func (e *Executor) produceBlock() error {
291
304
}
292
305
}()
293
306
307
+ // Check leadership if raft is enabled
308
+ if e .raftNode != nil {
309
+ if ! e .raftNode .IsLeader () {
310
+ e .logger .Debug ().Msg ("not leader, skipping block production" )
311
+ return nil
312
+ }
313
+
314
+ // New leader catch-up: apply latest block from raft if behind
315
+ if sm := e .raftNode .GetStateMachine (); sm != nil {
316
+ if blockSM , ok := sm .(* raft.BlockStateMachine ); ok {
317
+ raftHeader , raftData := blockSM .GetLastBlock ()
318
+ if raftHeader != nil && raftData != nil {
319
+ currentState := e .GetLastState ()
320
+ if raftHeader .Height () > currentState .LastBlockHeight {
321
+ e .logger .Info ().
322
+ Uint64 ("current_height" , currentState .LastBlockHeight ).
323
+ Uint64 ("raft_height" , raftHeader .Height ()).
324
+ Msg ("new leader catching up from raft" )
325
+
326
+ if err := e .applyRaftBlock (raftHeader , raftData ); err != nil {
327
+ return fmt .Errorf ("catch up from raft: %w" , err )
328
+ }
329
+ }
330
+ }
331
+ }
332
+ }
333
+ }
334
+
294
335
currentState := e .GetLastState ()
295
336
newHeight := currentState .LastBlockHeight + 1
296
337
@@ -378,6 +419,21 @@ func (e *Executor) produceBlock() error {
378
419
return fmt .Errorf ("failed to update state: %w" , err )
379
420
}
380
421
422
+ // Replicate full block via raft before broadcasting to P2P
423
+ // This ensures follower nodes have the full block data and can apply it
424
+ if e .raftNode != nil {
425
+ blockStateData , err := raft .CreateBlockStateData (header , data )
426
+ if err != nil {
427
+ return fmt .Errorf ("failed to create block state data: %w" , err )
428
+ }
429
+
430
+ if err := e .raftNode .Propose (e .ctx , blockStateData ); err != nil {
431
+ return fmt .Errorf ("failed to propose block to raft: %w" , err )
432
+ }
433
+
434
+ e .logger .Debug ().Uint64 ("height" , newHeight ).Msg ("full block replicated via raft" )
435
+ }
436
+
381
437
// broadcast header and data to P2P network
382
438
g , ctx := errgroup .WithContext (e .ctx )
383
439
g .Go (func () error { return e .headerBroadcaster .WriteToStoreAndBroadcast (ctx , header ) })
@@ -629,6 +685,56 @@ func (e *Executor) recordBlockMetrics(data *types.Data) {
629
685
e .metrics .CommittedHeight .Set (float64 (data .Metadata .Height ))
630
686
}
631
687
688
+ // applyRaftBlock applies a block received via raft (for follower nodes)
689
+ func (e * Executor ) applyRaftBlock (header * types.SignedHeader , data * types.Data ) error {
690
+ e .logger .Info ().Uint64 ("height" , header .Height ()).Msg ("applying block from raft" )
691
+
692
+ currentState := e .GetLastState ()
693
+
694
+ // Skip if already processed
695
+ if header .Height () <= currentState .LastBlockHeight {
696
+ e .logger .Debug ().Uint64 ("height" , header .Height ()).Msg ("block already applied, skipping" )
697
+ return nil
698
+ }
699
+
700
+ // Validate block
701
+ if err := e .validateBlock (currentState , header , data ); err != nil {
702
+ return fmt .Errorf ("validate block: %w" , err )
703
+ }
704
+
705
+ // Apply block to execution client
706
+ newState , err := e .applyBlock (e .ctx , header .Header , data )
707
+ if err != nil {
708
+ return fmt .Errorf ("apply block: %w" , err )
709
+ }
710
+
711
+ // Save block data
712
+ if err := e .store .SaveBlockData (e .ctx , header , data , & header .Signature ); err != nil {
713
+ return fmt .Errorf ("save block data: %w" , err )
714
+ }
715
+
716
+ // Update store height
717
+ if err := e .store .SetHeight (e .ctx , header .Height ()); err != nil {
718
+ return fmt .Errorf ("set height: %w" , err )
719
+ }
720
+
721
+ // Update state
722
+ if err := e .updateState (e .ctx , newState ); err != nil {
723
+ return fmt .Errorf ("update state: %w" , err )
724
+ }
725
+
726
+ e .recordBlockMetrics (data )
727
+
728
+ e .logger .Info ().Uint64 ("height" , header .Height ()).Int ("txs" , len (data .Txs )).Msg ("applied block from raft" )
729
+
730
+ return nil
731
+ }
732
+
733
+ // GetRaftStateMachine returns a reference to the raft block state machine if raft is enabled
734
+ func (e * Executor ) GetRaftStateMachine () interface {} {
735
+ return e .raftNode
736
+ }
737
+
632
738
// BatchData represents batch data from sequencer
633
739
type BatchData struct {
634
740
* coresequencer.Batch
0 commit comments