Skip to content

Commit 6629c40

Browse files
authored
Merge pull request #194 from bootjp/oj8pu0-codex/investigate-raft-node-connection-errors
2 parents 50a83b0 + fcf753e commit 6629c40

File tree

4 files changed

+60
-16
lines changed

4 files changed

+60
-16
lines changed

adapter/grpc_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package adapter
33
import (
44
"context"
55
"strconv"
6-
"strings"
76
"sync"
87
"testing"
98

@@ -164,8 +163,7 @@ func Test_grpc_transaction(t *testing.T) {
164163
}
165164

166165
func rawKVClient(t *testing.T, hosts []string) pb.RawKVClient {
167-
dials := "multi:///" + strings.Join(hosts, ",")
168-
conn, err := grpc.NewClient(dials,
166+
conn, err := grpc.NewClient(hosts[0],
169167
grpc.WithTransportCredentials(insecure.NewCredentials()),
170168
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
171169
)
@@ -175,8 +173,7 @@ func rawKVClient(t *testing.T, hosts []string) pb.RawKVClient {
175173
}
176174

177175
func transactionalKVClient(t *testing.T, hosts []string) pb.TransactionalKVClient {
178-
dials := "multi:///" + strings.Join(hosts, ",")
179-
conn, err := grpc.NewClient(dials,
176+
conn, err := grpc.NewClient(hosts[0],
180177
grpc.WithTransportCredentials(insecure.NewCredentials()),
181178
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
182179
)

adapter/test_util.go

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package adapter
22

33
import (
4+
"context"
5+
"log"
46
"net"
57
"strconv"
68
"sync"
@@ -26,6 +28,14 @@ func shutdown(nodes []Node) {
2628
for _, n := range nodes {
2729
n.grpcServer.Stop()
2830
n.redisServer.Stop()
31+
if n.raft != nil {
32+
n.raft.Shutdown()
33+
}
34+
if n.tm != nil {
35+
if err := n.tm.Close(); err != nil {
36+
log.Printf("transport close: %v", err)
37+
}
38+
}
2939
}
3040
}
3141

@@ -80,15 +90,19 @@ type Node struct {
8090
redisAddress string
8191
grpcServer *grpc.Server
8292
redisServer *RedisServer
93+
raft *raft.Raft
94+
tm *transport.Manager
8395
}
8496

85-
func newNode(grpcAddress, raftAddress, redisAddress string, grpcs *grpc.Server, rd *RedisServer) Node {
97+
func newNode(grpcAddress, raftAddress, redisAddress string, r *raft.Raft, tm *transport.Manager, grpcs *grpc.Server, rd *RedisServer) Node {
8698
return Node{
8799
grpcAddress: grpcAddress,
88100
raftAddress: raftAddress,
89101
redisAddress: redisAddress,
90102
grpcServer: grpcs,
91103
redisServer: rd,
104+
raft: r,
105+
tm: tm,
92106
}
93107
}
94108

@@ -98,9 +112,17 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
98112
var redisAdders []string
99113
var nodes []Node
100114

115+
const (
116+
waitTimeout = 5 * time.Second
117+
waitInterval = 100 * time.Millisecond
118+
)
119+
101120
cfg := raft.Configuration{}
102121
ports := make([]portsAdress, n)
103122

123+
ctx := context.Background()
124+
var lc net.ListenConfig
125+
104126
// port assign
105127
for i := 0; i < n; i++ {
106128
ports[i] = portAssigner()
@@ -145,7 +167,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
145167
leaderhealth.Setup(r, s, []string{"Example"})
146168
raftadmin.Register(s, r)
147169

148-
grpcSock, err := net.Listen("tcp", port.grpcAddress)
170+
grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
149171
assert.NoError(t, err)
150172

151173
grpcAdders = append(grpcAdders, port.grpcAddress)
@@ -154,7 +176,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
154176
assert.NoError(t, s.Serve(grpcSock))
155177
}()
156178

157-
l, err := net.Listen("tcp", port.redisAddress)
179+
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
158180
assert.NoError(t, err)
159181
rd := NewRedisServer(l, st, coordinator)
160182
go func() {
@@ -165,14 +187,34 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
165187
port.grpcAddress,
166188
port.raftAddress,
167189
port.redisAddress,
190+
r,
191+
tm,
168192
s,
169-
rd),
170-
)
193+
rd,
194+
))
171195

172196
}
173197

174-
//nolint:mnd
175-
time.Sleep(10 * time.Second)
198+
d := &net.Dialer{Timeout: time.Second}
199+
for _, n := range nodes {
200+
assert.Eventually(t, func() bool {
201+
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
202+
if err != nil {
203+
return false
204+
}
205+
_ = conn.Close()
206+
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
207+
if err != nil {
208+
return false
209+
}
210+
_ = conn.Close()
211+
return true
212+
}, waitTimeout, waitInterval)
213+
}
214+
215+
assert.Eventually(t, func() bool {
216+
return nodes[0].raft.State() == raft.Leader
217+
}, waitTimeout, waitInterval)
176218

177219
return nodes, grpcAdders, redisAdders
178220
}

cmd/server/demo.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"log/slog"
56
"net"
67
"os"
@@ -55,6 +56,8 @@ func main() {
5556
func run(eg *errgroup.Group) error {
5657

5758
cfg := raft.Configuration{}
59+
ctx := context.Background()
60+
var lc net.ListenConfig
5861

5962
for i := 0; i < 3; i++ {
6063
var suffrage raft.ServerSuffrage
@@ -93,7 +96,7 @@ func run(eg *errgroup.Group) error {
9396
leaderhealth.Setup(r, s, []string{"RawKV"})
9497
raftadmin.Register(s, r)
9598

96-
grpcSock, err := net.Listen("tcp", grpcAdders[i])
99+
grpcSock, err := lc.Listen(ctx, "tcp", grpcAdders[i])
97100
if err != nil {
98101
return errors.WithStack(err)
99102
}
@@ -102,7 +105,7 @@ func run(eg *errgroup.Group) error {
102105
return errors.WithStack(s.Serve(grpcSock))
103106
})
104107

105-
l, err := net.Listen("tcp", redisAdders[i])
108+
l, err := lc.Listen(ctx, "tcp", redisAdders[i])
106109
if err != nil {
107110
return errors.WithStack(err)
108111
}

main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@ func main() {
4141
}
4242

4343
ctx := context.Background()
44+
var lc net.ListenConfig
45+
4446
_, port, err := net.SplitHostPort(*myAddr)
4547
if err != nil {
4648
log.Fatalf("failed to parse local address (%q): %v", *myAddr, err)
4749
}
4850

49-
grpcSock, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
51+
grpcSock, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%s", port))
5052
if err != nil {
5153
log.Fatalf("failed to listen: %v", err)
5254
}
@@ -72,7 +74,7 @@ func main() {
7274
raftadmin.Register(gs, r)
7375
reflection.Register(gs)
7476

75-
redisL, err := net.Listen("tcp", *redisAddr)
77+
redisL, err := lc.Listen(ctx, "tcp", *redisAddr)
7678
if err != nil {
7779
log.Fatalf("failed to listen: %v", err)
7880
}

0 commit comments

Comments
 (0)