Skip to content

Commit 02c4050

Browse files
committed
feat: rpc server add the logic open-telemetry,add the comment for rpc client test
1 parent b7abe8b commit 02c4050

File tree

5 files changed

+67
-5
lines changed

5 files changed

+67
-5
lines changed

app/pkg/options/tracing.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package options
2+
3+
import (
4+
"github.com/CoderI421/gframework/pkg/errors"
5+
"github.com/spf13/pflag"
6+
)
7+
8+
type TelemetryOptions struct {
9+
// Name is the name of the service.
10+
Name string `json:"name"`
11+
// 连接地址
12+
Endpoint string `json:"endpoint"`
13+
// SampleRate is the rate at which traces are sampled. 1.0 means all traces are sampled, 0.0 means no traces are sampled.
14+
// 意思是采样率,1.0表示所有的都采样,0.0表示不采样
15+
Sampler float64 `json:"sampler"`
16+
// Batcher is the type of batcher to use for sending traces to the collector.
17+
// Batcher是用于将跟踪发送到收集器的批处理程序类型。
18+
Batcher string `json:"batcher"`
19+
}
20+
21+
func NewTelemetryOptions() *TelemetryOptions {
22+
return &TelemetryOptions{
23+
Name: "shop",
24+
Endpoint: "http://127.0.0.1:14268/api/traces",
25+
Sampler: 1.0,
26+
Batcher: "jaeger",
27+
}
28+
}
29+
30+
func (t *TelemetryOptions) Validate() (errs []error) {
31+
if t.Batcher != "jaeger" && t.Batcher != "zipkin" {
32+
errs = append(errs, errors.New("open-telemetry batcher only supports: jaeger and zipkin"))
33+
}
34+
return
35+
}
36+
37+
func (t *TelemetryOptions) AddFlags(fs *pflag.FlagSet) {
38+
fs.StringVar(&t.Name, "telemetry.name", t.Name, "open-telemetry name")
39+
fs.StringVar(&t.Endpoint, "telemetry.endpoint", t.Endpoint, "open-telemetry endpoint")
40+
fs.Float64Var(&t.Sampler, "telemetry.sampler", t.Sampler, "open-telemetry sampler")
41+
fs.StringVar(&t.Batcher, "telemetry.batcher", t.Batcher, "open-telemetry batcher,only support jaeger and zipkin")
42+
}

app/user/client/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ func main() {
5050
}
5151
defer conn.Close()
5252
uc := v1.NewUserClient(conn)
53-
// 在终端中,使用 --server.port=*** --server.http-port 命令启动启动多个,这里就会轮询调用,测试负载均衡
53+
54+
// 测试 grpc 服务的负载均衡用例
55+
// 在终端中,使用 --server.port=*** --server.http-port -c configs/user/srv.yaml 命令启动启动多个,这里就会轮询调用,测试负载均衡
5456
for {
5557
re, err := uc.GetUserList(context.Background(), &v1.PageInfo{})
5658
if err != nil {

app/user/srv/config/config.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ var _ app.CliOptions = &Config{}
1111

1212
func New() *Config {
1313
return &Config{
14-
Log: log.NewOptions(),
15-
Server: options.NewServerOptions(),
16-
Registry: options.NewRegistryOptions(),
14+
Log: log.NewOptions(),
15+
Server: options.NewServerOptions(),
16+
Registry: options.NewRegistryOptions(),
17+
Telemetry: options.NewTelemetryOptions(),
1718
}
1819
}
1920

@@ -23,6 +24,8 @@ type Config struct {
2324
Server *options.ServerOptions `json:"server" mapstructure:"server"`
2425
// 注册中心
2526
Registry *options.RegistryOptions `json:"registry" mapstructure:"registry"`
27+
// 链路追踪
28+
Telemetry *options.TelemetryOptions `json:"telemetry" mapstructure:"telemetry"`
2629
}
2730

2831
// Flags implements app.CliOptions interface.Add flags to the specified FlagSet object.
@@ -31,6 +34,7 @@ func (c *Config) Flags() (fss cliflag.NamedFlagSets) {
3134
c.Log.AddFlags(fss.FlagSet("logs"))
3235
c.Server.AddFlags(fss.FlagSet("server"))
3336
c.Registry.AddFlags(fss.FlagSet("registry"))
37+
c.Telemetry.AddFlags(fss.FlagSet("telemetry"))
3438
return fss
3539
}
3640

@@ -40,5 +44,6 @@ func (c *Config) Validate() (errors []error) {
4044
errors = append(errors, c.Log.Validate()...)
4145
errors = append(errors, c.Server.Validate()...)
4246
errors = append(errors, c.Registry.Validate()...)
47+
errors = append(errors, c.Telemetry.Validate()...)
4348
return
4449
}

app/user/srv/rpc.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package srv
33
import (
44
"fmt"
55

6+
"github.com/CoderI421/gframework/gmicro/core/trace"
7+
68
upbv1 "github.com/CoderI421/gframework/api/user/v1"
79
srv1 "github.com/CoderI421/gframework/app/user/srv/service/v1"
810

@@ -13,6 +15,15 @@ import (
1315
)
1416

1517
func NewUserRPCServer(cfg *config.Config) (*rpcserver.Server, error) {
18+
// 初始化 open-telemetry 的 exporter
19+
// 这里会根据 endpoint 为单元注册 trace 服务的实例
20+
trace.InitAgent(trace.Options{
21+
Name: cfg.Telemetry.Name,
22+
Endpoint: cfg.Telemetry.Endpoint,
23+
Sampler: cfg.Telemetry.Sampler,
24+
Batcher: cfg.Telemetry.Batcher,
25+
})
26+
1627
data := mock.NewUsers()
1728
srv := srv1.NewUserService(data)
1829
userver := user.NewUserServer(srv)

gmicro/core/trace/agent.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ const (
2222

2323
var (
2424
// struct{}空结构体 不占内存,zerobase
25+
// 这个map的作用是,记录已经初始化过的endpoint,避免重复初始化
2526
agents = make(map[string]struct{})
26-
lock sync.Mutex
27+
// 保证map的并发安全
28+
lock sync.Mutex
2729
)
2830

2931
func InitAgent(o Options) {

0 commit comments

Comments
 (0)