Skip to content

Commit ffe947c

Browse files
committed
x
1 parent f15bf89 commit ffe947c

File tree

6 files changed

+41
-38
lines changed

6 files changed

+41
-38
lines changed

block/internal/common/raft.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,21 @@ package common
22

33
import (
44
"context"
5+
6+
"github.com/evstack/ev-node/pkg/raft"
57
)
68

79
// RaftNode interface for raft consensus integration
810
type RaftNode interface {
911
IsLeader() bool
1012
NodeID() string
11-
GetState() *RaftBlockState
13+
GetState() *raft.RaftBlockState
1214

13-
Broadcast(ctx context.Context, state *RaftBlockState) error
15+
Broadcast(ctx context.Context, state *raft.RaftBlockState) error
1416

15-
SetApplyCallback(ch chan<- RaftApplyMsg)
17+
SetApplyCallback(ch chan<- raft.RaftApplyMsg)
1618
Shutdown() error
1719

1820
AddPeer(nodeID, addr string) error
1921
RemovePeer(nodeID string) error
2022
}
21-
22-
// todo: refactor to use proto
23-
// RaftBlockState represents replicated block state
24-
type RaftBlockState struct {
25-
Height uint64
26-
Hash []byte
27-
Timestamp uint64
28-
Header []byte
29-
Data []byte
30-
}
31-
32-
// RaftApplyMsg is sent when raft applies a log entry
33-
type RaftApplyMsg struct {
34-
Index uint64
35-
State *RaftBlockState
36-
}

block/internal/executing/executor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
coresequencer "github.com/evstack/ev-node/core/sequencer"
1919
"github.com/evstack/ev-node/pkg/config"
2020
"github.com/evstack/ev-node/pkg/genesis"
21+
"github.com/evstack/ev-node/pkg/raft"
2122
"github.com/evstack/ev-node/pkg/signer"
2223
"github.com/evstack/ev-node/pkg/store"
2324
"github.com/evstack/ev-node/types"
@@ -399,7 +400,7 @@ func (e *Executor) produceBlock() error {
399400
return fmt.Errorf("failed to marshal data: %w", err)
400401
}
401402

402-
raftState := &common.RaftBlockState{
403+
raftState := &raft.RaftBlockState{
403404
Height: newHeight,
404405
Hash: header.Hash(),
405406
Timestamp: header.BaseHeader.Time,

block/internal/syncing/raft_retriever.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/evstack/ev-node/block/internal/common"
1010
"github.com/evstack/ev-node/pkg/genesis"
11+
"github.com/evstack/ev-node/pkg/raft"
1112
"github.com/evstack/ev-node/types"
1213
"github.com/rs/zerolog"
1314
)
@@ -55,7 +56,7 @@ func (r *raftRetriever) Start(ctx context.Context) error {
5556
return errors.New("syncer already started")
5657
}
5758
ctx, r.cancel = context.WithCancel(ctx)
58-
applyCh := make(chan common.RaftApplyMsg)
59+
applyCh := make(chan raft.RaftApplyMsg)
5960
r.raftNode.SetApplyCallback(applyCh)
6061

6162
r.wg.Add(1)
@@ -79,7 +80,7 @@ func (r *raftRetriever) Stop() {
7980
}
8081

8182
// raftApplyLoop processes blocks received from raft
82-
func (r *raftRetriever) raftApplyLoop(ctx context.Context, applyCh <-chan common.RaftApplyMsg) {
83+
func (r *raftRetriever) raftApplyLoop(ctx context.Context, applyCh <-chan raft.RaftApplyMsg) {
8384
r.logger.Info().Msg("starting raft apply loop")
8485
defer r.logger.Info().Msg("raft apply loop stopped")
8586

@@ -96,7 +97,7 @@ func (r *raftRetriever) raftApplyLoop(ctx context.Context, applyCh <-chan common
9697
}
9798

9899
// consumeRaftBlock applies a block received from raft consensus
99-
func (r *raftRetriever) consumeRaftBlock(ctx context.Context, state *common.RaftBlockState) error {
100+
func (r *raftRetriever) consumeRaftBlock(ctx context.Context, state *raft.RaftBlockState) error {
100101
r.logger.Debug().Uint64("height", state.Height).Msg("applying raft block")
101102

102103
// Unmarshal header and data

block/public.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package block
22

3-
import "github.com/evstack/ev-node/block/internal/common"
3+
import (
4+
"github.com/evstack/ev-node/block/internal/common"
5+
)
46

57
// BlockOptions defines the options for creating block components
68
type BlockOptions = common.BlockOptions
@@ -25,5 +27,3 @@ func NopMetrics() *Metrics {
2527

2628
// Expose Raft types for consensus integration
2729
type RaftNode = common.RaftNode
28-
type RaftBlockState = common.RaftBlockState
29-
type RaftApplyMsg = common.RaftApplyMsg

pkg/raft/node.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"github.com/hashicorp/raft"
1616
raftboltdb "github.com/hashicorp/raft-boltdb"
1717
"github.com/rs/zerolog"
18-
19-
"github.com/evstack/ev-node/block"
2018
)
2119

2220
type clusterClient interface {
@@ -47,8 +45,8 @@ type Config struct {
4745
// FSM implements raft.FSM for block state
4846
type FSM struct {
4947
logger zerolog.Logger
50-
state *block.RaftBlockState
51-
applyCh chan<- block.RaftApplyMsg
48+
state *RaftBlockState
49+
applyCh chan<- RaftApplyMsg
5250
}
5351

5452
// NewNode creates a new raft node
@@ -63,7 +61,7 @@ func NewNode(cfg *Config, clusterClient clusterClient, logger zerolog.Logger) (*
6361

6462
fsm := &FSM{
6563
logger: logger.With().Str("component", "raft-fsm").Logger(),
66-
state: &block.RaftBlockState{},
64+
state: &RaftBlockState{},
6765
}
6866

6967
logStore, err := raftboltdb.NewBoltStore(filepath.Join(cfg.RaftDir, "raft-log.db"))
@@ -165,7 +163,7 @@ func (n *Node) NodeID() string {
165163
}
166164

167165
// ProposeBlock proposes a block state to be replicated via raft
168-
func (n *Node) Broadcast(ctx context.Context, state *block.RaftBlockState) error {
166+
func (n *Node) Broadcast(ctx context.Context, state *RaftBlockState) error {
169167
if !n.IsLeader() {
170168
return fmt.Errorf("not leader")
171169
}
@@ -184,7 +182,7 @@ func (n *Node) Broadcast(ctx context.Context, state *block.RaftBlockState) error
184182
}
185183

186184
// GetState returns the current replicated state
187-
func (n *Node) GetState() *block.RaftBlockState {
185+
func (n *Node) GetState() *RaftBlockState {
188186
return n.fsm.state
189187
}
190188

@@ -231,13 +229,13 @@ func (n *Node) Shutdown() error {
231229
}
232230

233231
// SetApplyCallback sets a callback to be called when log entries are applied
234-
func (n *Node) SetApplyCallback(ch chan<- block.RaftApplyMsg) {
232+
func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) {
235233
n.fsm.applyCh = ch
236234
}
237235

238236
// Apply implements raft.FSM
239237
func (f *FSM) Apply(log *raft.Log) interface{} {
240-
var state block.RaftBlockState
238+
var state RaftBlockState
241239
if err := json.Unmarshal(log.Data, &state); err != nil {
242240
f.logger.Error().Err(err).Msg("unmarshal block state")
243241
return err
@@ -248,7 +246,7 @@ func (f *FSM) Apply(log *raft.Log) interface{} {
248246

249247
if f.applyCh != nil {
250248
select {
251-
case f.applyCh <- block.RaftApplyMsg{Index: log.Index, State: &state}:
249+
case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}:
252250
default:
253251
f.logger.Warn().Msg("apply channel full, dropping message")
254252
}
@@ -266,7 +264,7 @@ func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
266264
func (f *FSM) Restore(rc io.ReadCloser) error {
267265
defer rc.Close()
268266

269-
var state block.RaftBlockState
267+
var state RaftBlockState
270268
if err := json.NewDecoder(rc).Decode(&state); err != nil {
271269
return fmt.Errorf("decode snapshot: %w", err)
272270
}
@@ -277,7 +275,7 @@ func (f *FSM) Restore(rc io.ReadCloser) error {
277275
}
278276

279277
type fsmSnapshot struct {
280-
state *block.RaftBlockState
278+
state *RaftBlockState
281279
}
282280

283281
func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error {

pkg/raft/types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package raft
2+
3+
// todo: refactor to use proto
4+
// RaftBlockState represents replicated block state
5+
type RaftBlockState struct {
6+
Height uint64
7+
Hash []byte
8+
Timestamp uint64
9+
Header []byte
10+
Data []byte
11+
}
12+
13+
// RaftApplyMsg is sent when raft applies a log entry
14+
type RaftApplyMsg struct {
15+
Index uint64
16+
State *RaftBlockState
17+
}

0 commit comments

Comments
 (0)