Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions adapter/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package adapter
import (
"context"
"strconv"
"strings"
"sync"
"testing"

Expand Down Expand Up @@ -164,8 +163,7 @@ func Test_grpc_transaction(t *testing.T) {
}

func rawKVClient(t *testing.T, hosts []string) pb.RawKVClient {
dials := "multi:///" + strings.Join(hosts, ",")
conn, err := grpc.NewClient(dials,
conn, err := grpc.NewClient(hosts[0],
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
Expand All @@ -175,8 +173,7 @@ func rawKVClient(t *testing.T, hosts []string) pb.RawKVClient {
}

func transactionalKVClient(t *testing.T, hosts []string) pb.TransactionalKVClient {
dials := "multi:///" + strings.Join(hosts, ",")
conn, err := grpc.NewClient(dials,
conn, err := grpc.NewClient(hosts[0],
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
Expand Down
55 changes: 48 additions & 7 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adapter

import (
"context"
"net"
"strconv"
"sync"
Expand All @@ -26,6 +27,14 @@
for _, n := range nodes {
n.grpcServer.Stop()
n.redisServer.Stop()
if n.raft != nil {
n.raft.Shutdown()
}
if n.tm != nil {
if err := n.tm.Close(); err != nil {
log.Printf("error closing transport: %v", err)

Check failure on line 35 in adapter/test_util.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

undefined: log
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undefined: log (typecheck)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
undefined: log) (typecheck)

}
}
}
}

Expand Down Expand Up @@ -80,15 +89,19 @@
redisAddress string
grpcServer *grpc.Server
redisServer *RedisServer
raft *raft.Raft
tm *transport.Manager
}

func newNode(grpcAddress, raftAddress, redisAddress string, grpcs *grpc.Server, rd *RedisServer) Node {
func newNode(grpcAddress, raftAddress, redisAddress string, r *raft.Raft, tm *transport.Manager, grpcs *grpc.Server, rd *RedisServer) Node {
return Node{
grpcAddress: grpcAddress,
raftAddress: raftAddress,
redisAddress: redisAddress,
grpcServer: grpcs,
redisServer: rd,
raft: r,
tm: tm,
}
}

Expand All @@ -98,9 +111,17 @@
var redisAdders []string
var nodes []Node

const (
waitTimeout = 5 * time.Second
waitInterval = 100 * time.Millisecond
)

cfg := raft.Configuration{}
ports := make([]portsAdress, n)

ctx := context.Background()
var lc net.ListenConfig

// port assign
for i := 0; i < n; i++ {
ports[i] = portAssigner()
Expand Down Expand Up @@ -145,7 +166,7 @@
leaderhealth.Setup(r, s, []string{"Example"})
raftadmin.Register(s, r)

grpcSock, err := net.Listen("tcp", port.grpcAddress)
grpcSock, err := lc.Listen(ctx, "tcp", port.grpcAddress)
assert.NoError(t, err)

grpcAdders = append(grpcAdders, port.grpcAddress)
Expand All @@ -154,7 +175,7 @@
assert.NoError(t, s.Serve(grpcSock))
}()

l, err := net.Listen("tcp", port.redisAddress)
l, err := lc.Listen(ctx, "tcp", port.redisAddress)
assert.NoError(t, err)
rd := NewRedisServer(l, st, coordinator)
go func() {
Expand All @@ -165,14 +186,34 @@
port.grpcAddress,
port.raftAddress,
port.redisAddress,
r,
tm,
s,
rd),
)
rd,
))

}

//nolint:mnd
time.Sleep(10 * time.Second)
d := &net.Dialer{Timeout: time.Second}
for _, n := range nodes {
assert.Eventually(t, func() bool {
conn, err := d.DialContext(ctx, "tcp", n.grpcAddress)
if err != nil {
return false
}
_ = conn.Close()
conn, err = d.DialContext(ctx, "tcp", n.redisAddress)
if err != nil {
return false
}
_ = conn.Close()
return true
}, waitTimeout, waitInterval)
}

assert.Eventually(t, func() bool {
return nodes[0].raft.State() == raft.Leader
}, waitTimeout, waitInterval)

return nodes, grpcAdders, redisAdders
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/server/demo.go
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
could not import github.com/bootjp/elastickv/adapter (-: # github.com/bootjp/elastickv/adapter

"github.com/bootjp/elastickv/adapter"

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"log/slog"
"net"
"os"
Expand Down Expand Up @@ -55,6 +56,8 @@ func main() {
func run(eg *errgroup.Group) error {

cfg := raft.Configuration{}
ctx := context.Background()
var lc net.ListenConfig

for i := 0; i < 3; i++ {
var suffrage raft.ServerSuffrage
Expand Down Expand Up @@ -93,7 +96,7 @@ func run(eg *errgroup.Group) error {
leaderhealth.Setup(r, s, []string{"RawKV"})
raftadmin.Register(s, r)

grpcSock, err := net.Listen("tcp", grpcAdders[i])
grpcSock, err := lc.Listen(ctx, "tcp", grpcAdders[i])
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -102,7 +105,7 @@ func run(eg *errgroup.Group) error {
return errors.WithStack(s.Serve(grpcSock))
})

l, err := net.Listen("tcp", redisAdders[i])
l, err := lc.Listen(ctx, "tcp", redisAdders[i])
if err != nil {
return errors.WithStack(err)
}
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ func main() {
}

ctx := context.Background()
var lc net.ListenConfig

_, port, err := net.SplitHostPort(*myAddr)
if err != nil {
log.Fatalf("failed to parse local address (%q): %v", *myAddr, err)
}

grpcSock, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
grpcSock, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%s", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
Expand All @@ -72,7 +74,7 @@ func main() {
raftadmin.Register(gs, r)
reflection.Register(gs)

redisL, err := net.Listen("tcp", *redisAddr)
redisL, err := lc.Listen(ctx, "tcp", *redisAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
Expand Down
Loading