Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions adapter/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package adapter
import (
"context"
"strconv"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -164,8 +163,7 @@ func Test_grpc_transaction(t *testing.T) {
}

func rawKVClient(t *testing.T, hosts []string) pb.RawKVClient {
dials := "multi:///" + strings.Join(hosts, ",")
conn, err := grpc.NewClient(dials,
conn, err := grpc.NewClient(hosts[0],
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
Expand All @@ -175,8 +173,7 @@ func rawKVClient(t *testing.T, hosts []string) pb.RawKVClient {
}

func transactionalKVClient(t *testing.T, hosts []string) pb.TransactionalKVClient {
dials := "multi:///" + strings.Join(hosts, ",")
conn, err := grpc.NewClient(dials,
conn, err := grpc.NewClient(hosts[0],
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
Expand Down
56 changes: 49 additions & 7 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package adapter

import (
"context"
"log"
"net"
"strconv"
"sync"
Expand All @@ -26,6 +28,14 @@ func shutdown(nodes []Node) {
for _, n := range nodes {
n.grpcServer.Stop()
n.redisServer.Stop()
if n.raft != nil {
n.raft.Shutdown()
}
if n.tm != nil {
if err := n.tm.Close(); err != nil {
log.Printf("transport close: %v", err)
}
}
}
}

Expand Down Expand Up @@ -80,15 +90,19 @@ type Node struct {
redisAddress string
grpcServer *grpc.Server
redisServer *RedisServer
raft *raft.Raft
tm *transport.Manager
}

func newNode(grpcAddress, raftAddress, redisAddress string, grpcs *grpc.Server, rd *RedisServer) Node {
func newNode(grpcAddress, raftAddress, redisAddress string, r *raft.Raft, tm *transport.Manager, grpcs *grpc.Server, rd *RedisServer) Node {
return Node{
grpcAddress: grpcAddress,
raftAddress: raftAddress,
redisAddress: redisAddress,
grpcServer: grpcs,
redisServer: rd,
raft: r,
tm: tm,
}
}

Expand All @@ -98,9 +112,17 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
var redisAdders []string
var nodes []Node

const (
waitTimeout = 5 * time.Second
waitInterval = 100 * time.Millisecond
)

cfg := raft.Configuration{}
ports := make([]portsAdress, n)

ctx := context.Background()
var lc net.ListenConfig

// port assign
for i := 0; i < n; i++ {
ports[i] = portAssigner()
Expand Down Expand Up @@ -145,7 +167,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
leaderhealth.Setup(r, s, []string{"Example"})
raftadmin.Register(s, r)

grpcSock, err := net.Listen("tcp", port.grpcAddress)
grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
assert.NoError(t, err)

grpcAdders = append(grpcAdders, port.grpcAddress)
Expand All @@ -154,7 +176,7 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
assert.NoError(t, s.Serve(grpcSock))
}()

l, err := net.Listen("tcp", port.redisAddress)
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
assert.NoError(t, err)
rd := NewRedisServer(l, st, coordinator)
go func() {
Expand All @@ -165,14 +187,34 @@ func createNode(t *testing.T, n int) ([]Node, []string, []string) {
port.grpcAddress,
port.raftAddress,
port.redisAddress,
r,
tm,
s,
rd),
)
rd,
))

}

//nolint:mnd
time.Sleep(10 * time.Second)
d := &net.Dialer{Timeout: time.Second}
for _, n := range nodes {
assert.Eventually(t, func() bool {
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
if err != nil {
return false
}
_ = conn.Close()
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
if err != nil {
return false
}
_ = conn.Close()
return true
}, waitTimeout, waitInterval)
}

assert.Eventually(t, func() bool {
return nodes[0].raft.State() == raft.Leader
}, waitTimeout, waitInterval)

return nodes, grpcAdders, redisAdders
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/server/demo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"log/slog"
"net"
"os"
Expand Down Expand Up @@ -55,6 +56,8 @@ func main() {
func run(eg *errgroup.Group) error {

cfg := raft.Configuration{}
ctx := context.Background()
var lc net.ListenConfig

for i := 0; i < 3; i++ {
var suffrage raft.ServerSuffrage
Expand Down Expand Up @@ -93,7 +96,7 @@ func run(eg *errgroup.Group) error {
leaderhealth.Setup(r, s, []string{"RawKV"})
raftadmin.Register(s, r)

grpcSock, err := net.Listen("tcp", grpcAdders[i])
grpcSock, err := lc.Listen(ctx, "tcp", grpcAdders[i])
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -102,7 +105,7 @@ func run(eg *errgroup.Group) error {
return errors.WithStack(s.Serve(grpcSock))
})

l, err := net.Listen("tcp", redisAdders[i])
l, err := lc.Listen(ctx, "tcp", redisAdders[i])
if err != nil {
return errors.WithStack(err)
}
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ func main() {
}

ctx := context.Background()
var lc net.ListenConfig

_, port, err := net.SplitHostPort(*myAddr)
if err != nil {
log.Fatalf("failed to parse local address (%q): %v", *myAddr, err)
}

grpcSock, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
grpcSock, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
Expand All @@ -72,7 +74,7 @@ func main() {
raftadmin.Register(gs, r)
reflection.Register(gs)

redisL, err := net.Listen("tcp", *redisAddr)
redisL, err := lc.Listen(ctx, "tcp", *redisAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
Expand Down
Loading