Skip to content

Commit bf24d57

Browse files
Merge pull request #18 from savitaashture/flush
Added changes to flush the metrics data after a period of interval
2 parents ad19d5c + 5185330 commit bf24d57

File tree

3 files changed

+47
-9
lines changed

3 files changed

+47
-9
lines changed

cse_collector.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ func (c *CseCollector) updateTimerMetric(prefix string, dur time.Duration) {
7777
count.Update(dur)
7878
}
7979

80+
func (c *CseCollector) cleanMetric(prefix string) {
81+
count, ok := metrics.GetOrRegister(prefix, metrics.NewCounter).(metrics.Counter)
82+
if !ok {
83+
return
84+
}
85+
count.Clear()
86+
}
87+
8088
// IncrementAttempts function increments the number of calls to this circuit.
8189
// This registers as a counter
8290
func (c *CseCollector) IncrementAttempts() {
@@ -148,4 +156,14 @@ func (c *CseCollector) UpdateRunDuration(runDuration time.Duration) {
148156
}
149157

150158
// Reset function is a noop operation in this collector.
151-
func (c *CseCollector) Reset() {}
159+
func (c *CseCollector) Reset() {
160+
c.cleanMetric(c.attempts)
161+
c.cleanMetric(c.failures)
162+
c.cleanMetric(c.successes)
163+
c.cleanMetric(c.shortCircuits)
164+
c.cleanMetric(c.errors)
165+
c.cleanMetric(c.rejects)
166+
c.cleanMetric(c.timeouts)
167+
c.cleanMetric(c.fallbackSuccesses)
168+
c.cleanMetric(c.fallbackFailures)
169+
}

csemonitor.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/ServiceComb/go-chassis/core/config"
1212
"github.com/ServiceComb/go-chassis/core/lager"
1313
"github.com/ServiceComb/go-chassis/core/registry"
14+
"github.com/ServiceComb/go-chassis/third_party/forked/afex/hystrix-go/hystrix"
1415
"github.com/rcrowley/go-metrics"
1516
)
1617

@@ -51,6 +52,20 @@ func NewReporter(r metrics.Registry, addr string, header http.Header, interval t
5152

5253
// Run creates a go_routine which runs continuously and capture the monitoring data
5354
func (reporter *Reporter) Run() {
55+
56+
go reporter.postData()
57+
go reporter.cleanData()
58+
59+
}
60+
61+
func (reporter *Reporter) cleanData() {
62+
ticker := time.Tick(20 * time.Second)
63+
for range ticker {
64+
hystrix.Flush()
65+
}
66+
}
67+
68+
func (reporter *Reporter) postData() {
5469
ticker := time.Tick(reporter.Interval)
5570
metricsAPI := NewCseMonitorClient(reporter.Header, reporter.CseMonitorAddr, reporter.TLSConfig, "v2")
5671
IsMonitoringConnected = true

monitorData.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package metricsink
22

33
import (
4+
"fmt"
5+
"math"
46
"runtime"
57
"runtime/pprof"
8+
"strconv"
69
"strings"
710
"time"
811

9-
"fmt"
1012
"github.com/rcrowley/go-metrics"
11-
"strconv"
1213
)
1314

1415
var threadCreateProfile = pprof.Lookup("threadcreate")
@@ -73,6 +74,7 @@ func (monitorData *MonitorData) getOrCreateInterfaceInfo(name string) *Interface
7374
monitorData.Interfaces = append(monitorData.Interfaces, interfaceInfo)
7475
return interfaceInfo
7576
}
77+
7678
func (monitorData *MonitorData) appendInterfaceInfo(name string, i interface{}) {
7779
var interfaceInfo = monitorData.getOrCreateInterfaceInfo(getInterfaceName(name))
7880
switch metric := i.(type) {
@@ -87,6 +89,15 @@ func (monitorData *MonitorData) appendInterfaceInfo(name string, i interface{})
8789
case "successes":
8890
interfaceInfo.successCount = metric.Count()
8991
}
92+
93+
qps := (float64(interfaceInfo.Total) * (1 - math.Exp(-5.0/60.0/1)))
94+
movingAverageFor3Precision, err := strconv.ParseFloat(fmt.Sprintf("%.3f", qps), 64)
95+
if err == nil {
96+
interfaceInfo.QPS = movingAverageFor3Precision
97+
} else {
98+
interfaceInfo.QPS = 0
99+
}
100+
90101
case metrics.Timer:
91102
t := metric.Snapshot()
92103
ps := t.Percentiles([]float64{0.05, 0.25, 0.5, 0.75, 0.90, 0.99, 0.995})
@@ -100,12 +111,6 @@ func (monitorData *MonitorData) appendInterfaceInfo(name string, i interface{})
100111
interfaceInfo.L99 = int(ps[5] / float64(time.Millisecond))
101112
interfaceInfo.L995 = int(ps[6] / float64(time.Millisecond))
102113
interfaceInfo.Latency = int(t.Mean() / float64(time.Millisecond))
103-
movingAverageFor3Precision, err := strconv.ParseFloat(fmt.Sprintf("%.3f", t.Rate1()), 64)
104-
if err == nil {
105-
interfaceInfo.QPS = movingAverageFor3Precision
106-
} else {
107-
interfaceInfo.QPS = 0
108-
}
109114
}
110115
}
111116
if interfaceInfo.Total == 0 {

0 commit comments

Comments
 (0)