Skip to content

Commit 74c078f

Browse files
committed
Handle port conflicts and track first error
1 parent c862035 commit 74c078f

File tree

2 files changed

+51
-16
lines changed

2 files changed

+51
-16
lines changed

adapter/test_util.go

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"testing"
1111
"time"
1212

13+
"golang.org/x/sys/unix"
14+
1315
"github.com/Jille/raft-grpc-leader-rpc/leaderhealth"
1416
transport "github.com/Jille/raft-grpc-transport"
1517
"github.com/Jille/raftadmin"
@@ -258,17 +260,50 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
258260
var nodes []Node
259261
var lc net.ListenConfig
260262

261-
leaderRedis := make(map[raft.ServerAddress]string, n)
262-
for i := 0; i < n; i++ {
263-
leaderRedis[raft.ServerAddress(ports[i].raftAddress)] = ports[i].redisAddress
264-
}
265-
266263
for i := 0; i < n; i++ {
267264
st := store.NewRbMemoryStore()
268265
trxSt := store.NewMemoryStoreDefaultTTL()
269266
fsm := kv.NewKvFSM(st, trxSt)
270267

271-
port := ports[i]
268+
var port portsAdress
269+
var grpcSock, redisSock, dynamoSock net.Listener
270+
271+
for {
272+
port = ports[i]
273+
var err error
274+
275+
grpcSock, err = lc.Listen(ctx, "tcp", port.grpcAddress)
276+
if err != nil && errors.Is(err, unix.EADDRINUSE) {
277+
ports[i] = portAssigner()
278+
cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress)
279+
continue
280+
}
281+
require.NoError(t, err)
282+
283+
redisSock, err = lc.Listen(ctx, "tcp", port.redisAddress)
284+
if err != nil && errors.Is(err, unix.EADDRINUSE) {
285+
_ = grpcSock.Close()
286+
ports[i] = portAssigner()
287+
cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress)
288+
continue
289+
}
290+
require.NoError(t, err)
291+
292+
dynamoSock, err = lc.Listen(ctx, "tcp", port.dynamoAddress)
293+
if err != nil && errors.Is(err, unix.EADDRINUSE) {
294+
_ = grpcSock.Close()
295+
_ = redisSock.Close()
296+
ports[i] = portAssigner()
297+
cfg.Servers[i].Address = raft.ServerAddress(ports[i].raftAddress)
298+
continue
299+
}
300+
require.NoError(t, err)
301+
break
302+
}
303+
304+
leaderRedis := map[raft.ServerAddress]string{
305+
raft.ServerAddress(ports[i].raftAddress): ports[i].redisAddress,
306+
}
272307

273308
// リーダーが先に投票を開始させる
274309
electionTimeout := leaderElectionTimeout
@@ -291,25 +326,18 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress, c
291326
leaderhealth.Setup(r, s, []string{"Example"})
292327
raftadmin.Register(s, r)
293328

294-
grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
295-
require.NoError(t, err)
296-
297329
grpcAdders = append(grpcAdders, port.grpcAddress)
298330
redisAdders = append(redisAdders, port.redisAddress)
299331
go func(srv *grpc.Server, lis net.Listener) {
300332
assert.NoError(t, srv.Serve(lis))
301333
}(s, grpcSock)
302334

303-
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
304-
require.NoError(t, err)
305-
rd := NewRedisServer(l, st, coordinator, leaderRedis)
335+
rd := NewRedisServer(redisSock, st, coordinator, leaderRedis)
306336
go func(server *RedisServer) {
307337
assert.NoError(t, server.Run())
308338
}(rd)
309339

310-
dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress)
311-
assert.NoError(t, err)
312-
ds := NewDynamoDBServer(dl, st, coordinator)
340+
ds := NewDynamoDBServer(dynamoSock, st, coordinator)
313341
go func() {
314342
assert.NoError(t, ds.Run())
315343
}()

kv/shard_router.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re
5858
return nil, errors.WithStack(err)
5959
}
6060

61+
var firstErr error
6162
var max uint64
6263
for gid, rs := range grouped {
6364
g, ok := s.getGroup(gid)
@@ -66,12 +67,18 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re
6667
}
6768
r, err := fn(g, rs)
6869
if err != nil {
69-
return nil, errors.WithStack(err)
70+
if firstErr == nil {
71+
firstErr = errors.WithStack(err)
72+
}
73+
continue
7074
}
7175
if r.CommitIndex > max {
7276
max = r.CommitIndex
7377
}
7478
}
79+
if firstErr != nil {
80+
return nil, firstErr
81+
}
7582
return &TransactionResponse{CommitIndex: max}, nil
7683
}
7784

0 commit comments

Comments
 (0)