Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 83 additions & 47 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,43 +122,105 @@ func newNode(grpcAddress, raftAddress, redisAddress, dynamoAddress string, r *ra

//nolint:unparam
func createNode(t *testing.T, n int) ([]Node, []string, []string) {
var grpcAdders []string
var redisAdders []string
var nodes []Node

const (
waitTimeout = 5 * time.Second
waitInterval = 100 * time.Millisecond
)

cfg := raft.Configuration{}
ports := make([]portsAdress, n)
t.Helper()

ctx := context.Background()
var lc net.ListenConfig

// port assign
ports := assignPorts(n)
cfg := buildRaftConfig(n, ports)
nodes, grpcAdders, redisAdders := setupNodes(t, ctx, n, ports, cfg)

waitForNodeListeners(t, ctx, nodes, waitTimeout, waitInterval)
waitForRaftReadiness(t, nodes, waitTimeout, waitInterval)

return nodes, grpcAdders, redisAdders
}

func waitForNodeListeners(t *testing.T, ctx context.Context, nodes []Node, waitTimeout, waitInterval time.Duration) {
t.Helper()
d := &net.Dialer{Timeout: time.Second}
for _, n := range nodes {
assert.Eventually(t, func() bool {
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
if err != nil {
return false
}
_ = conn.Close()
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
if err != nil {
return false
}
_ = conn.Close()
return true
}, waitTimeout, waitInterval)
}
Comment on lines +147 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation waits for each node's listeners sequentially. This can slow down tests, especially with a larger number of nodes. By using t.Run with t.Parallel(), we can check all nodes concurrently, which can significantly speed up the test suite.

Suggested change
for _, n := range nodes {
assert.Eventually(t, func() bool {
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
if err != nil {
return false
}
_ = conn.Close()
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
if err != nil {
return false
}
_ = conn.Close()
return true
}, waitTimeout, waitInterval)
}
for _, node := range nodes {
n := node // capture range variable
t.Run("wait for "+n.grpcAddress, func(t *testing.T) {
t.Parallel()
assert.Eventually(t, func() bool {
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
if err != nil {
return false
}
_ = conn.Close()
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
if err != nil {
return false
}
_ = conn.Close()
return true
}, waitTimeout, waitInterval)
})
}

}

func waitForRaftReadiness(t *testing.T, nodes []Node, waitTimeout, waitInterval time.Duration) {
t.Helper()
assert.Eventually(t, func() bool {
return nodes[0].raft.State() == raft.Leader
}, waitTimeout, waitInterval)
Comment on lines +166 to +168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This assert.Eventually call is redundant because the subsequent assert.Eventually block already checks if nodes[0] is the leader within its loop. Removing this will make the function cleaner and slightly more efficient.


expectedLeader := raft.ServerAddress(nodes[0].raftAddress)
assert.Eventually(t, func() bool {
for i, n := range nodes {
state := n.raft.State()
if i == 0 {
if state != raft.Leader {
return false
}
} else if state != raft.Follower {
return false
}

addr, _ := n.raft.LeaderWithID()
if addr != expectedLeader {
return false
}
}
return true
}, waitTimeout, waitInterval)
}

func assignPorts(n int) []portsAdress {
ports := make([]portsAdress, n)
for i := 0; i < n; i++ {
ports[i] = portAssigner()
}
return ports
}

// build raft node config
func buildRaftConfig(n int, ports []portsAdress) raft.Configuration {
cfg := raft.Configuration{}
for i := 0; i < n; i++ {
var suffrage raft.ServerSuffrage
suffrage := raft.Nonvoter
if i == 0 {
suffrage = raft.Voter
} else {
suffrage = raft.Nonvoter
}

server := raft.Server{
cfg.Servers = append(cfg.Servers, raft.Server{
Suffrage: suffrage,
ID: raft.ServerID(strconv.Itoa(i)),
Address: raft.ServerAddress(ports[i].raftAddress),
}
cfg.Servers = append(cfg.Servers, server)
})
}

return cfg
}

func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, cfg raft.Configuration) ([]Node, []string, []string) {
t.Helper()
var grpcAdders []string
var redisAdders []string
var nodes []Node
var lc net.ListenConfig

for i := 0; i < n; i++ {
st := store.NewRbMemoryStore()
trxSt := store.NewMemoryStoreDefaultTTL()
Expand Down Expand Up @@ -186,16 +248,16 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {

grpcAdders = append(grpcAdders, port.grpcAddress)
redisAdders = append(redisAdders, port.redisAddress)
go func() {
assert.NoError(t, s.Serve(grpcSock))
}()
go func(srv *grpc.Server, lis net.Listener) {
assert.NoError(t, srv.Serve(lis))
}(s, grpcSock)

l, err := lc.Listen(ctx, "tcp", port.redisAddress)
assert.NoError(t, err)
rd := NewRedisServer(l, st, coordinator)
go func() {
assert.NoError(t, rd.Run())
}()
go func(server *RedisServer) {
assert.NoError(t, server.Run())
}(rd)

dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress)
assert.NoError(t, err)
Expand All @@ -217,32 +279,6 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
))
}

d := &net.Dialer{Timeout: time.Second}
for _, n := range nodes {
assert.Eventually(t, func() bool {
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
if err != nil {
return false
}
_ = conn.Close()
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
if err != nil {
return false
}
_ = conn.Close()
conn, err = d.DialContext(ctx, "tcp", n.dynamoAddress)
if err != nil {
return false
}
_ = conn.Close()
return true
}, waitTimeout, waitInterval)
}

assert.Eventually(t, func() bool {
return nodes[0].raft.State() == raft.Leader
}, waitTimeout, waitInterval)

return nodes, grpcAdders, redisAdders
}

Expand Down
Loading