Skip to content

Commit 9837c61

Browse files
committed
feat: add the logic of gRPC Load Balancing Strategy(LBS)
and this test case
1 parent 5b62aeb commit 9837c61

File tree

18 files changed

+926
-36
lines changed

18 files changed

+926
-36
lines changed

README.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Microservice framework implemented based on Golang.
3838
│ │ └─translator
3939
│ │ └─gin
4040
│ └─user 举例 user 服务
41-
│ ├─client
41+
│ ├─client 用于本地测试 user 服务的客户端 rpc 服务
4242
│ └─srv
4343
│ ├─config 服务的配置项,Log error 等子服务的相关逻辑全部注册到配置中
4444
│ ├─controller 表示层
@@ -67,12 +67,12 @@ Microservice framework implemented based on Golang.
6767
│ ├─registry
6868
│ │ └─consul 服务注册中心相关逻辑,参考 kratos
6969
│ └─server
70-
│ ├─restserver
71-
│ │ ├─middlewares
70+
│ ├─restserver http 服务的初始化配置
71+
│ │ ├─middlewares http 服务的中间件
7272
│ │ │ └─auth
73-
│ │ ├─pprof
74-
│ │ └─validation
75-
│ └─rpcserver
73+
│ │ ├─pprof http 服务的 pprof 相关逻辑
74+
│ │ └─validation http 服务的参数校验
75+
│ └─rpcserver rpc 服务的初始化配置
7676
│ ├─client.go rpc 客户端的初始化配置
7777
│ ├─server.go rpc 服务端的初始化配置
7878
│ ├─clientinterceptors 客户端的拦截器:超时连接器
@@ -81,13 +81,13 @@ Microservice framework implemented based on Golang.
8181
│ │ └─discovery 服务发现,负载均衡
8282
│ │ ├─builder.go 服务发现的构建器
8383
│ │ └─resolver.go 服务发现的解析器,负载均衡的逻辑在这里实现 UpdateState 核心
84-
│ ├─selector
85-
│ │ ├─node
86-
│ │ │ ├─direct
87-
│ │ │ └─ewma
88-
│ │ ├─p2c
89-
│ │ ├─random
90-
│ │ └─wrr
84+
│ ├─selector 重写 grpc 接口,具体服务相关的实现
85+
│ │ ├─node gRPC 服务节点
86+
│ │ │ ├─direct 直连节点
87+
│ │ │ └─ewma ewma算法节点,用于实现 p2c 负载均衡策略
88+
│ │ ├─p2c 负载均衡 [Power of Two Random Choices] 算法
89+
│ │ ├─random 负载均衡随机算法
90+
│ │ └─wrr 负载均衡 加权轮询 [Weighted Round Robin] 算法
9191
│ └─serverinterceptors 服务端的拦截器:超时,crash恢复
9292
├─pkg
9393
│ ├─app 包括的项目的启动服务,配置文件的读取,命令行工具,以及其他选项

app/pkg/options/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type RegistryOptions struct {
1212
Scheme string `json:"scheme" mapstructure:"scheme,omitempty"`
1313
}
1414

15+
// NewRegistryOptions 创建一个默认注册中心的配置
1516
func NewRegistryOptions() *RegistryOptions {
1617
return &RegistryOptions{
1718
Address: "127.0.0.1:8500",

app/pkg/options/server.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,50 @@
11
package options
22

33
import (
4-
"github.com/CoderI421/gframework/pkg/errors"
54
"github.com/spf13/pflag"
65
)
76

87
// ServerOptions service discovery center options console, etcd, nacos etc.
98
type ServerOptions struct {
10-
Name string `json:"name" mapstructure:"name"`
11-
Host string `json:"host" mapstructure:"host"`
12-
Port int `json:"port" mapstructure:"port"`
9+
Name string `json:"name,omitempty" mapstructure:"name"` // 服务名称
10+
Host string `json:"host,omitempty" mapstructure:"host"` // 服务地址
11+
Port int `json:"port,omitempty" mapstructure:"port"` // 服务端口
12+
HttpPort int `json:"http_port,omitempty" mapstructure:"http_port"` // http端口
13+
EnableProfiling bool `json:"profiling,omitempty" mapstructure:"profiling"` // 是否开启性能分析
14+
EnableLimit bool `json:"limit,omitempty" mapstructure:"limit"` // 是否开启限流
15+
EnableMetrics bool `json:"metrics,omitempty" mapstructure:"metrics"` // 是否开启指标
16+
EnableHealthCheck bool `json:"health_check,omitempty" mapstructure:"health_check"` // 是否开启健康检查
17+
Middlewares []string `json:"middlewares,omitempty" mapstructure:"middlewares"` // 中间件
1318
}
1419

1520
func NewServerOptions() *ServerOptions {
1621
return &ServerOptions{
17-
Host: "127.0.0.1",
18-
Port: 8078,
19-
Name: "user-srv",
22+
Host: "127.0.0.1",
23+
Port: 8078,
24+
HttpPort: 8079,
25+
Name: "user-srv",
26+
EnableProfiling: true,
27+
EnableLimit: true,
28+
EnableMetrics: true,
2029
}
2130
}
2231

23-
func (o *ServerOptions) Validate() (errs []error) {
24-
if o.Host == "" || o.Port == 0 {
25-
errs = append(errs, errors.New("Host and Port must be"))
26-
}
32+
func (so *ServerOptions) Validate() (errs []error) {
33+
errs = []error{}
2734
return
2835
}
2936

30-
func (o *ServerOptions) AddFlags(fs *pflag.FlagSet) {
31-
fs.StringVar(&o.Host, "server.host", o.Host, "server host default is 127.0.0.1")
32-
33-
fs.IntVar(&o.Port, "server.port", o.Port, "server port default is 8078")
34-
fs.StringVar(&o.Name, "server.name", o.Name, "server name default is mxshop-user-srv")
37+
func (so *ServerOptions) AddFlags(fs *pflag.FlagSet) {
38+
fs.BoolVar(&so.EnableProfiling, "server.enable-profiling", so.EnableProfiling,
39+
"enable-profiling, if true, will add <host>:<port>/debug/pprof/, default is true")
40+
fs.BoolVar(&so.EnableMetrics, "server.enable-metrics", so.EnableMetrics,
41+
"enable-metrics, if true, will add /metrics, default is true")
42+
fs.BoolVar(&so.EnableHealthCheck, "server.enable-health-check", so.EnableHealthCheck,
43+
"enable-health-check, if true, will add health check route, default is true")
44+
//fs.StringVarP 带有简写命令
45+
// (接收值的变量,命令名称,默认值,描述)
46+
fs.StringVar(&so.Host, "server.host", so.Host, "server host default is 127.0.0.1")
47+
fs.IntVar(&so.Port, "server.port", so.Port, "server port default is 8078")
48+
fs.IntVar(&so.HttpPort, "server.http-port", so.HttpPort, "server http port default is 8079")
49+
fs.StringVar(&so.Name, "server.name", so.Name, "server name default is user-srv")
3550
}

app/user/client/client.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,21 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/hashicorp/consul/api"
9-
108
v1 "github.com/CoderI421/gframework/api/user/v1"
119
"github.com/CoderI421/gframework/gmicro/registry/consul"
1210
"github.com/CoderI421/gframework/gmicro/server/rpcserver"
11+
"github.com/CoderI421/gframework/gmicro/server/rpcserver/selector"
12+
"github.com/CoderI421/gframework/gmicro/server/rpcserver/selector/random"
13+
"github.com/hashicorp/consul/api"
1314
)
1415

1516
func main() {
17+
//客户端,设置全局负载均衡策略,这里选择了 random
18+
// 这个逻辑中,balancerName是selector,在 gmicro/server/rpcserver/balancer.go中规定的
19+
selector.SetGlobalSelector(random.NewBuilder())
20+
// selector.SetGlobalSelector(random.NewBuilder()) 设定的全局的 selector 然后这里,调用 selector 进行注册
21+
rpcserver.InitBuilder()
22+
1623
conf := api.DefaultConfig()
1724
conf.Address = "127.0.0.1:8500"
1825
conf.Scheme = "http"
@@ -27,6 +34,7 @@ func main() {
2734

2835
conn, err := rpcserver.DialInsecure(
2936
context.Background(), rpcserver.WithDiscovery(r),
37+
rpcserver.WithBalancerName("selector"),
3038
/*
3139
第3个/是为了第二个参数是空的
3240
默认格式:direct://<authority>/127.0.0.1:8078
@@ -42,9 +50,13 @@ func main() {
4250
}
4351
defer conn.Close()
4452
uc := v1.NewUserClient(conn)
45-
re, err := uc.GetUserList(context.Background(), &v1.PageInfo{})
46-
if err != nil {
47-
panic(err)
53+
// 在终端中,使用 --server.port=*** --server.http-port 命令启动启动多个,这里就会轮询调用,测试负载均衡
54+
for {
55+
re, err := uc.GetUserList(context.Background(), &v1.PageInfo{})
56+
if err != nil {
57+
panic(err)
58+
}
59+
fmt.Println(re)
60+
time.Sleep(time.Second * 5)
4861
}
49-
fmt.Println(re)
5062
}

gmicro/app/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (a *App) Run() error {
5858
a.mux.Unlock()
5959

6060
go func() {
61-
// start the rpc server
61+
// start the rpc server goroutine
6262
if a.opts.rpcServer != nil {
6363
err := a.opts.rpcServer.Start()
6464
if err != nil {

gmicro/server/rpcserver/balancer.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package rpcserver
2+
3+
import (
4+
"google.golang.org/grpc/balancer"
5+
"google.golang.org/grpc/balancer/base"
6+
"google.golang.org/grpc/metadata"
7+
8+
"github.com/CoderI421/gframework/gmicro/registry"
9+
"github.com/CoderI421/gframework/gmicro/server/rpcserver/selector"
10+
)
11+
12+
const (
13+
balancerName = "selector"
14+
)
15+
16+
var (
17+
_ base.PickerBuilder = &balancerBuilder{}
18+
_ balancer.Picker = &balancerPicker{}
19+
)
20+
21+
// InitBuilder initializes the builder for the selector balancer.
22+
// 初始化 grpc balancer
23+
func InitBuilder() {
24+
b := base.NewBalancerBuilder(
25+
balancerName,
26+
&balancerBuilder{
27+
// 获取全局的 selector
28+
builder: selector.GlobalSelector(),
29+
},
30+
base.Config{HealthCheck: true},
31+
)
32+
balancer.Register(b)
33+
}
34+
35+
type balancerBuilder struct {
36+
builder selector.Builder
37+
}
38+
39+
// Build creates a grpc Picker.
40+
func (b *balancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
41+
if len(info.ReadySCs) == 0 {
42+
// Block the RPC until a new picker is available via UpdateState().
43+
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
44+
}
45+
nodes := make([]selector.Node, 0, len(info.ReadySCs))
46+
for conn, info := range info.ReadySCs {
47+
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
48+
nodes = append(nodes, &grpcNode{
49+
Node: selector.NewNode("grpc", info.Address.Addr, ins),
50+
subConn: conn,
51+
})
52+
}
53+
p := &balancerPicker{
54+
//自己传递的算法 注意先后顺序
55+
selector: b.builder.Build(),
56+
}
57+
p.selector.Apply(nodes)
58+
return p
59+
}
60+
61+
// balancerPicker is a grpc picker.
62+
type balancerPicker struct {
63+
selector selector.Selector
64+
}
65+
66+
// Pick pick instances.
67+
func (p *balancerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
68+
n, done, err := p.selector.Select(info.Ctx)
69+
if err != nil {
70+
return balancer.PickResult{}, err
71+
}
72+
73+
return balancer.PickResult{
74+
SubConn: n.(*grpcNode).subConn,
75+
Done: func(di balancer.DoneInfo) {
76+
done(info.Ctx, selector.DoneInfo{
77+
Err: di.Err,
78+
BytesSent: di.BytesSent,
79+
BytesReceived: di.BytesReceived,
80+
ReplyMD: Trailer(di.Trailer),
81+
})
82+
},
83+
}, nil
84+
}
85+
86+
// Trailer is a grpc trailder MD.
87+
type Trailer metadata.MD
88+
89+
// Get get a grpc trailer value.
90+
func (t Trailer) Get(k string) string {
91+
v := metadata.MD(t).Get(k)
92+
if len(v) > 0 {
93+
return v[0]
94+
}
95+
return ""
96+
}
97+
98+
type grpcNode struct {
99+
selector.Node
100+
subConn balancer.SubConn
101+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package selector
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// Balancer is balancer interface
9+
type Balancer interface {
10+
Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, done DoneFunc, err error)
11+
}
12+
13+
// BalancerBuilder build balancer
14+
type BalancerBuilder interface {
15+
Build() Balancer
16+
}
17+
18+
// WeightedNode calculates scheduling weight in real time
19+
type WeightedNode interface {
20+
Node
21+
22+
// Raw returns the original node
23+
Raw() Node
24+
25+
// Weight is the runtime calculated weight
26+
Weight() float64
27+
28+
// Pick the node
29+
Pick() DoneFunc
30+
31+
// PickElapsed is time elapsed since the latest pick
32+
PickElapsed() time.Duration
33+
}
34+
35+
// WeightedNodeBuilder is WeightedNode Builder
36+
type WeightedNodeBuilder interface {
37+
Build(Node) WeightedNode
38+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package selector
2+
3+
import (
4+
"strconv"
5+
6+
"github.com/CoderI421/gframework/gmicro/registry"
7+
)
8+
9+
// DefaultNode is selector node
10+
type DefaultNode struct {
11+
scheme string
12+
addr string
13+
weight *int64
14+
version string
15+
name string
16+
metadata map[string]string
17+
}
18+
19+
// Scheme is node scheme
20+
func (n *DefaultNode) Scheme() string {
21+
return n.scheme
22+
}
23+
24+
// Address is node address
25+
func (n *DefaultNode) Address() string {
26+
return n.addr
27+
}
28+
29+
// ServiceName is node serviceName
30+
func (n *DefaultNode) ServiceName() string {
31+
return n.name
32+
}
33+
34+
// InitialWeight is node initialWeight
35+
func (n *DefaultNode) InitialWeight() *int64 {
36+
return n.weight
37+
}
38+
39+
// Version is node version
40+
func (n *DefaultNode) Version() string {
41+
return n.version
42+
}
43+
44+
// Metadata is node metadata
45+
func (n *DefaultNode) Metadata() map[string]string {
46+
return n.metadata
47+
}
48+
49+
// NewNode new node
50+
func NewNode(scheme, addr string, ins *registry.ServiceInstance) Node {
51+
n := &DefaultNode{
52+
scheme: scheme,
53+
addr: addr,
54+
}
55+
if ins != nil {
56+
n.name = ins.Name
57+
n.version = ins.Version
58+
n.metadata = ins.Metadata
59+
if str, ok := ins.Metadata["weight"]; ok {
60+
if weight, err := strconv.ParseInt(str, 10, 64); err == nil {
61+
n.weight = &weight
62+
}
63+
}
64+
}
65+
return n
66+
}

0 commit comments

Comments
 (0)