Skip to content

Commit f8db26f

Browse files
committed
x
1 parent ffe947c commit f8db26f

File tree

5 files changed

+43
-25
lines changed

5 files changed

+43
-25
lines changed

node/full.go

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -146,12 +146,13 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod
146146
}
147147

148148
raftCfg := &raftpkg.Config{
149-
NodeID: nodeConfig.Raft.NodeID,
150-
RaftAddr: nodeConfig.Raft.RaftAddr,
151-
RaftDir: raftDir,
152-
Bootstrap: nodeConfig.Raft.Bootstrap,
153-
SnapCount: nodeConfig.Raft.SnapCount,
154-
SendTimeout: nodeConfig.Raft.SendTimeout,
149+
NodeID: nodeConfig.Raft.NodeID,
150+
RaftAddr: nodeConfig.Raft.RaftAddr,
151+
RaftDir: raftDir,
152+
Bootstrap: nodeConfig.Raft.Bootstrap,
153+
SnapCount: nodeConfig.Raft.SnapCount,
154+
SendTimeout: nodeConfig.Raft.SendTimeout,
155+
HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout,
155156
}
156157

157158
if nodeConfig.Raft.Peers != "" {

pkg/config/config.go

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -142,6 +142,8 @@ const (
142142
FlagRaftSnapCount = FlagPrefixEvnode + "raft.snap_count"
143143
// FlagRaftSendTimeout is a flag for specifying the max time to wait for a message to be sent to a peer
144144
FlagRaftSendTimeout = FlagPrefixEvnode + "raft.send_timeout"
145+
// FlagRaftHeartbeatTimeout is a flag for specifying the Raft heartbeat timeout
146+
FlagRaftHeartbeatTimeout = FlagPrefixEvnode + "raft.heartbeat_timeout"
145147
)
146148

147149
// Config stores Rollkit configuration.
@@ -252,14 +254,15 @@ type RPCConfig struct {
252254

253255
// RaftConfig contains all Raft consensus configuration parameters
254256
type RaftConfig struct {
255-
Enable bool `mapstructure:"enable" yaml:"enable" comment:"Enable Raft consensus for leader election and state replication"`
256-
NodeID string `mapstructure:"node_id" yaml:"node_id" comment:"Unique identifier for this node in the Raft cluster"`
257-
RaftAddr string `mapstructure:"raft_addr" yaml:"raft_addr" comment:"Address for Raft communication (host:port)"`
258-
RaftDir string `mapstructure:"raft_dir" yaml:"raft_dir" comment:"Directory for Raft logs and snapshots"`
259-
Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new Raft cluster (only for the first node)"`
260-
Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"`
261-
SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of log entries between snapshots"`
262-
SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"`
257+
Enable bool `mapstructure:"enable" yaml:"enable" comment:"Enable Raft consensus for leader election and state replication"`
258+
NodeID string `mapstructure:"node_id" yaml:"node_id" comment:"Unique identifier for this node in the Raft cluster"`
259+
RaftAddr string `mapstructure:"raft_addr" yaml:"raft_addr" comment:"Address for Raft communication (host:port)"`
260+
RaftDir string `mapstructure:"raft_dir" yaml:"raft_dir" comment:"Directory for Raft logs and snapshots"`
261+
Bootstrap bool `mapstructure:"bootstrap" yaml:"bootstrap" comment:"Bootstrap a new Raft cluster (only for the first node)"`
262+
Peers string `mapstructure:"peers" yaml:"peers" comment:"Comma-separated list of peer Raft addresses (nodeID@host:port)"`
263+
SnapCount uint64 `mapstructure:"snap_count" yaml:"snap_count" comment:"Number of log entries between snapshots"`
264+
SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"`
265+
HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"`
263266
}
264267

265268
// Validate validates the config and ensures that the root directory exists.
@@ -394,6 +397,7 @@ func AddFlags(cmd *cobra.Command) {
394397
cmd.Flags().String(FlagRaftPeers, def.Raft.Peers, "comma-separated list of peer Raft addresses (nodeID@host:port)")
395398
cmd.Flags().Uint64(FlagRaftSnapCount, def.Raft.SnapCount, "number of log entries between snapshots")
396399
cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer")
400+
cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers")
397401
}
398402

399403
// Load loads the node configuration in the following order of precedence:

pkg/config/defaults.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,9 @@ func DefaultConfig() Config {
7777
Address: "127.0.0.1:7331",
7878
EnableDAVisualization: false,
7979
},
80+
Raft: RaftConfig{
81+
HeartbeatTimeout: 1000 * time.Millisecond,
82+
},
8083
}
8184
}
8285

pkg/raft/node.go

Lines changed: 10 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -33,13 +33,14 @@ type Node struct {
3333

3434
// Config holds raft node configuration
3535
type Config struct {
36-
NodeID string
37-
RaftAddr string
38-
RaftDir string
39-
Bootstrap bool
40-
Peers []string
41-
SnapCount uint64
42-
SendTimeout time.Duration
36+
NodeID string
37+
RaftAddr string
38+
RaftDir string
39+
Bootstrap bool
40+
Peers []string
41+
SnapCount uint64
42+
SendTimeout time.Duration
43+
HeartbeatTimeout time.Duration
4344
}
4445

4546
// FSM implements raft.FSM for block state
@@ -58,6 +59,8 @@ func NewNode(cfg *Config, clusterClient clusterClient, logger zerolog.Logger) (*
5859
raftConfig := raft.DefaultConfig()
5960
raftConfig.LocalID = raft.ServerID(cfg.NodeID)
6061
raftConfig.LogLevel = "INFO"
62+
raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout
63+
raftConfig.LeaderLeaseTimeout = cfg.HeartbeatTimeout / 2
6164

6265
fsm := &FSM{
6366
logger: logger.With().Str("component", "raft-fsm").Logger(),

test/e2e/failover_e2e_test.go

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -135,7 +135,8 @@ func TestLeaseFailoverE2E(t *testing.T) {
135135
require.EventuallyWithT(t, func(collect *assert.CollectT) {
136136
newLeader = clusterNodes.Leader(collect)
137137
}, 5*time.Second, 200*time.Millisecond)
138-
t.Logf("+++ New leader: %s within %s\n", newLeader, time.Since(leaderElectionStart))
138+
leaderFailoverTime := time.Since(leaderElectionStart)
139+
t.Logf("+++ New leader: %s within %s\n", newLeader, leaderFailoverTime)
139140
require.NotEqual(t, oldLeader, newLeader)
140141

141142
_, blk2 := submitTxToURL(t, clusterNodes.Details(newLeader).ethClient(t))
@@ -164,6 +165,7 @@ func TestLeaseFailoverE2E(t *testing.T) {
164165

165166
// Cleanup processes
166167
clusterNodes.killAll()
168+
t.Logf("Completed leader change in: %s", leaderFailoverTime)
167169
}
168170

169171
func initChain(t *testing.T, sut *SystemUnderTest, workDir string) {
@@ -222,7 +224,8 @@ func setupRaftSequencerNode(
222224
"--evnode.raft.bootstrap="+strconv.FormatBool(bootstrap),
223225
"--evnode.raft.peers="+strings.Join(raftPeers, ","),
224226
"--evnode.raft.snap_count=10",
225-
"--evnode.raft.send_timeout=5s",
227+
"--evnode.raft.send_timeout=300ms",
228+
"--evnode.raft.heartbeat_timeout=300ms",
226229

227230
"--rollkit.p2p.peers", p2pPeers,
228231
"--rollkit.rpc.address", rpcAddr,
@@ -268,7 +271,11 @@ func submitTxToURL(t *testing.T, client *ethclient.Client) (common.Hash, uint64)
268271

269272
func queryLastDAHeight(t *testing.T, startHeight uint64, jwtSecret string, daAddress string) uint64 {
270273
t.Helper()
271-
client, err := jsonrpc.NewClient(t.Context(), zerolog.New(zerolog.NewTestWriter(t)), daAddress, jwtSecret, 0, 1, 0)
274+
logger := zerolog.Nop()
275+
if testing.Verbose() {
276+
logger = zerolog.New(zerolog.NewTestWriter(t)).Level(zerolog.DebugLevel)
277+
}
278+
client, err := jsonrpc.NewClient(t.Context(), logger, daAddress, jwtSecret, 0, 1, 0)
272279
require.NoError(t, err)
273280
defer client.Close()
274281
var lastDABlock = startHeight
@@ -280,7 +287,7 @@ func queryLastDAHeight(t *testing.T, startHeight uint64, jwtSecret string, daAdd
280287
}
281288
t.Fatal("failed to get IDs:", err)
282289
}
283-
if len(res.IDs) != 0 {
290+
if len(res.IDs) != 0 && testing.Verbose() {
284291
t.Log("+++ DA block: ", lastDABlock, " ids: ", len(res.IDs))
285292
}
286293
lastDABlock++

0 commit comments

Comments
 (0)