Skip to content

Commit d298472

Browse files
committed
feat: rpc client service is ready, add direct builder and discovery builder
rpc 客户端服务,添加直连 rpc 逻辑,和服务发现模式
1 parent d67ad41 commit d298472

File tree

12 files changed

+1489
-11
lines changed

12 files changed

+1489
-11
lines changed

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
run:
22
timeout: 5m
3-
modules-download-mode: readonly
3+
modules-download-mode: mod
44

55
govet:
66
disable-all: true

README.md

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,22 @@ Microservice framework implemented based on Golang.
7373
│ │ ├─pprof
7474
│ │ └─validation
7575
│ └─rpcserver
76-
│ ├─clientinterceptors
77-
│ ├─resolver
78-
│ │ ├─direct
79-
│ │ └─discovery
76+
│ ├─client.go rpc 客户端的初始化配置
77+
│ ├─server.go rpc 服务端的初始化配置
78+
│ ├─clientinterceptors 客户端的拦截器:超时连接器
79+
│ ├─resolver 服务发现相关的逻辑 解析器
80+
│ │ ├─direct 直连
81+
│ │ └─discovery 服务发现,负载均衡
82+
│ │ ├─builder.go 服务发现的构建器
83+
│ │ └─resolver.go 服务发现的解析器,负载均衡的逻辑在这里实现 UpdateState 核心
8084
│ ├─selector
8185
│ │ ├─node
8286
│ │ │ ├─direct
8387
│ │ │ └─ewma
8488
│ │ ├─p2c
8589
│ │ ├─random
8690
│ │ └─wrr
87-
│ └─serverinterceptors
91+
│ └─serverinterceptors 服务端的拦截器:超时,crash恢复
8892
├─pkg
8993
│ ├─app 包括的项目的启动服务,配置文件的读取,命令行工具,以及其他选项
9094
│ │ ├─app.go 服务启动:命令行工具,日志,错误包,配置等

gmicro/registry/registry.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package registry
22

3-
import "context"
3+
import (
4+
"context"
5+
)
46

57
// ServiceInstance is the service instance for registry center.
68
type ServiceInstance struct {

gmicro/server/rpcserver/client.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package rpcserver
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"google.golang.org/grpc"
8+
grpcinsecure "google.golang.org/grpc/credentials/insecure"
9+
10+
"github.com/CoderI421/gframework/gmicro/registry"
11+
"github.com/CoderI421/gframework/gmicro/server/rpcserver/clientinterceptors"
12+
"github.com/CoderI421/gframework/gmicro/server/rpcserver/resolver/discovery"
13+
"github.com/CoderI421/gframework/pkg/log"
14+
)
15+
16+
type ClientOption func(o *clientOptions)
17+
type clientOptions struct {
18+
// 服务端的地址
19+
endpoint string
20+
// 超时时间
21+
timeout time.Duration
22+
// 服务发现接口
23+
discovery registry.Discovery
24+
// Unary 服务的拦截器
25+
unaryInts []grpc.UnaryClientInterceptor
26+
// Stream 服务的拦截器
27+
streamInts []grpc.StreamClientInterceptor
28+
// 用户自己设置 grpc 连接的结构体,例如: grpc.WithInsecure(), grpc.WithTransportCredentials()
29+
rpcOpts []grpc.DialOption
30+
// 根据 Name 生成负载均衡的策略
31+
balancerName string
32+
33+
// 客户端的日志
34+
logger log.Logger
35+
}
36+
37+
// 设置服务端的地址
38+
func WithEndpoint(endpoint string) ClientOption {
39+
return func(o *clientOptions) {
40+
o.endpoint = endpoint
41+
}
42+
}
43+
44+
// 设置超时时间
45+
func WithClientTimeout(timeout time.Duration) ClientOption {
46+
return func(o *clientOptions) {
47+
o.timeout = timeout
48+
}
49+
}
50+
51+
// 设置服务发现
52+
func WithDiscovery(d registry.Discovery) ClientOption {
53+
return func(o *clientOptions) {
54+
o.discovery = d
55+
}
56+
}
57+
58+
// 设置拦截器
59+
func WithClientUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption {
60+
return func(o *clientOptions) {
61+
o.unaryInts = in
62+
}
63+
}
64+
65+
// 设置stream拦截器
66+
func WithClientStreamInterceptor(in ...grpc.StreamClientInterceptor) ClientOption {
67+
return func(o *clientOptions) {
68+
o.streamInts = in
69+
}
70+
}
71+
72+
// 设置grpc的dial选项
73+
func WithClientOptions(opts ...grpc.DialOption) ClientOption {
74+
return func(o *clientOptions) {
75+
o.rpcOpts = opts
76+
}
77+
}
78+
79+
// 设置负载均衡器
80+
func WithBalancerName(name string) ClientOption {
81+
return func(o *clientOptions) {
82+
o.balancerName = name
83+
}
84+
}
85+
86+
// 设置日志
87+
func WithClientLogger(logger log.Logger) ClientOption {
88+
return func(o *clientOptions) {
89+
o.logger = logger
90+
}
91+
}
92+
93+
// 非安全拨号
94+
func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
95+
return dial(ctx, true, opts...)
96+
}
97+
98+
func Dial(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
99+
return dial(ctx, false, opts...)
100+
}
101+
102+
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
103+
// 默认配置
104+
options := clientOptions{
105+
timeout: 2000 * time.Millisecond,
106+
balancerName: "round_robin",
107+
}
108+
109+
for _, o := range opts {
110+
o(&options)
111+
}
112+
113+
//TODO 客户端默认拦截器
114+
ints := []grpc.UnaryClientInterceptor{
115+
clientinterceptors.TimeoutInterceptor(options.timeout),
116+
}
117+
streamInts := []grpc.StreamClientInterceptor{}
118+
119+
if len(options.unaryInts) > 0 {
120+
ints = append(ints, options.unaryInts...)
121+
}
122+
if len(options.streamInts) > 0 {
123+
streamInts = append(streamInts, options.streamInts...)
124+
}
125+
126+
//可以由用户端自己传递 这些默认的
127+
grpcOpts := []grpc.DialOption{
128+
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "` + options.balancerName + `"}`),
129+
grpc.WithChainUnaryInterceptor(ints...),
130+
grpc.WithChainStreamInterceptor(streamInts...),
131+
}
132+
133+
//服务发现的选项
134+
if &options.discovery != nil {
135+
grpcOpts = append(grpcOpts, grpc.WithResolvers(
136+
discovery.NewBuilder(options.discovery,
137+
discovery.WithInsecure(insecure)),
138+
))
139+
}
140+
141+
// 如果是非安全模式
142+
if insecure {
143+
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials()))
144+
}
145+
146+
if len(options.rpcOpts) > 0 {
147+
grpcOpts = append(grpcOpts, options.rpcOpts...)
148+
}
149+
150+
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
151+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package clientinterceptors
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"google.golang.org/grpc"
8+
)
9+
10+
func TimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
11+
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
12+
invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
13+
if timeout <= 0 {
14+
return invoker(ctx, method, req, reply, cc, opts...)
15+
}
16+
ctx, cancel := context.WithTimeout(ctx, timeout)
17+
defer cancel()
18+
return invoker(ctx, method, req, reply, cc, opts...)
19+
}
20+
}
21+
22+
//借鉴go-zero
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package direct
2+
3+
import (
4+
"strings"
5+
6+
"google.golang.org/grpc/resolver"
7+
)
8+
9+
type directBuilder struct{}
10+
11+
func init() {
12+
resolver.Register(NewBuilder())
13+
}
14+
15+
/*
16+
NewBuilder creates a directBuilder which is used to factory direct resolvers.
17+
example:
18+
19+
direct://<authority>/127.0.0.1:9000
20+
21+
到达这样一个目的
22+
*/
23+
func NewBuilder() *directBuilder {
24+
return &directBuilder{}
25+
}
26+
func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
27+
addrs := make([]resolver.Address, 0)
28+
for _, addr := range strings.Split(strings.TrimPrefix(target.URL.Path, "/"), ",") {
29+
addrs = append(addrs, resolver.Address{Addr: addr})
30+
}
31+
//grpc建立连接的逻辑都在这里UpdateState
32+
err := cc.UpdateState(resolver.State{Addresses: addrs})
33+
if err != nil {
34+
return nil, err
35+
}
36+
return newDirectResolver(), nil
37+
}
38+
39+
func (d *directBuilder) Scheme() string {
40+
return "direct"
41+
}
42+
43+
var _ resolver.Builder = &directBuilder{}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package direct
2+
3+
import "google.golang.org/grpc/resolver"
4+
5+
type directResolver struct{}
6+
7+
func newDirectResolver() *directResolver {
8+
return &directResolver{}
9+
}
10+
func (r *directResolver) Close() {}
11+
12+
func (r *directResolver) ResolveNow(options resolver.ResolveNowOptions) {}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package discovery
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"time"
8+
9+
"github.com/CoderI421/gframework/gmicro/registry"
10+
11+
"google.golang.org/grpc/resolver"
12+
)
13+
14+
const name = "discovery"
15+
16+
// Option is builder option.
17+
type Option func(o *builder)
18+
19+
// WithTimeout with timeout option.
20+
func WithTimeout(timeout time.Duration) Option {
21+
return func(b *builder) {
22+
b.timeout = timeout
23+
}
24+
}
25+
26+
// WithInsecure with isSecure option.
27+
func WithInsecure(insecure bool) Option {
28+
return func(b *builder) {
29+
b.insecure = insecure
30+
}
31+
}
32+
33+
type builder struct {
34+
discoverer registry.Discovery
35+
timeout time.Duration
36+
insecure bool
37+
}
38+
39+
// NewBuilder creates a builder which is used to factory registry resolvers.
40+
func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder {
41+
b := &builder{
42+
discoverer: d,
43+
timeout: time.Second * 10,
44+
insecure: false,
45+
}
46+
for _, o := range opts {
47+
o(b)
48+
}
49+
return b
50+
}
51+
52+
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
53+
var (
54+
err error
55+
w registry.Watcher
56+
)
57+
done := make(chan struct{}, 1)
58+
ctx, cancel := context.WithCancel(context.Background())
59+
go func() {
60+
w, err = b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/"))
61+
close(done)
62+
}()
63+
select {
64+
case <-done:
65+
case <-time.After(b.timeout):
66+
err = errors.New("discovery create watcher overtime")
67+
}
68+
if err != nil {
69+
cancel()
70+
return nil, err
71+
}
72+
r := &discoveryResolver{
73+
w: w,
74+
cc: cc,
75+
ctx: ctx,
76+
cancel: cancel,
77+
insecure: b.insecure,
78+
}
79+
go r.watch()
80+
return r, nil
81+
}
82+
83+
// Scheme return scheme of discovery
84+
func (*builder) Scheme() string {
85+
return name
86+
}

0 commit comments

Comments
 (0)