Skip to content

Commit d6c4fd7

Browse files
committed
Refactor node setup with listener binding
1 parent 55ff106 commit d6c4fd7

File tree

1 file changed

+69
-38
lines changed

1 file changed

+69
-38
lines changed

adapter/test_util.go

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
136136
ctx := context.Background()
137137

138138
ports := assignPorts(n)
139-
cfg := buildRaftConfig(n, ports)
140-
nodes, grpcAdders, redisAdders := setupNodes(t, ctx, n, ports, cfg)
139+
nodes, grpcAdders, redisAdders, cfg := setupNodes(t, ctx, n, ports)
141140

142141
waitForNodeListeners(t, ctx, nodes, waitTimeout, waitInterval)
143142
waitForConfigReplication(t, cfg, nodes, waitTimeout, waitInterval)
@@ -146,6 +145,47 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
146145
return nodes, grpcAdders, redisAdders
147146
}
148147

148+
type listeners struct {
149+
grpc net.Listener
150+
redis net.Listener
151+
dynamo net.Listener
152+
}
153+
154+
func bindListeners(ctx context.Context, lc *net.ListenConfig, port portsAdress) (portsAdress, listeners, bool, error) {
155+
grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
156+
if err != nil {
157+
if errors.Is(err, unix.EADDRINUSE) {
158+
return port, listeners{}, true, nil
159+
}
160+
return port, listeners{}, false, errors.WithStack(err)
161+
}
162+
163+
redisSock, err := lc.Listen(ctx, "tcp", port.redisAddress)
164+
if err != nil {
165+
_ = grpcSock.Close()
166+
if errors.Is(err, unix.EADDRINUSE) {
167+
return port, listeners{}, true, nil
168+
}
169+
return port, listeners{}, false, errors.WithStack(err)
170+
}
171+
172+
dynamoSock, err := lc.Listen(ctx, "tcp", port.dynamoAddress)
173+
if err != nil {
174+
_ = grpcSock.Close()
175+
_ = redisSock.Close()
176+
if errors.Is(err, unix.EADDRINUSE) {
177+
return port, listeners{}, true, nil
178+
}
179+
return port, listeners{}, false, errors.WithStack(err)
180+
}
181+
182+
return port, listeners{
183+
grpc: grpcSock,
184+
redis: redisSock,
185+
dynamo: dynamoSock,
186+
}, false, nil
187+
}
188+
149189
func waitForNodeListeners(t *testing.T, ctx context.Context, nodes []Node, waitTimeout, waitInterval time.Duration) {
150190
t.Helper()
151191
d := &net.Dialer{Timeout: time.Second}
@@ -252,53 +292,44 @@ func buildRaftConfig(n int, ports []portsAdress) raft.Configuration {
252292

253293
const leaderElectionTimeout = 0 * time.Second
254294

255-
func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, cfg raft.Configuration) ([]Node, []string, []string) {
295+
func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) ([]Node, []string, []string, raft.Configuration) {
256296
t.Helper()
257297
var grpcAdders []string
258298
var redisAdders []string
259299
var nodes []Node
260-
var lc net.ListenConfig
261-
300+
lc := net.ListenConfig{}
301+
lis := make([]listeners, n)
262302
for i := 0; i < n; i++ {
263-
st := store.NewRbMemoryStore()
264-
trxSt := store.NewMemoryStoreDefaultTTL()
265-
fsm := kv.NewKvFSM(st, trxSt)
266-
267-
var port portsAdress
268-
var grpcSock, redisSock, dynamoSock net.Listener
269-
303+
var (
304+
bound portsAdress
305+
l listeners
306+
retry bool
307+
err error
308+
)
270309
for {
271-
port = ports[i]
272-
var err error
273-
274-
grpcSock, err = lc.Listen(ctx, "tcp", port.grpcAddress)
275-
if err != nil && errors.Is(err, unix.EADDRINUSE) {
276-
ports[i] = portAssigner()
277-
cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress)
278-
continue
279-
}
310+
bound, l, retry, err = bindListeners(ctx, &lc, ports[i])
280311
require.NoError(t, err)
281-
282-
redisSock, err = lc.Listen(ctx, "tcp", port.redisAddress)
283-
if err != nil && errors.Is(err, unix.EADDRINUSE) {
284-
_ = grpcSock.Close()
285-
ports[i] = portAssigner()
286-
cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress)
287-
continue
288-
}
289-
require.NoError(t, err)
290-
291-
dynamoSock, err = lc.Listen(ctx, "tcp", port.dynamoAddress)
292-
if err != nil && errors.Is(err, unix.EADDRINUSE) {
293-
_ = grpcSock.Close()
294-
_ = redisSock.Close()
312+
if retry {
295313
ports[i] = portAssigner()
296-
cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress)
297314
continue
298315
}
299-
require.NoError(t, err)
316+
ports[i] = bound
317+
lis[i] = l
300318
break
301319
}
320+
}
321+
322+
cfg := buildRaftConfig(n, ports)
323+
324+
for i := 0; i < n; i++ {
325+
st := store.NewRbMemoryStore()
326+
trxSt := store.NewMemoryStoreDefaultTTL()
327+
fsm := kv.NewKvFSM(st, trxSt)
328+
329+
port := ports[i]
330+
grpcSock := lis[i].grpc
331+
redisSock := lis[i].redis
332+
dynamoSock := lis[i].dynamo
302333

303334
leaderRedis := map[raft.ServerAddress]string{
304335
raft.ServerAddress(ports[i].raftAddress): ports[i].redisAddress,
@@ -354,7 +385,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
354385
))
355386
}
356387

357-
return nodes, grpcAdders, redisAdders
388+
return nodes, grpcAdders, redisAdders, cfg
358389
}
359390

360391
func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg raft.Configuration, electionTimeout time.Duration) (*raft.Raft, *transport.Manager, error) {

0 commit comments

Comments
 (0)