Skip to content

Commit 6b5b191

Browse files
committed
refactor
1 parent 1cf3c68 commit 6b5b191

File tree

8 files changed

+270
-308
lines changed

8 files changed

+270
-308
lines changed

cse_collector.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

cse_reporter.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package metricsink
2+
3+
// Forked from github.com/afex/hystrix-go
4+
// Some parts of this file have been modified to make it functional in this package
5+
6+
import (
7+
"crypto/tls"
8+
"github.com/go-chassis/go-chassis/core/common"
9+
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
10+
"github.com/go-mesh/openlogging"
11+
"net/http"
12+
"sync"
13+
"time"
14+
)
15+
16+
var initOnce = sync.Once{}
17+
18+
// CseCollectorConfig is a struct to keep monitoring information
19+
type CseCollectorConfig struct {
20+
// CseMonitorAddr is the http address of the csemonitor server
21+
CseMonitorAddr string
22+
// Headers for csemonitor server
23+
Header http.Header
24+
// TickInterval specifies the period that this collector will send metrics to the server.
25+
TimeInterval time.Duration
26+
// Config structure to configure a TLS client for sending Metric data
27+
TLSConfig *tls.Config
28+
29+
Env string
30+
}
31+
32+
func init() {
33+
hystrix.InstallReporter("CSE Monitoring", reportMetricsToCSEDashboard)
34+
}
35+
36+
var reporter *Reporter
37+
38+
//GetReporter get reporter singleton
39+
func GetReporter() (*Reporter, error) {
40+
var errResult error
41+
initOnce.Do(func() {
42+
monitorServerURL, err := getMonitorEndpoint()
43+
if err != nil {
44+
openlogging.GetLogger().Warnf("Get Monitoring URL failed, CSE monitoring function disabled, err: %v", err)
45+
errResult = err
46+
}
47+
openlogging.GetLogger().Infof("send metric to monitoring service : %s", monitorServerURL)
48+
tlsConfig, err := getTLSForClient(monitorServerURL)
49+
if err != nil {
50+
openlogging.GetLogger().Errorf("Get %s.%s TLS config failed,error : %s", monitorServerURL, common.Consumer, err)
51+
errResult = err
52+
}
53+
reporter, err = NewReporter(&CseCollectorConfig{
54+
CseMonitorAddr: monitorServerURL,
55+
Header: getAuthHeaders(),
56+
TLSConfig: tlsConfig,
57+
})
58+
if err != nil {
59+
openlogging.Error("new reporter failed", openlogging.WithTags(openlogging.Tags{
60+
"err": err.Error(),
61+
}))
62+
errResult = err
63+
}
64+
})
65+
return reporter, errResult
66+
}
67+
68+
//reportMetricsToCSEDashboard use send metrics to cse dashboard
69+
func reportMetricsToCSEDashboard(cb *hystrix.CircuitBreaker) error {
70+
r, err := GetReporter()
71+
if err != nil {
72+
return err
73+
}
74+
r.Send(cb)
75+
return nil
76+
}

csemonitor.go

Lines changed: 31 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package metricsink
22

33
import (
4-
"crypto/tls"
5-
"net/http"
6-
"os"
7-
"time"
8-
94
"github.com/go-chassis/go-archaius"
105
chassisRuntime "github.com/go-chassis/go-chassis/pkg/runtime"
116
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
127
"github.com/go-mesh/openlogging"
8+
"github.com/huaweicse/cse-collector/pkg/monitoring"
9+
"os"
1310
"runtime"
1411
)
1512

@@ -18,71 +15,51 @@ var IsMonitoringConnected bool
1815

1916
// Reporter is a struct to store the registry address and different monitoring information
2017
type Reporter struct {
21-
CseMonitorAddr string
22-
Header http.Header
23-
Interval time.Duration
24-
Percentiles []float64
25-
TLSConfig *tls.Config
26-
app string
27-
version string
28-
service string
29-
environment string
30-
serviceID string
31-
metricsAPI *CseMonitorClient
18+
environment string
19+
c *monitoring.CseMonitorClient
3220
}
3321

3422
// NewReporter creates a new monitoring object for CSE type collections
35-
func NewReporter(addr string, header http.Header, interval time.Duration, tls *tls.Config, app, version, service, env string) *Reporter {
36-
reporter := &Reporter{
37-
CseMonitorAddr: addr,
38-
Header: header,
39-
Interval: interval,
40-
Percentiles: []float64{0.5, 0.75, 0.95, 0.99, 0.999},
41-
TLSConfig: tls,
42-
app: app,
43-
version: version,
44-
service: service,
45-
environment: env,
46-
}
47-
metricsAPI, err := NewCseMonitorClient(reporter.Header, reporter.CseMonitorAddr, reporter.TLSConfig, "v2")
23+
func NewReporter(config *CseCollectorConfig) (*Reporter, error) {
24+
c, err := monitoring.NewCseMonitorClient(config.Header, config.CseMonitorAddr, config.TLSConfig)
4825
if err != nil {
4926
openlogging.GetLogger().Errorf("Get cse monitor client failed:%s", err)
27+
return nil, err
28+
}
29+
reporter := &Reporter{
30+
environment: config.Env,
5031
}
51-
reporter.metricsAPI = metricsAPI
5232
IsMonitoringConnected = true
53-
return reporter
33+
return &Reporter{
34+
environment: config.Env,
35+
c: c,
36+
}, nil
5437
}
5538

56-
// Run creates a go_routine which runs continuously and capture the monitoring data
57-
func (reporter *Reporter) Run(cb *hystrix.CircuitBreaker) {
58-
ticker := time.Tick(reporter.Interval)
59-
60-
for range ticker {
61-
if archaius.GetBool("cse.monitor.client.enable", true) {
62-
reporter.serviceID = chassisRuntime.ServiceID
63-
monitorData := reporter.getData(cb, reporter.app, reporter.version,
64-
reporter.service, reporter.environment, reporter.serviceID, chassisRuntime.InstanceID)
65-
err := reporter.metricsAPI.PostMetrics(monitorData)
66-
if err != nil {
67-
openlogging.GetLogger().Warnf("Unable to report to monitoring server, err: %v", err)
68-
}
39+
//Send send metrics to monitoring service
40+
func (reporter *Reporter) Send(cb *hystrix.CircuitBreaker) {
41+
if archaius.GetBool("cse.monitor.client.enable", true) {
42+
monitorData := reporter.getData(cb)
43+
err := reporter.c.PostMetrics(monitorData)
44+
if err != nil {
45+
openlogging.GetLogger().Warnf("unable to report to monitoring server, err: %v", err)
6946
}
7047
}
7148
}
7249

73-
func (reporter *Reporter) getData(cb *hystrix.CircuitBreaker,
74-
app, version, service, env, serviceID, instanceID string) MonitorData {
75-
var monitorData = NewMonitorData()
76-
monitorData.AppID = app
77-
monitorData.Version = version
78-
monitorData.Name = service
79-
monitorData.ServiceID = serviceID
80-
monitorData.InstanceID = instanceID
81-
monitorData.Environment = env
50+
func (reporter *Reporter) getData(cb *hystrix.CircuitBreaker) monitoring.MonitorData {
51+
var monitorData = monitoring.NewMonitorData()
52+
monitorData.AppID = chassisRuntime.App
53+
monitorData.Version = chassisRuntime.Version
54+
monitorData.Name = chassisRuntime.ServiceName
55+
monitorData.ServiceID = chassisRuntime.ServiceID
56+
monitorData.InstanceID = chassisRuntime.InstanceID
57+
58+
monitorData.Environment = reporter.environment
8259
monitorData.Instance, _ = os.Hostname()
8360
monitorData.Memory = getProcessInfo()
8461
monitorData.Thread = threadCreateProfile.Count()
8562
monitorData.CPU = float64(runtime.NumCPU())
86-
monitorData.appendInterfaceInfo(cb.Name, cb.Metrics.DefaultCollector())
63+
monitorData.AppendInterfaceInfo(cb.Name, cb.Metrics.DefaultCollector())
8764
return *monitorData
8865
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ module github.com/huaweicse/cse-collector
33
require (
44
github.com/go-chassis/foundation v0.0.0-20190621030543-c3b63f787f4c
55
github.com/go-chassis/go-archaius v0.19.0
6-
github.com/go-chassis/go-chassis v1.5.2-0.20190713094014-10193b3a09b8
6+
github.com/go-chassis/go-chassis v1.6.0
77
github.com/go-mesh/openlogging v1.0.0
88
)

monitor_data.go

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,145 +1,13 @@
11
package metricsink
22

33
import (
4-
"fmt"
5-
"math"
64
"runtime"
75
"runtime/pprof"
8-
"strconv"
96
"strings"
10-
"time"
11-
12-
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
13-
"github.com/go-chassis/go-chassis/third_party/forked/afex/hystrix-go/hystrix/metric_collector"
14-
"github.com/go-mesh/openlogging"
157
)
168

179
var threadCreateProfile = pprof.Lookup("threadcreate")
1810

19-
// MonitorData is an object which stores the monitoring information for an application
20-
type MonitorData struct {
21-
AppID string `json:"appId"`
22-
Version string `json:"version"`
23-
Name string `json:"name"`
24-
Environment string `json:"environment"`
25-
Instance string `json:"instance"`
26-
Thread int `json:"thread"`
27-
Customs map[string]interface{} `json:"customs"` // ?
28-
Interfaces []*InterfaceInfo `json:"interfaces"`
29-
CPU float64 `json:"cpu"`
30-
Memory map[string]interface{} `json:"memory"`
31-
ServiceID string `json:"serviceId"`
32-
InstanceID string `json:"instanceId"`
33-
}
34-
35-
// InterfaceInfo is an object which store the monitoring information of a particular interface
36-
type InterfaceInfo struct {
37-
Name string `json:"name"`
38-
Desc string `json:"desc"`
39-
QPS float64 `json:"qps"`
40-
Latency int `json:"latency"`
41-
L995 int `json:"l995"`
42-
L99 int `json:"l99"`
43-
L90 int `json:"l90"`
44-
L75 int `json:"l75"`
45-
L50 int `json:"l50"`
46-
L25 int `json:"l25"`
47-
L5 int `json:"l5"`
48-
Rate float64 `json:"rate"`
49-
Total int64 `json:"total"`
50-
Failure int64 `json:"failure"`
51-
ShortCircuited int64 `json:"shortCircuited"`
52-
IsCircuitBreakerOpen bool `json:"circuitBreakerOpen"`
53-
SemaphoreRejected int64 `json:"semaphoreRejected"`
54-
ThreadPoolRejected int64 `json:"threadPoolRejected"`
55-
CountTimeout int64 `json:"countTimeout"`
56-
FailureRate float64 `json:"failureRate"`
57-
successCount int64
58-
}
59-
60-
// NewMonitorData creates a new monitoring object
61-
func NewMonitorData() *MonitorData {
62-
monitorData := new(MonitorData)
63-
monitorData.Interfaces = make([]*InterfaceInfo, 0)
64-
return monitorData
65-
}
66-
67-
func (monitorData *MonitorData) getOrCreateInterfaceInfo(name string) *InterfaceInfo {
68-
for _, interfaceInfo := range monitorData.Interfaces {
69-
if interfaceInfo.Name == name {
70-
return interfaceInfo
71-
}
72-
}
73-
interfaceInfo := new(InterfaceInfo)
74-
interfaceInfo.Name = name
75-
interfaceInfo.Desc = name
76-
monitorData.Interfaces = append(monitorData.Interfaces, interfaceInfo)
77-
return interfaceInfo
78-
}
79-
80-
func (monitorData *MonitorData) appendInterfaceInfo(name string, c *metricCollector.DefaultMetricCollector) {
81-
var interfaceInfo = monitorData.getOrCreateInterfaceInfo(name)
82-
now := time.Now()
83-
//attempts:
84-
interfaceInfo.Total = int64(c.NumRequests().Sum(now))
85-
//errors
86-
interfaceInfo.Failure = int64(c.Failures().Sum(now))
87-
//shortCircuits
88-
interfaceInfo.ShortCircuited = int64(c.ShortCircuits().Sum(now))
89-
//successes
90-
interfaceInfo.successCount = int64(c.Successes().Sum(now))
91-
92-
if isCBOpen, err := hystrix.IsCircuitBreakerOpen(name); err != nil {
93-
interfaceInfo.IsCircuitBreakerOpen = false
94-
openlogging.Error("can't get circuit status", openlogging.WithTags(openlogging.Tags{
95-
"err": err.Error(),
96-
"name": name,
97-
}))
98-
} else {
99-
interfaceInfo.IsCircuitBreakerOpen = isCBOpen
100-
}
101-
102-
qps := (float64(interfaceInfo.Total) * (1 - math.Exp(-5.0/60.0/1)))
103-
movingAverageFor3Precision, err := strconv.ParseFloat(fmt.Sprintf("%.3f", qps), 64)
104-
if err == nil {
105-
interfaceInfo.QPS = movingAverageFor3Precision
106-
} else {
107-
interfaceInfo.QPS = 0
108-
}
109-
runDuration := c.RunDuration()
110-
interfaceInfo.L5 = int(runDuration.Percentile(0.05))
111-
interfaceInfo.L25 = int(runDuration.Percentile(0.25))
112-
interfaceInfo.L50 = int(float64(runDuration.Percentile(0.5)))
113-
interfaceInfo.L75 = int(runDuration.Percentile(0.75))
114-
interfaceInfo.L90 = int(runDuration.Percentile(0.90))
115-
interfaceInfo.L99 = int(runDuration.Percentile(0.99))
116-
interfaceInfo.L995 = int(runDuration.Percentile(0.995))
117-
interfaceInfo.Latency = int(runDuration.Mean())
118-
interfaceInfo.Rate = 1 //rate is no use any more and must be set to 1
119-
if interfaceInfo.Total == 0 {
120-
interfaceInfo.FailureRate = 0
121-
} else {
122-
totalErrorCount := interfaceInfo.Failure + interfaceInfo.SemaphoreRejected + interfaceInfo.ThreadPoolRejected + interfaceInfo.CountTimeout
123-
if totalErrorCount == 0 {
124-
interfaceInfo.FailureRate = 0
125-
} else {
126-
failureRate, err := strconv.ParseFloat(fmt.Sprintf("%.3f", float64(totalErrorCount)/float64(interfaceInfo.Total)), 64)
127-
if err == nil && failureRate > 0 {
128-
interfaceInfo.FailureRate = failureRate
129-
} else {
130-
openlogging.GetLogger().Warnf("Error in calculating the failureRate %v, default value(0) is assigned to failureRate", failureRate)
131-
interfaceInfo.FailureRate = 0
132-
}
133-
}
134-
}
135-
}
136-
137-
func GetInterfaceName(metricName string) string {
138-
command := strings.Split(metricName, ".")
139-
return strings.Join(command[:len(command)-1], ".")
140-
141-
}
142-
14311
func getProcessInfo() map[string]interface{} {
14412
var memoryInfo = make(map[string]interface{})
14513
var memStats = runtime.MemStats{}

0 commit comments

Comments
 (0)