Skip to content

Commit 70e2161

Browse files
authored
Merge pull request #269 from bootjp/feature/fix-create-node
Refactor test utilities and flaky test
2 parents 18423db + 52dc6c6 commit 70e2161

File tree

1 file changed

+83
-47
lines changed

1 file changed

+83
-47
lines changed

adapter/test_util.go

Lines changed: 83 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -122,43 +122,105 @@ func newNode(grpcAddress, raftAddress, redisAddress, dynamoAddress string, r *ra
122122

123123
//nolint:unparam
124124
func createNode(t *testing.T, n int) ([]Node, []string, []string) {
125-
var grpcAdders []string
126-
var redisAdders []string
127-
var nodes []Node
128-
129125
const (
130126
waitTimeout = 5 * time.Second
131127
waitInterval = 100 * time.Millisecond
132128
)
133129

134-
cfg := raft.Configuration{}
135-
ports := make([]portsAdress, n)
130+
t.Helper()
136131

137132
ctx := context.Background()
138-
var lc net.ListenConfig
139133

140-
// port assign
134+
ports := assignPorts(n)
135+
cfg := buildRaftConfig(n, ports)
136+
nodes, grpcAdders, redisAdders := setupNodes(t, ctx, n, ports, cfg)
137+
138+
waitForNodeListeners(t, ctx, nodes, waitTimeout, waitInterval)
139+
waitForRaftReadiness(t, nodes, waitTimeout, waitInterval)
140+
141+
return nodes, grpcAdders, redisAdders
142+
}
143+
144+
func waitForNodeListeners(t *testing.T, ctx context.Context, nodes []Node, waitTimeout, waitInterval time.Duration) {
145+
t.Helper()
146+
d := &net.Dialer{Timeout: time.Second}
147+
for _, n := range nodes {
148+
assert.Eventually(t, func() bool {
149+
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
150+
if err != nil {
151+
return false
152+
}
153+
_ = conn.Close()
154+
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
155+
if err != nil {
156+
return false
157+
}
158+
_ = conn.Close()
159+
return true
160+
}, waitTimeout, waitInterval)
161+
}
162+
}
163+
164+
func waitForRaftReadiness(t *testing.T, nodes []Node, waitTimeout, waitInterval time.Duration) {
165+
t.Helper()
166+
assert.Eventually(t, func() bool {
167+
return nodes[0].raft.State() == raft.Leader
168+
}, waitTimeout, waitInterval)
169+
170+
expectedLeader := raft.ServerAddress(nodes[0].raftAddress)
171+
assert.Eventually(t, func() bool {
172+
for i, n := range nodes {
173+
state := n.raft.State()
174+
if i == 0 {
175+
if state != raft.Leader {
176+
return false
177+
}
178+
} else if state != raft.Follower {
179+
return false
180+
}
181+
182+
addr, _ := n.raft.LeaderWithID()
183+
if addr != expectedLeader {
184+
return false
185+
}
186+
}
187+
return true
188+
}, waitTimeout, waitInterval)
189+
}
190+
191+
func assignPorts(n int) []portsAdress {
192+
ports := make([]portsAdress, n)
141193
for i := 0; i < n; i++ {
142194
ports[i] = portAssigner()
143195
}
196+
return ports
197+
}
144198

145-
// build raft node config
199+
func buildRaftConfig(n int, ports []portsAdress) raft.Configuration {
200+
cfg := raft.Configuration{}
146201
for i := 0; i < n; i++ {
147-
var suffrage raft.ServerSuffrage
202+
suffrage := raft.Nonvoter
148203
if i == 0 {
149204
suffrage = raft.Voter
150-
} else {
151-
suffrage = raft.Nonvoter
152205
}
153206

154-
server := raft.Server{
207+
cfg.Servers = append(cfg.Servers, raft.Server{
155208
Suffrage: suffrage,
156209
ID: raft.ServerID(strconv.Itoa(i)),
157210
Address: raft.ServerAddress(ports[i].raftAddress),
158-
}
159-
cfg.Servers = append(cfg.Servers, server)
211+
})
160212
}
161213

214+
return cfg
215+
}
216+
217+
func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, cfg raft.Configuration) ([]Node, []string, []string) {
218+
t.Helper()
219+
var grpcAdders []string
220+
var redisAdders []string
221+
var nodes []Node
222+
var lc net.ListenConfig
223+
162224
for i := 0; i < n; i++ {
163225
st := store.NewRbMemoryStore()
164226
trxSt := store.NewMemoryStoreDefaultTTL()
@@ -186,16 +248,16 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
186248

187249
grpcAdders = append(grpcAdders, port.grpcAddress)
188250
redisAdders = append(redisAdders, port.redisAddress)
189-
go func() {
190-
assert.NoError(t, s.Serve(grpcSock))
191-
}()
251+
go func(srv *grpc.Server, lis net.Listener) {
252+
assert.NoError(t, srv.Serve(lis))
253+
}(s, grpcSock)
192254

193255
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
194256
assert.NoError(t, err)
195257
rd := NewRedisServer(l, st, coordinator)
196-
go func() {
197-
assert.NoError(t, rd.Run())
198-
}()
258+
go func(server *RedisServer) {
259+
assert.NoError(t, server.Run())
260+
}(rd)
199261

200262
dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress)
201263
assert.NoError(t, err)
@@ -217,32 +279,6 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
217279
))
218280
}
219281

220-
d := &net.Dialer{Timeout: time.Second}
221-
for _, n := range nodes {
222-
assert.Eventually(t, func() bool {
223-
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
224-
if err != nil {
225-
return false
226-
}
227-
_ = conn.Close()
228-
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
229-
if err != nil {
230-
return false
231-
}
232-
_ = conn.Close()
233-
conn, err = d.DialContext(ctx, "tcp", n.dynamoAddress)
234-
if err != nil {
235-
return false
236-
}
237-
_ = conn.Close()
238-
return true
239-
}, waitTimeout, waitInterval)
240-
}
241-
242-
assert.Eventually(t, func() bool {
243-
return nodes[0].raft.State() == raft.Leader
244-
}, waitTimeout, waitInterval)
245-
246282
return nodes, grpcAdders, redisAdders
247283
}
248284

0 commit comments

Comments
 (0)