Skip to content
65 changes: 56 additions & 9 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand Down Expand Up @@ -57,9 +58,11 @@ const (
// raft and the grpc requested by the client use grpc and are received on the same port
grpcPort = 50000
raftPort = 50000

redisPort = 63790
redisPort = 63790
dynamoPort = 28000

// followers wait longer before starting elections to give the leader time to bootstrap and share config.
followerElectionTimeout = 10 * time.Second
)

var mu sync.Mutex
Expand Down Expand Up @@ -136,6 +139,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
nodes, grpcAdders, redisAdders := setupNodes(t, ctx, n, ports, cfg)

waitForNodeListeners(t, ctx, nodes, waitTimeout, waitInterval)
waitForConfigReplication(t, cfg, nodes, waitTimeout, waitInterval)
waitForRaftReadiness(t, nodes, waitTimeout, waitInterval)

return nodes, grpcAdders, redisAdders
Expand Down Expand Up @@ -163,9 +167,6 @@ func waitForNodeListeners(t *testing.T, ctx context.Context, nodes []Node, waitT

func waitForRaftReadiness(t *testing.T, nodes []Node, waitTimeout, waitInterval time.Duration) {
t.Helper()
assert.Eventually(t, func() bool {
return nodes[0].raft.State() == raft.Leader
}, waitTimeout, waitInterval)

expectedLeader := raft.ServerAddress(nodes[0].raftAddress)
assert.Eventually(t, func() bool {
Expand All @@ -188,6 +189,40 @@ func waitForRaftReadiness(t *testing.T, nodes []Node, waitTimeout, waitInterval
}, waitTimeout, waitInterval)
}

func waitForConfigReplication(t *testing.T, cfg raft.Configuration, nodes []Node, waitTimeout, waitInterval time.Duration) {
t.Helper()

assert.Eventually(t, func() bool {
for _, n := range nodes {
future := n.raft.GetConfiguration()
if future.Error() != nil {
return false
}

current := future.Configuration().Servers
if len(current) != len(cfg.Servers) {
return false
}

for _, expected := range cfg.Servers {
if !containsServer(current, expected) {
return false
}
}
}
return true
}, waitTimeout, waitInterval)
}

func containsServer(servers []raft.Server, expected raft.Server) bool {
for _, s := range servers {
if s.ID == expected.ID && s.Address == expected.Address && s.Suffrage == expected.Suffrage {
return true
}
}
return false
}

func assignPorts(n int) []portsAdress {
ports := make([]portsAdress, n)
for i := 0; i < n; i++ {
Expand All @@ -214,6 +249,8 @@ func buildRaftConfig(n int, ports []portsAdress) raft.Configuration {
return cfg
}

const leaderElectionTimeout = 0 * time.Second

func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, cfg raft.Configuration) ([]Node, []string, []string) {
t.Helper()
var grpcAdders []string
Expand All @@ -228,7 +265,13 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c

port := ports[i]

r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg)
// リーダーが先に投票を開始させる
electionTimeout := leaderElectionTimeout
if i != 0 {
electionTimeout = followerElectionTimeout
}

r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg, electionTimeout)
assert.NoError(t, err)

s := grpc.NewServer()
Expand All @@ -244,7 +287,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
raftadmin.Register(s, r)

grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
assert.NoError(t, err)
require.NoError(t, err)

grpcAdders = append(grpcAdders, port.grpcAddress)
redisAdders = append(redisAdders, port.redisAddress)
Expand All @@ -253,7 +296,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
}(s, grpcSock)

l, err := lc.Listen(ctx, "tcp", port.redisAddress)
assert.NoError(t, err)
require.NoError(t, err)
rd := NewRedisServer(l, st, coordinator)
go func(server *RedisServer) {
assert.NoError(t, server.Run())
Expand Down Expand Up @@ -282,10 +325,14 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
return nodes, grpcAdders, redisAdders
}

func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration) (*raft.Raft, *transport.Manager, error) {
func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration, electionTimeout time.Duration) (*raft.Raft, *transport.Manager, error) {
c := raft.DefaultConfig()
c.LocalID = raft.ServerID(myID)

if electionTimeout > 0 {
c.ElectionTimeout = electionTimeout
}

// this config is for development
ldb := raft.NewInmemStore()
sdb := raft.NewInmemStore()
Expand Down
Loading