Skip to content

Commit ce94d05

Browse files
committed
Use ephemeral ports in adapter tests to prevent collisions
1 parent 28b9b34 commit ce94d05

File tree

3 files changed

+71
-95
lines changed

3 files changed

+71
-95
lines changed

adapter/test_util.go

Lines changed: 33 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"log"
66
"net"
77
"strconv"
8-
"sync"
9-
"sync/atomic"
108
"testing"
119
"time"
1210

@@ -19,7 +17,7 @@ import (
1917
"github.com/cockroachdb/errors"
2018
"github.com/hashicorp/go-hclog"
2119
"github.com/hashicorp/raft"
22-
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/require"
2321
"google.golang.org/grpc"
2422
"google.golang.org/grpc/credentials/insecure"
2523
)
@@ -42,57 +40,7 @@ func shutdown(nodes []Node) {
4240
}
4341
}
4442

45-
type portsAdress struct {
46-
grpc int
47-
raft int
48-
redis int
49-
dynamo int
50-
grpcAddress string
51-
raftAddress string
52-
redisAddress string
53-
dynamoAddress string
54-
}
55-
56-
const (
57-
// raft and the grpc requested by the client use grpc and are received on the same port
58-
grpcPort = 50000
59-
raftPort = 50000
60-
61-
redisPort = 63790
62-
dynamoPort = 28000
63-
)
64-
65-
var mu sync.Mutex
66-
var portGrpc atomic.Int32
67-
var portRaft atomic.Int32
68-
var portRedis atomic.Int32
69-
var portDynamo atomic.Int32
70-
71-
func init() {
72-
portGrpc.Store(raftPort)
73-
portRaft.Store(grpcPort)
74-
portRedis.Store(redisPort)
75-
portDynamo.Store(dynamoPort)
76-
}
77-
78-
func portAssigner() portsAdress {
79-
mu.Lock()
80-
defer mu.Unlock()
81-
gp := portGrpc.Add(1)
82-
rp := portRaft.Add(1)
83-
rd := portRedis.Add(1)
84-
dn := portDynamo.Add(1)
85-
return portsAdress{
86-
grpc: int(gp),
87-
raft: int(rp),
88-
redis: int(rd),
89-
dynamo: int(dn),
90-
grpcAddress: net.JoinHostPort("localhost", strconv.Itoa(int(gp))),
91-
raftAddress: net.JoinHostPort("localhost", strconv.Itoa(int(rp))),
92-
redisAddress: net.JoinHostPort("localhost", strconv.Itoa(int(rd))),
93-
dynamoAddress: net.JoinHostPort("localhost", strconv.Itoa(int(dn))),
94-
}
95-
}
43+
// Node groups the servers and addresses used in tests.
9644

9745
type Node struct {
9846
grpcAddress string
@@ -131,43 +79,38 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
13179
waitInterval = 100 * time.Millisecond
13280
)
13381

134-
cfg := raft.Configuration{}
135-
ports := make([]portsAdress, n)
136-
13782
ctx := context.Background()
138-
var lc net.ListenConfig
13983

140-
// port assign
84+
// allocate listeners for gRPC/raft in advance so ports are reserved
85+
grpcListeners := make([]net.Listener, n)
86+
cfg := raft.Configuration{}
14187
for i := 0; i < n; i++ {
142-
ports[i] = portAssigner()
143-
}
88+
l, err := net.Listen("tcp", "127.0.0.1:0")
89+
require.NoError(t, err)
90+
grpcListeners[i] = l
91+
addr := l.Addr().String()
92+
grpcAdders = append(grpcAdders, addr)
14493

145-
// build raft node config
146-
for i := 0; i < n; i++ {
14794
var suffrage raft.ServerSuffrage
14895
if i == 0 {
14996
suffrage = raft.Voter
15097
} else {
15198
suffrage = raft.Nonvoter
15299
}
153-
154-
server := raft.Server{
100+
cfg.Servers = append(cfg.Servers, raft.Server{
155101
Suffrage: suffrage,
156102
ID: raft.ServerID(strconv.Itoa(i)),
157-
Address: raft.ServerAddress(ports[i].raftAddress),
158-
}
159-
cfg.Servers = append(cfg.Servers, server)
103+
Address: raft.ServerAddress(addr),
104+
})
160105
}
161106

162107
for i := 0; i < n; i++ {
163108
st := store.NewRbMemoryStore()
164109
trxSt := store.NewMemoryStoreDefaultTTL()
165110
fsm := kv.NewKvFSM(st, trxSt)
166111

167-
port := ports[i]
168-
169-
r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg)
170-
assert.NoError(t, err)
112+
r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg)
113+
require.NoError(t, err)
171114

172115
s := grpc.NewServer()
173116
trx := kv.NewTransaction(r)
@@ -181,34 +124,32 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
181124
leaderhealth.Setup(r, s, []string{"Example"})
182125
raftadmin.Register(s, r)
183126

184-
grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
185-
assert.NoError(t, err)
186-
187-
grpcAdders = append(grpcAdders, port.grpcAddress)
188-
redisAdders = append(redisAdders, port.redisAddress)
189-
go func() {
190-
assert.NoError(t, s.Serve(grpcSock))
191-
}()
127+
go func(l net.Listener) {
128+
require.NoError(t, s.Serve(l))
129+
}(grpcListeners[i])
192130

193-
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
194-
assert.NoError(t, err)
131+
l, err := net.Listen("tcp", "127.0.0.1:0")
132+
require.NoError(t, err)
133+
redisAddr := l.Addr().String()
134+
redisAdders = append(redisAdders, redisAddr)
195135
rd := NewRedisServer(l, st, coordinator)
196136
go func() {
197-
assert.NoError(t, rd.Run())
137+
require.NoError(t, rd.Run())
198138
}()
199139

200-
dl, err := lc.Listen(ctx, "tcp", port.dynamoAddress)
201-
assert.NoError(t, err)
140+
dl, err := net.Listen("tcp", "127.0.0.1:0")
141+
require.NoError(t, err)
142+
dynamoAddr := dl.Addr().String()
202143
ds := NewDynamoDBServer(dl, st, coordinator)
203144
go func() {
204-
assert.NoError(t, ds.Run())
145+
require.NoError(t, ds.Run())
205146
}()
206147

207148
nodes = append(nodes, newNode(
208-
port.grpcAddress,
209-
port.raftAddress,
210-
port.redisAddress,
211-
port.dynamoAddress,
149+
grpcAdders[i],
150+
grpcAdders[i],
151+
redisAddr,
152+
dynamoAddr,
212153
r,
213154
tm,
214155
s,
@@ -219,7 +160,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
219160

220161
d := &net.Dialer{Timeout: time.Second}
221162
for _, n := range nodes {
222-
assert.Eventually(t, func() bool {
163+
require.Eventually(t, func() bool {
223164
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
224165
if err != nil {
225166
return false
@@ -239,7 +180,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
239180
}, waitTimeout, waitInterval)
240181
}
241182

242-
assert.Eventually(t, func() bool {
183+
require.Eventually(t, func() bool {
243184
return nodes[0].raft.State() == raft.Leader
244185
}, waitTimeout, waitInterval)
245186

kv/shard_router.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,25 @@ func (s *ShardRouter) process(reqs []*pb.Request, fn func(*routerGroup, []*pb.Re
5959
}
6060

6161
var max uint64
62+
var errs error
6263
for gid, rs := range grouped {
6364
g, ok := s.getGroup(gid)
6465
if !ok {
65-
return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid)
66+
err := errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid)
67+
errs = errors.CombineErrors(errs, err)
68+
continue
6669
}
6770
r, err := fn(g, rs)
6871
if err != nil {
69-
return nil, errors.WithStack(err)
72+
errs = errors.CombineErrors(errs, errors.WithStack(err))
73+
continue
7074
}
7175
if r.CommitIndex > max {
7276
max = r.CommitIndex
7377
}
7478
}
75-
return &TransactionResponse{CommitIndex: max}, nil
79+
resp := &TransactionResponse{CommitIndex: max}
80+
return resp, errs
7681
}
7782

7883
func (s *ShardRouter) getGroup(id uint64) (*routerGroup, bool) {

kv/shard_router_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kv
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"testing"
78
"time"
89

@@ -253,3 +254,32 @@ func TestShardRouterCommitFailure(t *testing.T) {
253254
t.Fatalf("unexpected abort on successful group")
254255
}
255256
}
257+
258+
func TestShardRouterCommitMultipleFailures(t *testing.T) {
259+
e := distribution.NewEngine()
260+
e.UpdateRoute([]byte("a"), []byte("m"), 1)
261+
e.UpdateRoute([]byte("m"), nil, 2)
262+
263+
router := NewShardRouter(e)
264+
265+
fail1 := &fakeTM{commitErr: true}
266+
fail2 := &fakeTM{commitErr: true}
267+
router.Register(1, fail1, nil)
268+
router.Register(2, fail2, nil)
269+
270+
reqs := []*pb.Request{
271+
{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v1")}}},
272+
{IsTxn: false, Phase: pb.Phase_NONE, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v2")}}},
273+
}
274+
275+
_, err := router.Commit(reqs)
276+
if err == nil {
277+
t.Fatalf("expected error")
278+
}
279+
if c := strings.Count(fmt.Sprintf("%+v", err), "commit fail"); c < 2 {
280+
t.Fatalf("expected combined errors, got %d: %+v", c, err)
281+
}
282+
if fail1.commitCalls == 0 || fail2.commitCalls == 0 {
283+
t.Fatalf("expected commits on both groups")
284+
}
285+
}

0 commit comments

Comments
 (0)