diff --git a/adapter/test_util.go b/adapter/test_util.go index 95ea22c..d44af44 100644 --- a/adapter/test_util.go +++ b/adapter/test_util.go @@ -122,43 +122,105 @@ func newNode(grpcAddress, raftAddress, redisAddress, dynamoAddress string, r *ra //nolint:unparam func createNode(t *testing.T, n int) ([]Node, []string, []string) { - var grpcAdders []string - var redisAdders []string - var nodes []Node - const ( waitTimeout = 5 * time.Second waitInterval = 100 * time.Millisecond ) - cfg := raft.Configuration{} - ports := make([]portsAdress, n) + t.Helper() ctx := context.Background() - var lc net.ListenConfig - // port assign + ports := assignPorts(n) + cfg := buildRaftConfig(n, ports) + nodes, grpcAdders, redisAdders := setupNodes(t, ctx, n, ports, cfg) + + waitForNodeListeners(t, ctx, nodes, waitTimeout, waitInterval) + waitForRaftReadiness(t, nodes, waitTimeout, waitInterval) + + return nodes, grpcAdders, redisAdders +} + +func waitForNodeListeners(t *testing.T, ctx context.Context, nodes []Node, waitTimeout, waitInterval time.Duration) { + t.Helper() + d := &net.Dialer{Timeout: time.Second} + for _, n := range nodes { + assert.Eventually(t, func() bool { + conn, err := d.DialContext(ctx, "tcp", n.grpcAddress) + if err != nil { + return false + } + _ = conn.Close() + conn, err = d.DialContext(ctx, "tcp", n.redisAddress) + if err != nil { + return false + } + _ = conn.Close() + return true + }, waitTimeout, waitInterval) + } +} + +func waitForRaftReadiness(t *testing.T, nodes []Node, waitTimeout, waitInterval time.Duration) { + t.Helper() + assert.Eventually(t, func() bool { + return nodes[0].raft.State() == raft.Leader + }, waitTimeout, waitInterval) + + expectedLeader := raft.ServerAddress(nodes[0].raftAddress) + assert.Eventually(t, func() bool { + for i, n := range nodes { + state := n.raft.State() + if i == 0 { + if state != raft.Leader { + return false + } + } else if state != raft.Follower { + return false + } + + addr, _ := n.raft.LeaderWithID() + if addr != expectedLeader { + return false + } + } + return true + }, waitTimeout, waitInterval) +} + +func assignPorts(n int) []portsAdress { + ports := make([]portsAdress, n) for i := 0; i < n; i++ { ports[i] = portAssigner() } + return ports +} - // build raft node config +func buildRaftConfig(n int, ports []portsAdress) raft.Configuration { + cfg := raft.Configuration{} for i := 0; i < n; i++ { - var suffrage raft.ServerSuffrage + suffrage := raft.Nonvoter if i == 0 { suffrage = raft.Voter - } else { - suffrage = raft.Nonvoter } - server := raft.Server{ + cfg.Servers = append(cfg.Servers, raft.Server{ Suffrage: suffrage, ID: raft.ServerID(strconv.Itoa(i)), Address: raft.ServerAddress(ports[i].raftAddress), - } - cfg.Servers = append(cfg.Servers, server) + }) } + return cfg +} + +func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, cfg raft.Configuration) ([]Node, []string, []string) { + t.Helper() + var grpcAdders []string + var redisAdders []string + var nodes []Node + var lc net.ListenConfig + for i := 0; i < n; i++ { st := store.NewRbMemoryStore() trxSt := store.NewMemoryStoreDefaultTTL() @@ -186,16 +248,16 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { grpcAdders = append(grpcAdders, port.grpcAddress) redisAdders = append(redisAdders, port.redisAddress) - go func() { - assert.NoError(t, s.Serve(grpcSock)) - }() + go func(srv *grpc.Server, lis net.Listener) { + assert.NoError(t, srv.Serve(lis)) + }(s, grpcSock) l, err := lc.Listen(ctx, "tcp", port.redisAddress) assert.NoError(t, err) rd := NewRedisServer(l, st, coordinator) - go func() { - assert.NoError(t, rd.Run()) - }() + go func(server *RedisServer) { + assert.NoError(t, server.Run()) + }(rd) dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress) assert.NoError(t, err) @@ -217,32 +279,6 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) { )) } - d := &net.Dialer{Timeout: time.Second} - for _, n := range nodes { - assert.Eventually(t, func() bool { - conn, err := d.DialContext(ctx, "tcp", n.grpcAddress) - if err != nil { - return false - } - _ = conn.Close() - conn, err = d.DialContext(ctx, "tcp", n.redisAddress) - if err != nil { - return false - } - _ = conn.Close() - conn, err = d.DialContext(ctx, "tcp", n.dynamoAddress) - if err != nil { - return false - } - _ = conn.Close() - return true - }, waitTimeout, waitInterval) - } - - assert.Eventually(t, func() bool { - return nodes[0].raft.State() == raft.Leader - }, waitTimeout, waitInterval) - return nodes, grpcAdders, redisAdders }