Skip to content

Commit 45f1742

Browse files
committed
Add follower election timeout for Raft nodes
1 parent aa59294 commit 45f1742

File tree

2 files changed

+81
-6
lines changed

2 files changed

+81
-6
lines changed

adapter/test_util.go

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/hashicorp/go-hclog"
2121
"github.com/hashicorp/raft"
2222
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
2324
"google.golang.org/grpc"
2425
"google.golang.org/grpc/credentials/insecure"
2526
)
@@ -54,6 +55,9 @@ const (
5455
raftPort = 50000
5556

5657
redisPort = 63790
58+
59+
// followers wait longer before starting elections to give the leader time to bootstrap and share config.
60+
followerElectionTimeout = 30 * time.Second
5761
)
5862

5963
var mu sync.Mutex
@@ -121,12 +125,32 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
121125
cfg := buildRaftConfig(n, ports)
122126
nodes, grpcAdders, redisAdders := setupNodes(t, ctx, n, ports, cfg)
123127

128+
bootstrapFollowers(t, cfg, nodes)
124129
waitForNodeListeners(t, ctx, nodes, waitTimeout, waitInterval)
130+
waitForConfigReplication(t, cfg, nodes, waitTimeout, waitInterval)
125131
waitForRaftReadiness(t, nodes, waitTimeout, waitInterval)
126132

127133
return nodes, grpcAdders, redisAdders
128134
}
129135

136+
func bootstrapFollowers(t *testing.T, cfg raft.Configuration, nodes []Node) {
137+
t.Helper()
138+
for i, n := range nodes {
139+
if i == 0 {
140+
continue
141+
}
142+
143+
future := n.raft.GetConfiguration()
144+
assert.NoError(t, future.Error())
145+
if len(future.Configuration().Servers) != 0 {
146+
continue
147+
}
148+
149+
boot := n.raft.BootstrapCluster(cfg)
150+
assert.NoError(t, boot.Error())
151+
}
152+
}
153+
130154
func waitForNodeListeners(t *testing.T, ctx context.Context, nodes []Node, waitTimeout, waitInterval time.Duration) {
131155
t.Helper()
132156
d := &net.Dialer{Timeout: time.Second}
@@ -171,6 +195,40 @@ func waitForRaftReadiness(t *testing.T, nodes []Node, waitTimeout, waitInterval
171195
}, waitTimeout, waitInterval)
172196
}
173197

198+
func waitForConfigReplication(t *testing.T, cfg raft.Configuration, nodes []Node, waitTimeout, waitInterval time.Duration) {
199+
t.Helper()
200+
201+
assert.Eventually(t, func() bool {
202+
for _, n := range nodes {
203+
future := n.raft.GetConfiguration()
204+
if future.Error() != nil {
205+
return false
206+
}
207+
208+
current := future.Configuration().Servers
209+
if len(current) != len(cfg.Servers) {
210+
return false
211+
}
212+
213+
for _, expected := range cfg.Servers {
214+
if !containsServer(current, expected) {
215+
return false
216+
}
217+
}
218+
}
219+
return true
220+
}, waitTimeout, waitInterval)
221+
}
222+
223+
func containsServer(servers []raft.Server, expected raft.Server) bool {
224+
for _, s := range servers {
225+
if s.ID == expected.ID && s.Address == expected.Address && s.Suffrage == expected.Suffrage {
226+
return true
227+
}
228+
}
229+
return false
230+
}
231+
174232
func assignPorts(n int) []portsAdress {
175233
ports := make([]portsAdress, n)
176234
for i := 0; i < n; i++ {
@@ -197,6 +255,8 @@ func buildRaftConfig(n int, ports []portsAdress) raft.Configuration {
197255
return cfg
198256
}
199257

258+
const leaderElectionTimeout = 10 * time.Second
259+
200260
func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, cfg raft.Configuration) ([]Node, []string, []string) {
201261
t.Helper()
202262
var grpcAdders []string
@@ -211,7 +271,13 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
211271

212272
port := ports[i]
213273

214-
r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg)
274+
// リーダーが先に投票を開始させる
275+
electionTimeout := leaderElectionTimeout
276+
if i != 0 {
277+
electionTimeout = followerElectionTimeout
278+
}
279+
280+
r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg, electionTimeout)
215281
assert.NoError(t, err)
216282

217283
s := grpc.NewServer()
@@ -227,7 +293,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
227293
raftadmin.Register(s, r)
228294

229295
grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
230-
assert.NoError(t, err)
296+
require.NoError(t, err)
231297

232298
grpcAdders = append(grpcAdders, port.grpcAddress)
233299
redisAdders = append(redisAdders, port.redisAddress)
@@ -236,7 +302,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
236302
}(s, grpcSock)
237303

238304
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
239-
assert.NoError(t, err)
305+
require.NoError(t, err)
240306
rd := NewRedisServer(l, st, coordinator)
241307
go func(server *RedisServer) {
242308
assert.NoError(t, server.Run())
@@ -256,10 +322,14 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
256322
return nodes, grpcAdders, redisAdders
257323
}
258324

259-
func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration) (*raft.Raft, *transport.Manager, error) {
325+
func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration, electionTimeout time.Duration) (*raft.Raft, *transport.Manager, error) {
260326
c := raft.DefaultConfig()
261327
c.LocalID = raft.ServerID(myID)
262328

329+
if electionTimeout > 0 {
330+
c.ElectionTimeout = electionTimeout
331+
}
332+
263333
// this config is for development
264334
ldb := raft.NewInmemStore()
265335
sdb := raft.NewInmemStore()

cmd/server/demo.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net"
77
"os"
88
"strconv"
9+
"time"
910

1011
"github.com/Jille/raft-grpc-leader-rpc/leaderhealth"
1112
transport "github.com/Jille/raft-grpc-transport"
@@ -80,7 +81,7 @@ func run(eg *errgroup.Group) error {
8081
trxSt := store.NewMemoryStoreDefaultTTL()
8182
fsm := kv.NewKvFSM(st, trxSt)
8283

83-
r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg)
84+
r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg, 0)
8485
if err != nil {
8586
return errors.WithStack(err)
8687
}
@@ -119,10 +120,14 @@ func run(eg *errgroup.Group) error {
119120
return nil
120121
}
121122

122-
func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration) (*raft.Raft, *transport.Manager, error) {
123+
func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration, electionTimeout time.Duration) (*raft.Raft, *transport.Manager, error) {
123124
c := raft.DefaultConfig()
124125
c.LocalID = raft.ServerID(myID)
125126

127+
if electionTimeout > 0 {
128+
c.ElectionTimeout = electionTimeout
129+
}
130+
126131
// this config is for development
127132
ldb := raft.NewInmemStore()
128133
sdb := raft.NewInmemStore()

0 commit comments

Comments
 (0)